#include "meshtalos.h"
#include "./mpack.h"
#include "./picohttpparser.h"
#include "lwip/inet.h"
#include "lwip/sockets.h"
/*
 * NOTE(review): the system header names were missing from the reviewed copy
 * of this file (only bare `#include` directives were left). The list below is
 * reconstructed from the identifiers this file uses -- verify it against the
 * project's original include list and target platform.
 */
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#define STB_DS_IMPLEMENTATION
#include "./stb_ds.h"

#define HTTPVER "HTTP/1.1"

/* Display geometry, in pixels, and the size of a 1-bit-per-pixel frame. */
#define IMG_W_SIZE 152
#define IMG_H_SIZE 296
#define DISPLAY_SIZE (IMG_W_SIZE * IMG_H_SIZE / 8)
/* Number of bytes skipped at the front of an upload before pixel data. */
#define IMG_HEADER_SIZE 11
/* Parenthesized so the macro is safe inside any larger expression. */
#define IMG_SIZE (IMG_HEADER_SIZE + DISPLAY_SIZE)

/* One accepted TCP peer. */
struct client {
  int fd;                 /* socket returned by accept() */
  struct sockaddr_in ip4; /* peer address filled in by accept() */
  bool invalid;           /* set on I/O failure; reaped by conn_collect_bad_client() */
};

/* A listening socket together with the clients accepted from it. */
struct conn {
  int rootfd;              /* non-blocking listening socket */
  fd_set master, recv;     /* all client fds / scratch copy handed to select() */
  struct client *c;        /* stb_ds dynamic array of clients */
  int max_client_fd;       /* highest client fd seen, for select()'s nfds */
  struct timeval interval; /* select() timeout */
};

/* Parsed view of one HTTP request; pointers refer into the receive buffer. */
struct http_param {
  const char *method, *path;
  struct phr_header *headers;
  size_t method_len, path_len, headers_len, body_len;
  int minor_ver;
  char *body_start;
};

struct conn workers, http;
char iobuf[2048] = {0};
char httpbuf[2048] = {0};
char imgbuf[IMG_SIZE];
struct phr_header headers[30];
const size_t header_nums = 30;

void tsend(void);

/**
 * @brief Parse the first @p len bytes of @p a as an unsigned decimal number.
 * @return The parsed value; 0 if any byte is not a digit, if @p len is not
 *         positive, or if the value would not fit in an int.
 */
int natoi(const char *a, int len)
{
  int value = 0;

  for (int i = 0; i < len; i++) {
    if (a[i] < '0' || a[i] > '9') {
      return 0;
    }
    int digit = a[i] - '0';
    /* Reject instead of overflowing: signed overflow is undefined. */
    if (value > (INT_MAX - digit) / 10) {
      return 0;
    }
    value = value * 10 + digit;
  }
  return value;
}

/**
 * @brief Send @p len bytes to a client with a single send() call.
 * @return send()'s result, or -1 if the client is already marked invalid.
 *         A failed send marks the client invalid.
 */
ssize_t client_send(struct client *cli, const char *buf, size_t len)
{
  if (cli->invalid) {
    return -1;
  }
  printf("client written: %zu\n", len);

  ssize_t sent = send(cli->fd, buf, len, MSG_NOSIGNAL);
  if (sent < 0) {
    cli->invalid = true;
  }
  return sent;
}

/**
 * @brief Receive up to @p len bytes from a client with a single recv() call.
 * @return recv()'s result, or -1 if the client is already marked invalid.
 *         An error or end-of-stream (result <= 0) marks the client invalid.
 */
ssize_t client_recv(struct client *cli, char *buf, size_t len)
{
  if (cli->invalid) {
    return -1;
  }

  ssize_t got = recv(cli->fd, buf, len, MSG_NOSIGNAL);
  if (got <= 0) {
    cli->invalid = true;
  }
  return got;
}

/**
 * @brief printf-style send. Formats into the shared httpbuf, then sends it.
 *
 * Output longer than httpbuf is truncated to what the buffer actually holds.
 * @return client_send()'s result, or -1 if formatting failed.
 */
ssize_t client_dprintf(struct client *cli, const char *fmt, ...)
{
  va_list ap;
  va_start(ap, fmt);
  int written = vsnprintf(httpbuf, sizeof(httpbuf), fmt, ap);
  va_end(ap);

  if (written < 0) {
    return -1;
  }
  size_t len = (size_t)written;
  if (len >= sizeof(httpbuf)) {
    /* vsnprintf reports the untruncated length; only this much is stored. */
    len = sizeof(httpbuf) - 1;
  }
  return client_send(cli, httpbuf, len);
}

/**
 * @brief Reset the fd sets of @p c and reserve room for 10 clients.
 *
 * rootfd and interval are left untouched; the caller sets them.
 */
void conn_init(struct conn *c)
{
  FD_ZERO(&c->master);
  FD_ZERO(&c->recv);
  arrsetcap(c->c, 10);
}

/**
 * @brief Accept every connection currently pending on c->rootfd.
 *
 * Each accepted socket is added to the master fd set and the client array.
 * The loop ends at the first accept() failure; failures other than
 * EAGAIN/EWOULDBLOCK are reported with perror().
 */
void conn_poll_accept(struct conn *c)
{
  for (;;) {
    struct client incoming = {0};
    socklen_t addr_len = sizeof(incoming.ip4);

    incoming.fd =
        accept(c->rootfd, (struct sockaddr *)&incoming.ip4, &addr_len);
    if (incoming.fd == -1) {
      /* "Nothing pending" is either of the two codes, so both must differ. */
      if (errno != EAGAIN && errno != EWOULDBLOCK) {
        perror("accept error");
      }
      break;
    }

    if (c->max_client_fd < incoming.fd) {
      c->max_client_fd = incoming.fd;
    }
    FD_SET(incoming.fd, &c->master);
    printf("poll accepted: %s\n", inet_ntoa(incoming.ip4.sin_addr));
    incoming.invalid = false;
    arrput(c->c, incoming);
  }
}

/**
 * @brief Wait up to c->interval for readable clients and invoke @p cb on each.
 * @param cb Called with the connection, the readable client and its index in
 *           c->c.
 */
void conn_poll_clients(struct conn *c,
                       void (*cb)(struct conn *c, struct client *client,
                                  int client_id))
{
  /* select() may modify its timeout argument, so hand it a private copy. */
  struct timeval timeout = c->interval;

  memcpy(&c->recv, &c->master, sizeof(c->master));
  int ready = select(c->max_client_fd + 1, &c->recv, NULL, NULL, &timeout);
  if (ready < 0) {
    perror("select error");
    return; /* the fd set contents are unspecified after a failure */
  }

  for (int i = 0; i < arrlen(c->c) && ready > 0; ++i) {
    if (FD_ISSET(c->c[i].fd, &c->recv)) {
      cb(c, &c->c[i], i);
      ready--;
    }
  }
}

/**
 * @brief Close and remove every client whose invalid flag is set.
 */
void conn_collect_bad_client(struct conn *c)
{
  int i = 0;

  while (i < arrlen(c->c)) {
    if (!c->c[i].invalid) {
      i++;
      continue;
    }
    FD_CLR(c->c[i].fd, &c->master);
    close(c->c[i].fd);
    /* arrdelswap moves the last element into slot i: re-check slot i. */
    arrdelswap(c->c, i);
  }
}

int app_close = 0;

/* http path handle */
#define CLRF "\r\n"
/* Header names are matched case-insensitively, header values exactly. */
#define HEAD_MATCH(h, g)                                                       \
  (strlen(h) == (g).name_len && strncasecmp((h), (g).name, (g).name_len) == 0)
#define HEAD_VAL_MATCH(h, g)                                                   \
  (strlen(h) == (g).value_len && strncmp((h), (g).value, (g).value_len) == 0)

/**
 * @brief Reply with a body-less 400 response.
 * @return Always -1, which callers use to close the connection.
 */
int badreq(struct client *c)
{
  client_dprintf(c, HTTPVER " 400 Bad Request" CLRF "Connection: close" CLRF
                            CLRF);
  return -1;
}

/**
 * @brief Reply with a 400 response whose body is a printf-style message.
 *
 * The body is limited to what fits in httpbuf; Content-Length reflects the
 * bytes actually sent.
 * @return Always -1, which callers use to close the connection.
 */
int fbadreq(struct client *c, const char *s, ...)
{
  va_list args, measure;

  va_start(args, s);
  va_copy(measure, args);
  int needed = vsnprintf(NULL, 0, s, measure);
  va_end(measure);

  if (needed < 0) {
    va_end(args);
    return badreq(c);
  }
  size_t body_len = (size_t)needed;
  if (body_len >= sizeof(httpbuf)) {
    body_len = sizeof(httpbuf) - 1;
  }

  client_dprintf(c,
                 HTTPVER " 400 Bad Request" CLRF "Connection: close" CLRF
                         "Content-Length: %zu" CLRF CLRF,
                 body_len);
  /* httpbuf is free again once the status line and headers have been sent. */
  vsnprintf(httpbuf, sizeof(httpbuf), s, args);
  va_end(args);
  client_send(c, httpbuf, body_len);
  return -1;
}

/*
 * Check the "P4 <width> <height>" prefix of the first len bytes of buf.
 * The first number must be IMG_W_SIZE or IMG_H_SIZE and the second must be
 * the other one. Returns true when nothing contradicts that so far, which
 * includes the case where the numbers have not been fully received yet.
 */
static bool pbm_dimensions_ok(const char *buf, size_t len)
{
  if (len < 2 || strncmp(buf, "P4", 2) != 0) {
    return false;
  }

  bool have_width = false;
  int expected_height = 0;
  size_t token_start = 3;

  for (size_t i = 3; i < len; i++) {
    if (buf[i] != ' ' && buf[i] != '\r' && buf[i] != '\n') {
      continue;
    }
    int value = natoi(&buf[token_start], (int)(i - token_start));
    if (have_width) {
      return value == expected_height;
    }
    if (value != IMG_W_SIZE && value != IMG_H_SIZE) {
      return false;
    }
    expected_height = (value == IMG_W_SIZE) ? IMG_H_SIZE : IMG_W_SIZE;
    have_width = true;
    token_start = i + 1;
  }
  return true;
}

/**
 * @brief Handle an image upload: receive exactly IMG_SIZE bytes into imgbuf,
 *        validate the P4 header, then push the image to the workers.
 *
 * The request must carry Content-Type "image/x-portable-bitmap" and a
 * Content-Length equal to IMG_SIZE. Body bytes that arrived with the headers
 * are taken from @p p; the rest is read from the socket.
 * @return Always -1 (every response is "Connection: close").
 */
int api_tag_upload(struct client *c, struct http_param *p)
{
  const size_t img_size = IMG_SIZE;
  bool type_ok = false;
  bool length_ok = false;

  for (size_t i = 0; i < p->headers_len; i++) {
    const struct phr_header *hdr = &p->headers[i];

    if (HEAD_MATCH("Content-Type", *hdr) &&
        HEAD_VAL_MATCH("image/x-portable-bitmap", *hdr)) {
      type_ok = true;
    }
    /* Header values are not NUL-terminated, so parse with an explicit length. */
    if (HEAD_MATCH("Content-Length", *hdr) &&
        natoi(hdr->value, (int)hdr->value_len) == IMG_SIZE) {
      length_ok = true;
    }
  }
  if (!type_ok || !length_ok) {
    return fbadreq(c,
                   "Only accept request with PGM P4 format that height and "
                   "width is either %03d,%03d",
                   IMG_W_SIZE, IMG_H_SIZE);
  }

  /* Start with whatever part of the body arrived together with the headers. */
  size_t upload_size = 0;
  if (p->body_start != NULL && p->body_len > 0) {
    upload_size = p->body_len < img_size ? p->body_len : img_size;
    memcpy(imgbuf, p->body_start, upload_size);
  }
  printf("tag read %zu\n", upload_size);

  for (;;) {
    /* Validate as soon as the header is in, and again once complete. */
    if (upload_size > IMG_HEADER_SIZE &&
        !pbm_dimensions_ok(imgbuf, upload_size)) {
      return badreq(c);
    }
    if (upload_size >= img_size) {
      break;
    }

    /* A stream socket may return fewer bytes than asked for; keep reading. */
    ssize_t got = client_recv(c, imgbuf + upload_size, img_size - upload_size);
    if (got <= 0) {
      /* client_recv already marked the client invalid; no reply can be sent. */
      return -1;
    }
    printf("tag read %zd\n", got);
    upload_size += (size_t)got;
  }

  client_dprintf(c, HTTPVER " 200 Good" CLRF "Connection: close" CLRF CLRF);
  tsend();
  return -1;
}

/**
 * @brief Reply 404 with a plain-text body naming the requested path.
 * @return Always -1, which callers use to close the connection.
 */
int nopath(struct client *c, struct http_param *p)
{
  /* The "%.*s" precision argument must be an int, not a size_t. */
  int path_len = (int)p->path_len;
  int bodylen = snprintf(NULL, 0, "No Path: %.*s", path_len, p->path);

  client_dprintf(c,
                 HTTPVER " 404 Not Found" CLRF "Connection: close" CLRF
                         "Content-Length: %d" CLRF
                         "Content-Type: text/plain" CLRF CLRF,
                 bodylen);
  client_dprintf(c, "No Path: %.*s", path_len, p->path);
  return -1;
}
/* http path handle */

/**
 * @brief Read one request from a readable HTTP client and dispatch it.
 *
 * PUT on a path starting with "/api/tag" goes to api_tag_upload(); anything
 * else gets a 404. A request that does not parse in a single read gets a 400.
 * The client is marked invalid after it has been answered.
 */
void http_client_handle(struct conn *c, struct client *client, int client_id)
{
  (void)c;
  (void)client_id;

  printf("handle client\n");

  ssize_t rcv_len;
  do {
    rcv_len = read(client->fd, iobuf, sizeof(iobuf));
  } while (rcv_len == -1 && errno == EINTR);

  if (rcv_len < 0) {
    int err = errno; /* perror() may change errno */
    perror("failed to recv\n");
    if (err != EAGAIN && err != EWOULDBLOCK) {
      /* A hard error would otherwise be reported by select() forever. */
      client->invalid = true;
    }
    return;
  }
  if (rcv_len == 0) {
    printf("closing request\n");
    client->invalid = true;
    return;
  }
  printf("read %zd bytes\n", rcv_len);

  struct http_param p = {0};
  p.headers = headers;
  p.headers_len = header_nums;

  int pret = phr_parse_request(iobuf, (size_t)rcv_len, &p.method,
                               &p.method_len, &p.path, &p.path_len,
                               &p.minor_ver, p.headers, &p.headers_len, 0);
  if (pret <= 0) {
    printf("can't handle request\n");
    badreq(client);
    client->invalid = true;
    return;
  }

  /* A positive result is the number of bytes the request head occupies. */
  p.body_start = iobuf + pret;
  p.body_len = (size_t)rcv_len - (size_t)pret;

  printf("PATH: \"%.*s\"\n", (int)p.path_len, p.path);

  static const char tag_path[] = "/api/tag";
  const size_t tag_path_len = sizeof(tag_path) - 1;
  bool is_tag_path = p.path_len >= tag_path_len &&
                     strncmp(tag_path, p.path, tag_path_len) == 0;
  bool is_put = p.method_len == 3 && strncmp("PUT", p.method, 3) == 0;

  int handler_ret;
  if (is_tag_path && is_put) {
    handler_ret = api_tag_upload(client, &p);
  } else {
    handler_ret = nopath(client, &p);
  }
  if (handler_ret == -1) {
    client->invalid = true;
  }
}

/* Read pixel (x, y) of a w-pixel-wide, MSB-first, 1-bit-per-pixel bitmap. */
static inline int get_bit(const uint8_t *buf, int w, int x, int y)
{
  int bit = y * w + x;
  return (buf[bit >> 3] >> (7 - (bit & 7))) & 1;
}

/* Write pixel (x, y) of a w-pixel-wide, MSB-first, 1-bit-per-pixel bitmap. */
static inline void set_bit(uint8_t *buf, int w, int x, int y, int v)
{
  int bit = y * w + x;
  uint8_t mask = (uint8_t)(1u << (7 - (bit & 7)));

  if (v) {
    buf[bit >> 3] |= mask;
  } else {
    buf[bit >> 3] &= (uint8_t)~mask;
  }
}

/**
 * @brief Re-orient a 1-bit-per-pixel image from W x H to H x W, in place.
 *
 * Source pixel (x, y) is written to (y, x) of the result, i.e. the image is
 * transposed. The result is built in a static scratch buffer and copied back,
 * so DISPLAY_SIZE bytes of @p data are overwritten.
 * @param data The image buffer
 * @param W Original width
 * @param H Original height
 */
void rotate90_inline(uint8_t *data, int W, int H)
{
  static uint8_t temp[(IMG_H_SIZE * IMG_W_SIZE + 7) / 8] = {0};
  const int new_w = H;

  for (int y = 0; y < H; y++) {
    for (int x = 0; x < W; x++) {
      set_bit(temp, new_w, y, x, get_bit(data, W, x, y));
    }
  }
  memcpy(data, temp, DISPLAY_SIZE);
}

#define IMG_BLK_SIZE 300

/**
 * @brief mpack flush callback: send the writer's buffered bytes to the client
 *        stored as the writer context, flagging an I/O error on failure.
 */
void flush_mpack(mpack_writer_t *wr, const char *b, size_t len)
{
  struct client *c = mpack_writer_context(wr);

  printf("written %zu bytes\n", len);
  if (client_send(c, b, len) <= 0) {
    mpack_writer_flag_error(wr, mpack_error_io);
  }
}

/**
 * @brief Stream the pixel data in imgbuf to one client as MessagePack.
 *
 * Sends one "update_image" map per IMG_BLK_SIZE-byte block (with the block's
 * byte offset as "index"), followed by a "push_image" command.
 */
void send_image(struct client *c)
{
  mpack_writer_t wr;

  mpack_writer_init(&wr, iobuf, sizeof(iobuf));
  mpack_writer_set_flush(&wr, flush_mpack);
  mpack_writer_set_context(&wr, c);

  for (uint64_t i = IMG_HEADER_SIZE; i < IMG_SIZE; i += IMG_BLK_SIZE) {
    size_t remaining = (size_t)IMG_SIZE - (size_t)i;
    size_t chunk = remaining < IMG_BLK_SIZE ? remaining : IMG_BLK_SIZE;

    mpack_start_map(&wr, 3);
    mpack_write_cstr(&wr, "command");
    mpack_write_cstr(&wr, "update_image");
    mpack_write_cstr(&wr, "index");
    mpack_write_u64(&wr, i - IMG_HEADER_SIZE);
    mpack_write_cstr(&wr, "data");
    mpack_write_bin(&wr, &imgbuf[i], (uint32_t)chunk);
    mpack_finish_map(&wr);
    mpack_writer_flush_message(&wr);

    if (mpack_writer_error(&wr) != mpack_ok) {
      break; /* the client is gone; further writes would be ignored anyway */
    }
  }

  mpack_start_map(&wr, 1);
  mpack_write_cstr(&wr, "command");
  mpack_write_cstr(&wr, "push_image");
  mpack_finish_map(&wr); /* every started map must be closed */

  if (mpack_writer_destroy(&wr) != mpack_ok) {
    fprintf(stderr, "send_image: mpack writer error\n");
  }
}

/**
 * @brief Read width and height from the header in imgbuf, re-orient the image
 *        if its width equals IMG_H_SIZE, and send it to every worker client.
 */
void tsend(void)
{
  int token = 0;
  size_t token_start = 0;
  int img_w = 0, img_h = 0;

  /* Token 0 is the magic, token 1 the width, token 2 the height. */
  for (size_t i = 0; i < sizeof(imgbuf) && token <= 2; i++) {
    char ch = imgbuf[i];
    if (ch != ' ' && ch != '\r' && ch != '\n') {
      continue;
    }
    int token_len = (int)(i - token_start);
    if (token == 1) {
      img_w = natoi(&imgbuf[token_start], token_len);
    } else if (token == 2) {
      img_h = natoi(&imgbuf[token_start], token_len);
    }
    token_start = i + 1;
    token++;
  }
  printf("img_w: %d, img_h: %d\n", img_w, img_h);

  if (img_w == IMG_H_SIZE) {
    printf("rotate\n");
    rotate90_inline((uint8_t *)imgbuf + IMG_HEADER_SIZE, IMG_H_SIZE,
                    IMG_W_SIZE);
  }

  for (int i = 0; i < arrlen(workers.c); i++) {
    send_image(workers.c + i);
  }
}

/* Report a failed setup call and abort: listener setup errors are fatal. */
static void die(const char *what)
{
  perror(what);
  abort();
}

/*
 * Create a non-blocking TCP socket listening on INADDR_ANY:port with
 * SO_REUSEADDR set. Aborts the process if any step fails.
 */
static int open_listener(int port)
{
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) {
    die("socket");
  }

  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons((uint16_t)port);
  addr.sin_addr.s_addr = htonl(INADDR_ANY);

  /* The calls live outside assert() so they still run when NDEBUG is set. */
  int reuse = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
    die("setsockopt");
  }
  if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
    die("bind");
  }
  if (listen(fd, 5) == -1) {
    die("listen");
  }
  printf("start listen\n");

  int flags = fcntl(fd, F_GETFL);
  if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
    die("fcntl");
  }
  return fd;
}

/**
 * @brief Open the worker and HTTP listeners and run the event loop until
 *        app_close becomes non-zero.
 * @param worker_port TCP port on which worker connections are accepted
 * @param http_port   TCP port on which HTTP requests are accepted
 * @return 0 once the loop ends. Setup failures abort the process.
 */
int main_listen(const int worker_port, int http_port)
{
  int sfd = open_listener(worker_port);
  int httpfd = open_listener(http_port);

  workers.interval.tv_usec = 300;
  workers.rootfd = sfd;
  conn_init(&workers);

  http.interval.tv_usec = 300;
  http.rootfd = httpfd;
  conn_init(&http);

  /* event loop */
  while (!app_close) {
    conn_poll_accept(&workers);
    conn_poll_accept(&http);
    conn_poll_clients(&http, &http_client_handle);
    conn_collect_bad_client(&workers);
    conn_collect_bad_client(&http);
  }
  return 0;
}

/**
 * @brief Run the server on the given ports; @p mtsh is currently unused.
 */
void mtsh_listen(struct Meshtalos *mtsh, const int worker_port, int http_port)
{
  (void)mtsh;
  main_listen(worker_port, http_port);
}

/**
 * @brief Allocate a zero-initialized Meshtalos object.
 * @return The new object, or NULL if allocation fails.
 */
struct Meshtalos *mtsh_init(void)
{
  struct Meshtalos *m = calloc(1, sizeof *m);
  return m;
}