#include "freertos/idf_additions.h"
#include "freertos/projdefs.h"
#include "meshtalos.h"
#include "mongoose.h"
#include "portmacro.h"
/* NOTE(review): three angle-bracket header names were missing from this file
 * (bare `#include` directives). The headers below are the standard ones this
 * translation unit needs; confirm against the project's original list. */
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Address the sender listens on.
 * NOTE(review): "localhost" presumably restricts this to loopback clients;
 * confirm that is intended on the target. */
#define MTSH_TCP_LISTEN_URL "tcp://localhost:3030"

/* Number of pending sync jobs the queue can hold. */
enum { MTSH_SUB_JOBS_DEPTH = 10 };

/* Head of the singly linked list of subscribers; one node per accepted
 * connection, linked through `next` and identified by its `c` member. */
struct sub *sub_list = NULL;
/* Created in mtsh_tcp_sender(); not taken anywhere in this file.
 * NOTE(review): sub_list is only touched from the Mongoose event handler
 * here; verify whether other tasks access it and need this mutex. */
SemaphoreHandle_t sub_mutex = NULL;
/* Queue of `struct sync_jobs`; drained by the MG_EV_POLL branch of sfn(). */
QueueHandle_t sub_jobs = NULL;

/*
 * Finalize a MessagePack writer and hand the encoded bytes to Mongoose.
 *
 * wr   - writer that was initialised over `buf`
 * buf  - backing buffer of the writer
 * what - short message name used in log output
 *
 * Returns true when the message was encoded without error and accepted by
 * mg_send(), false otherwise.
 */
static bool flush_packet(struct mg_connection *c, mpack_writer_t *wr,
                         const char *buf, const char *what)
{
  /* The used size must be read before the writer is destroyed. */
  size_t used = mpack_writer_buffer_used(wr);

  if (mpack_writer_destroy(wr) != mpack_ok) {
    MG_INFO(("failed to build %s mpack", what));
    return false;
  }
  if (!mg_send(c, buf, used)) {
    MG_ERROR(("failed to queue %s packet", what));
    return false;
  }
  return true;
}

/*
 * Stream the contents of image_buf to one connection.
 *
 * The image is sent as a series of "update_image" maps, each carrying the
 * byte offset ("index") and at most IMAGE_BLK_SIZE bytes of payload ("data"),
 * followed by a single "push_image" map. Sending stops at the first message
 * that cannot be encoded or queued.
 */
static void send_image(struct mg_connection *c)
{
  char mp_buffer[IMAGE_BLK_SIZE + 50];

  if (c == NULL) {
    return;
  }

  for (size_t offset = 0; offset < (size_t)IMAGE_SIZE;
       offset += IMAGE_BLK_SIZE) {
    /* Never write more than one block: the buffer only has room for
     * IMAGE_BLK_SIZE payload bytes plus the map framing. */
    size_t chunk = MIN((size_t)IMAGE_BLK_SIZE, (size_t)IMAGE_SIZE - offset);

    mpack_writer_t wr;
    mpack_writer_init(&wr, mp_buffer, sizeof(mp_buffer));
    mpack_build_map(&wr);
    mpack_write_cstr(&wr, "command");
    mpack_write_cstr(&wr, "update_image");
    mpack_write_cstr(&wr, "index");
    mpack_write_u64(&wr, (uint64_t)offset);
    mpack_write_cstr(&wr, "data");
    /* mpack_write_bin emits the bin header and the payload; a bare
     * mpack_write_bytes is only valid inside an already opened bin/str. */
    mpack_write_bin(&wr, (const char *)&image_buf[offset], (uint32_t)chunk);
    mpack_complete_map(&wr);

    if (!flush_packet(c, &wr, mp_buffer, "update image")) {
      return;
    }
  }

  mpack_writer_t wr;
  mpack_writer_init(&wr, mp_buffer, sizeof(mp_buffer));
  mpack_build_map(&wr);
  mpack_write_cstr(&wr, "command");
  mpack_write_cstr(&wr, "push_image");
  /* Every mpack_build_map must be closed before the writer is destroyed. */
  mpack_complete_map(&wr);

  (void)flush_packet(c, &wr, mp_buffer, "push image");
}

/*
 * Mongoose event handler for the listener and every connection it accepts.
 *
 * MG_EV_ACCEPT - allocates a subscriber node for `c` and appends it to
 *                sub_list; the connection is closed if allocation fails.
 * MG_EV_CLOSE  - unlinks and frees every node that refers to `c`.
 * MG_EV_POLL   - waits up to 20 ticks for a job on sub_jobs and, if one
 *                arrives, sends the image to every subscriber.
 * MG_EV_ERROR  - logs ev_data as a string.
 */
static void sfn(struct mg_connection *c, int ev, void *ev_data)
{
  if (ev == MG_EV_OPEN && c->is_listening == 1) {
    MG_INFO(("SERVER is listening"));
  } else if (ev == MG_EV_ACCEPT) {
    MG_INFO(("SERVER accepted a connection"));

    struct sub *newsub = calloc(1, sizeof *newsub);
    if (newsub == NULL) {
      MG_ERROR(("SERVER out of memory, dropping connection"));
      c->is_closing = 1;
      return;
    }
    /* Remember which connection this subscriber belongs to; the close and
     * poll branches look nodes up by this pointer. */
    newsub->c = c;

    /* Walk to the terminating NULL link (which is sub_list itself when the
     * list is empty) and hang the new node there. */
    struct sub **tail = &sub_list;
    while (*tail != NULL) {
      tail = &(*tail)->next;
    }
    *tail = newsub;
  } else if (ev == MG_EV_READ) {
    /* Incoming data is not processed here.
     * NOTE(review): nothing consumes c->recv, so received bytes presumably
     * accumulate in the connection buffer; confirm whether that is wanted. */
  } else if (ev == MG_EV_CLOSE) {
    MG_INFO(("SERVER disconnected"));

    /* Pointer-to-link traversal handles the head, interior nodes and an
     * empty list uniformly. */
    struct sub **link = &sub_list;
    while (*link != NULL) {
      struct sub *node = *link;
      if (node->c == c) {
        *link = node->next;
        free(node);
      } else {
        link = &node->next;
      }
    }
  } else if (ev == MG_EV_ERROR) {
    MG_INFO(("SERVER error: %s", (char *)ev_data));
  } else if (ev == MG_EV_POLL) {
    /* NOTE(review): this handler runs for every connection, so each poll
     * cycle can block up to 20 ticks per connection; consider draining the
     * queue from a single place. */
    struct sync_jobs job;
    if (xQueueReceive(sub_jobs, &job, (TickType_t)20) == pdPASS) {
      MG_INFO(("SERVER CLEAN QUEUE "));
      /* TODO: filter subscribers by job.topic once topics are wired up;
       * for now every subscriber receives the image. */
      for (struct sub *child = sub_list; child != NULL; child = child->next) {
        send_image(child->c);
      }
    }
  }
}

/*
 * Create the subscriber mutex and job queue and start listening for
 * subscriber connections on MTSH_TCP_LISTEN_URL.
 *
 * mgr - Mongoose manager the listener is attached to.
 *
 * On failure an error is logged and nothing is left half-initialised:
 * sub_mutex and sub_jobs are deleted and reset to NULL if either could not
 * be created.
 */
void mtsh_tcp_sender(struct mg_mgr *mgr)
{
  sub_mutex = xSemaphoreCreateMutex();
  sub_jobs = xQueueCreate(MTSH_SUB_JOBS_DEPTH, sizeof(struct sync_jobs));

  if (sub_mutex == NULL || sub_jobs == NULL) {
    MG_ERROR(("failed to init tcp jobs"));
    /* Release whichever primitive was created so it is not leaked. */
    if (sub_mutex != NULL) {
      vSemaphoreDelete(sub_mutex);
      sub_mutex = NULL;
    }
    if (sub_jobs != NULL) {
      vQueueDelete(sub_jobs);
      sub_jobs = NULL;
    }
    return;
  }

  if (mg_listen(mgr, MTSH_TCP_LISTEN_URL, &sfn, NULL) == NULL) {
    MG_ERROR(("failed to listen on %s", MTSH_TCP_LISTEN_URL));
  }
}