#include "freertos/idf_additions.h" #include "freertos/projdefs.h" #include "meshtalos.h" #include "mongoose.h" #include "portmacro.h" #include #include #include struct sub *sub_list = NULL; SemaphoreHandle_t sub_mutex = NULL; QueueHandle_t sub_jobs = NULL; static void response(struct mg_connection *c, int status) { mpack_writer_t writer; char *data; size_t size; mpack_writer_init_growable(&writer, &data, &size); mpack_start_array(&writer, 1); mpack_write_i16(&writer, status); if (mpack_writer_destroy(&writer) == mpack_ok) { mg_send(c, data, size); } mg_free(data); } static int new_sub(mpack_reader_t *rdr, struct mg_connection *c) { static char topic_buf[TOPIC_LEN]; mpack_expect_cstr(rdr, topic_buf, sizeof(topic_buf)); if (mpack_reader_destroy(rdr) != mpack_ok) { return -2; } struct sub **ptr = &sub_list; bool dup = 0; for (; *ptr; ptr = &(*ptr)->next) { if (c == (*ptr)->c) { dup = 1; break; } } if (dup) { return -3; } if (xSemaphoreTake(sub_mutex, 10) == pdTRUE) { *ptr = calloc(1, sizeof(struct sub)); int stat = 0; if (*ptr == NULL) { stat = -4; goto end_task; } strncpy((*ptr)->topic, topic_buf, TOPIC_LEN); (*ptr)->c = c; end_task: xSemaphoreGive(sub_mutex); return stat; } else { return -1; } } static void send_image(struct mg_connection *c) { mg_send(c, image_buf, sizeof(image_buf)); } static void sfn(struct mg_connection *c, int ev, void *ev_data) { if (ev == MG_EV_OPEN && c->is_listening == 1) { MG_INFO(("SERVER is listening")); } else if (ev == MG_EV_ACCEPT) { MG_INFO(("SERVER accepted a connection")); } else if (ev == MG_EV_READ) { struct mg_iobuf *r = &c->recv; MG_INFO(("SERVER got data: %.*s", r->len, r->buf)); // mg_send(c, r->buf, r->len); // echo it back // r->len = 0; // Tell Mongoose we've consumed data mpack_reader_t rx; mpack_reader_init_data(&rx, (const char *)r->buf, r->len); mpack_expect_array(&rx); uint8_t id = mpack_expect_u8(&rx); switch (id) { case 1: // sub response(c, new_sub(&rx, c)); break; } r->len = 0; } else if (ev == MG_EV_CLOSE) { MG_INFO(("SERVER disconnected")); } else if (ev == MG_EV_ERROR) { MG_INFO(("SERVER error: %s", (char *)ev_data)); } else if (ev == MG_EV_POLL) { struct sync_jobs job; if (xQueueReceive(sub_jobs, &job, (TickType_t)20) == pdPASS) { MG_INFO(("SERVER CLEAN QUEUE ")); for (struct sub *child = sub_list; child != NULL; child = child->next) { if (strncmp(child->topic, job.topic, TOPIC_LEN) == 0) { send_image(child->c); } } } } } void mtsh_tcp_sender(struct mg_mgr *mgr) { sub_mutex = xSemaphoreCreateMutex(); sub_jobs = xQueueCreate(10, sizeof(struct sync_jobs)); if (sub_mutex == NULL || sub_jobs == NULL) { // errhandle MG_ERROR(("failed to init tcp jobs")); return; } mg_listen(mgr, "tcp://localhost:3030", &sfn, NULL); }