From ea95b056cc2f49274648e70b39bb2838ba729315 Mon Sep 17 00:00:00 2001 From: pictures2333 Date: Wed, 31 Dec 2025 15:55:12 +0800 Subject: [PATCH] feat: mpack + continus stream --- main/main.cpp | 15 +--- main/main.h | 14 ++-- main/tcphelper.c | 190 ++++++++++++++++++++++++++---------------- tcp_client2.py | 75 +++++++++++++++++ tcp_client_asyncio.py | 28 ------- 5 files changed, 202 insertions(+), 120 deletions(-) create mode 100644 tcp_client2.py delete mode 100644 tcp_client_asyncio.py diff --git a/main/main.cpp b/main/main.cpp index c340796..8dea03c 100644 --- a/main/main.cpp +++ b/main/main.cpp @@ -55,17 +55,10 @@ extern "C" int command_push_image() { } } -extern "C" int tcp_client_callback(char *buf, size_t len) { - for (size_t i = 0; i < len; i++) { - printf("%02x ", buf[i]); - } - printf("\n"); - - request_t *req = client_parse_data(buf, len); - if (!req) { // error -> invalid data - return -2; - } - +extern "C" int tcp_client_callback(request_t *req) { + printf("cmd -> %s\n", req->command); + printf("index -> %llu\n", req->index); + printf("data_len -> %u\n", req->data_len); // execute command if (strcmp(req->command, "ping") == 0) { diff --git a/main/main.h b/main/main.h index 28f321a..b57daa1 100644 --- a/main/main.h +++ b/main/main.h @@ -13,18 +13,18 @@ // change these static const uint8_t MESH_ID[6] = { 0x77, 0x77, 0x77, 0x77, 0x77, 0x76 }; -#define CONFIG_MESH_ROUTER_SSID "FBK_the_cutest_fox" -#define CONFIG_MESH_ROUTER_PASSWD "zsfv3210" -//#define CONFIG_MESH_ROUTER_SSID "testesp" -//#define CONFIG_MESH_ROUTER_PASSWD "RABCRABC" +//#define CONFIG_MESH_ROUTER_SSID "FBK_the_cutest_fox" +//#define CONFIG_MESH_ROUTER_PASSWD "zsfv3210" +#define CONFIG_MESH_ROUTER_SSID "testesp" +#define CONFIG_MESH_ROUTER_PASSWD "RABCRABC" #define CONFIG_MESH_AP_AUTHMODE WIFI_AUTH_WPA2_PSK #define CONFIG_MESH_AP_PASSWD "RASRASRAS" #define CONFIG_MESH_AP_CONNECTIONS 6 // number of nodes #define CONFIG_MESH_NON_MESH_AP_CONNECTIONS 0 // number of non-node devices #define CONFIG_TCP_ROOT 0 -//#define CONFIG_TCP_SERVER_IP "172.16.0.1" -#define CONFIG_TCP_SERVER_IP "10.189.34.172" +#define CONFIG_TCP_SERVER_IP "172.16.0.1" +//#define CONFIG_TCP_SERVER_IP "10.189.34.172" #define CONFIG_TCP_SERVER_PORT "3030" #define CONFIG_TCP_RXBUFFER_SIZE 4096 // tcp raw data max size #define CONFIG_IMAGE_BUF_SLICE_SIZE 2048 // map["data"] max size @@ -51,7 +51,7 @@ typedef struct request_t { #if CONFIG_TCP_ROOT == 1 int tcp_server_callback(const char *TAG, int sock); #else -int tcp_client_callback(char *buf, size_t len); +int tcp_client_callback(request_t *req); #endif request_t *client_parse_data(char *buf, size_t len); diff --git a/main/tcphelper.c b/main/tcphelper.c index 4cbe277..cf2ed07 100644 --- a/main/tcphelper.c +++ b/main/tcphelper.c @@ -18,6 +18,7 @@ #include "nvs_flash.h" #include "esp_wifi.h" +#include "mpack.h" #include "main.h" #include "mesh_netif.h" @@ -298,6 +299,20 @@ void start_tcp_server(void) #else static const char *TAG = "nonblocking-socket-client"; +size_t mp_fill(mpack_tree_t *reader, char *buffer, size_t count) { + int sock = (int)reader->context; + ESP_LOGI(TAG, "expect %lu bytes", count); + ssize_t r = recv(sock, buffer, count, 0); + ESP_LOGI(TAG, "received %ld bytes", r); + + if (r <= 0) { + mpack_tree_flag_error(reader, mpack_error_io); + return 0; + } + + return r; +} + // tcp client static void tcp_client_task(void *pvParameters) { @@ -323,87 +338,114 @@ static void tcp_client_task(void *pvParameters) ESP_LOGI(TAG, "Socket created, connecting to %s:%s", CONFIG_TCP_SERVER_IP, CONFIG_TCP_SERVER_PORT); // Marking the socket as non-blocking - int flags = fcntl(sock, F_GETFL); - if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) { - log_socket_error(TAG, sock, errno, "Unable to set socket non blocking"); - } + //int flags = fcntl(sock, F_GETFL); + //if (fcntl(sock, F_SETFL, flags) == -1) { + // log_socket_error(TAG, sock, errno, "Unable to set socket non blocking"); + //} if (connect(sock, address_info->ai_addr, address_info->ai_addrlen) != 0) { - if (errno == EINPROGRESS) { - ESP_LOGD(TAG, "connection in progress"); - fd_set fdset; - FD_ZERO(&fdset); - FD_SET(sock, &fdset); - - // Connection in progress -> have to wait until the connecting socket is marked as writable, i.e. connection completes - res = select(sock+1, NULL, &fdset, NULL, NULL); - if (res < 0) { - log_socket_error(TAG, sock, errno, "Error during connection: select for socket to be writable"); - goto error; - } else if (res == 0) { - log_socket_error(TAG, sock, errno, "Connection timeout: select for socket to be writable"); - goto error; - } else { - int sockerr; - socklen_t len = (socklen_t)sizeof(int); - - if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (void*)(&sockerr), &len) < 0) { - log_socket_error(TAG, sock, errno, "Error when getting socket error using getsockopt()"); - goto error; - } - if (sockerr) { - log_socket_error(TAG, sock, sockerr, "Connection error"); - goto error; - } - } - } else { - log_socket_error(TAG, sock, errno, "Socket is unable to connect"); - goto error; - } + ESP_LOGE(TAG, "Failed to connect server"); } + //if (connect(sock, address_info->ai_addr, address_info->ai_addrlen) != 0) { + // if (errno == EINPROGRESS) { + // ESP_LOGD(TAG, "connection in progress"); + // fd_set fdset; + // FD_ZERO(&fdset); + // FD_SET(sock, &fdset); +// + // // Connection in progress -> have to wait until the connecting socket is marked as writable, i.e. connection completes + // res = select(sock+1, NULL, &fdset, NULL, NULL); + // if (res < 0) { + // log_socket_error(TAG, sock, errno, "Error during connection: select for socket to be writable"); + // goto error; + // } else if (res == 0) { + // log_socket_error(TAG, sock, errno, "Connection timeout: select for socket to be writable"); + // goto error; + // } else { + // int sockerr; + // socklen_t len = (socklen_t)sizeof(int); +// + // if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (void*)(&sockerr), &len) < 0) { + // log_socket_error(TAG, sock, errno, "Error when getting socket error using getsockopt()"); + // goto error; + // } + // if (sockerr) { + // log_socket_error(TAG, sock, sockerr, "Connection error"); + // goto error; + // } + // } + // } else { + // log_socket_error(TAG, sock, errno, "Socket is unable to connect"); + // goto error; + // } + //} + ESP_LOGI(TAG, "Connected to %s:%s", CONFIG_TCP_SERVER_IP, CONFIG_TCP_SERVER_PORT); - // Keep receiving message - do { - // receive - int len = 0; - memset(rx_buffer, 0, sizeof(rx_buffer)); - do { - len = try_receive(TAG, sock, rx_buffer, sizeof(rx_buffer)); - if (len < 0) { - ESP_LOGE(TAG, "Error occurred during try_receive"); - goto error; - } - vTaskDelay(pdMS_TO_TICKS(YIELD_TO_ALL_MS)); - } while (len == 0); - ESP_LOGI(TAG, "Received message (len=%d)", len); + + // mpack + mpack_tree_t tree; + mpack_tree_init_stream(&tree, mp_fill, (void *)sock, CONFIG_IMAGE_BUF_SIZE, 30); - // execute command - char tx_buffer[128] = {0}; - int result = tcp_client_callback(rx_buffer, len); - switch (result) { - case 0: - strcpy(tx_buffer, "ok\n"); - break; - case -2: - strcpy(tx_buffer, "invalid request\n"); - break; - case -3: - strcpy(tx_buffer, "locked\n"); - break; - default: - strcpy(tx_buffer, "unknown error\n"); - break; - } + while (true) { + mpack_tree_parse(&tree); + if (mpack_tree_error(&tree) != mpack_ok) { + ESP_LOGE(TAG, "mperror: %d\n", mpack_tree_error(&tree)); + break; + } + + // parse + mpack_node_t root = mpack_tree_root(&tree); + char *cmd = mpack_node_cstr_alloc(mpack_node_map_cstr(root, "command"), 30); + + size_t index = 0; + size_t data_size = 0; + char *data = NULL; + if (strcmp(cmd, "update_image") == 0) { + index = mpack_node_u64(mpack_node_map_cstr(root, "index")); + + data_size = mpack_node_data_len(mpack_node_map_cstr(root, "data")); + data = (char *)mpack_node_data(mpack_node_map_cstr(root, "data")); + } + + // callback + request_t req; + memset(&req, 0, sizeof(req)); + req.command = cmd; + req.index = index; + req.data = data; + req.data_len = data_size; + + int result = tcp_client_callback(&req); + char tx_buffer[128] = {0}; + switch (result) { + case 0: + strcpy(tx_buffer, "ok\n"); + break; + case -2: + strcpy(tx_buffer, "invalid request\n"); + break; + case -3: + strcpy(tx_buffer, "locked\n"); + break; + default: + strcpy(tx_buffer, "unknown error\n"); + break; + } + + // send result + int send_len = socket_send(TAG, sock, tx_buffer, strlen(tx_buffer)); + if (send_len < 0) { + ESP_LOGE(TAG, "Error occurred during socket_send"); + goto error; + } + ESP_LOGI(TAG, "Written: %.*s", send_len, tx_buffer); + + // end + free(cmd); + cmd=NULL; + } - // send result - len = socket_send(TAG, sock, tx_buffer, strlen(tx_buffer)); - if (len < 0) { - ESP_LOGE(TAG, "Error occurred during socket_send"); - goto error; - } - ESP_LOGI(TAG, "Written: %.*s", len, tx_buffer); - } while (true); error: if (sock != INVALID_SOCK) { diff --git a/tcp_client2.py b/tcp_client2.py new file mode 100644 index 0000000..da7c9be --- /dev/null +++ b/tcp_client2.py @@ -0,0 +1,75 @@ +from typing import Dict, Any +import socket + +import msgpack + +import tcp_client_image + + +HOST = "0.0.0.0" +PORT = 3030 + +def send_request(client_socket:socket.socket, data:Dict[str, Any]): + try: + message:bytes = msgpack.packb(data) + + client_socket.sendall(message) + + rx = client_socket.recv(1024) + if data: + print(f"Received message: {rx.decode('utf-8').strip()}") + else: + print("Connection closed") + return + except ConnectionResetError: + print("Connection reset") + +def image(s:socket.socket): + image = b"" + try: + choice = int(input("image > ")) + if choice == 1: + image = tcp_client_image.image1 + elif choice == 2: + image = tcp_client_image.image2 + else: + raise ValueError + except: + print("invalid choice") + return + + chunk_size = 2048 + for i in range(0, len(image), chunk_size): + data = { + "command": "update_image", + "index": i, + "data": bytes(image[i:i+chunk_size]) + } + send_request(s, data) + + send_request(s, {"command":"push_image"}) + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind((HOST, PORT)) + s.listen(5) + print("listening...") + + conn, addr = s.accept() # 阻塞 + with conn: + while True: + print("Command:") + print("(1) update + push image") + print("(2) ping") + print("(3) invalid request") + print("(4) exit") + + choice:int = int(input("> ")) + data = {} + if choice == 1: + image(conn) + elif choice == 2: + send_request(conn, {"command":"ping"}) + elif choice == 3: + send_request(conn, {"command":"invalid_command"}) + else: + break diff --git a/tcp_client_asyncio.py b/tcp_client_asyncio.py deleted file mode 100644 index a505001..0000000 --- a/tcp_client_asyncio.py +++ /dev/null @@ -1,28 +0,0 @@ -import asyncio -import signal - -async def handle_client(reader, writer): - try: - while True: - #data = await reader.readline() - #if not data: - # break -# - #msg = data.decode().strip() - #writer.write(f"echo: {msg}\n".encode()) - await writer.drain() - except Exception as e: - print("error:", e) - finally: - writer.close() - await writer.wait_closed() - -async def main(): - server = await asyncio.start_server( - handle_client, host="0.0.0.0", port=3030 - ) - - async with server: - await server.serve_forever() - -asyncio.run(main())