"""Interactive TCP test server that drives a connected client with msgpack commands.

The script listens on HOST:PORT, accepts a single connection, and then lets
the operator pick commands from a text menu. Each command is sent to the peer
as a msgpack-encoded dict, and one reply of up to 1024 bytes is printed.
"""
from typing import Dict, Any
import socket

import msgpack

import tcp_client_image

HOST = "0.0.0.0"  # bind on all interfaces
PORT = 3030


def send_request(client_socket: socket.socket, data: Dict[str, Any]):
    """Send one msgpack-encoded request and print the peer's reply.

    Args:
        client_socket: Connected socket to the peer.
        data: Request payload; serialized with ``msgpack.packb``.

    Prints the reply text, "Connection closed" when the peer has closed the
    connection (``recv`` returned no bytes), or "Connection reset" when the
    connection is reset while sending or receiving.
    """
    try:
        message: bytes = msgpack.packb(data)
        client_socket.sendall(message)
        rx = client_socket.recv(1024)
        # Check the *received* bytes: an empty read means the peer closed the
        # connection. (Testing the request dict here would never be false.)
        if rx:
            # errors="replace" keeps a non-UTF-8 reply from crashing the tool.
            text = rx.decode("utf-8", errors="replace").strip()
            print(f"Received message: {text}")
        else:
            print("Connection closed")
    except ConnectionResetError:
        print("Connection reset")


def image(s: socket.socket):
    """Prompt for an image number, upload it in chunks, then push it.

    The chosen image (``tcp_client_image.image1`` or ``image2``) is sent as a
    series of "update_image" requests, each carrying the byte offset of the
    chunk in "index" and up to 2048 bytes in "data". A final "push_image"
    request follows the last chunk.

    Args:
        s: Connected socket to the peer.

    Prints "invalid choice" and returns without sending anything when the
    input is not the integer 1 or 2.
    """
    try:
        choice = int(input("image > "))
    except ValueError:
        # Only a non-numeric entry is expected here; do not swallow
        # KeyboardInterrupt or unrelated errors.
        print("invalid choice")
        return

    if choice == 1:
        payload = tcp_client_image.image1
    elif choice == 2:
        payload = tcp_client_image.image2
    else:
        print("invalid choice")
        return

    chunk_size = 2048
    for offset in range(0, len(payload), chunk_size):
        request = {
            "command": "update_image",
            "index": offset,
            "data": bytes(payload[offset:offset + chunk_size]),
        }
        send_request(s, request)
    send_request(s, {"command": "push_image"})


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen(5)
    print("listening...")
    conn, addr = s.accept()  # blocks until a client connects
    with conn:
        while True:
            print("Command:")
            print("(1) update + push image")
            print("(2) ping")
            print("(3) invalid request")
            print("(4) exit")
            try:
                choice: int = int(input("> "))
            except ValueError:
                # Non-numeric input: show the menu again instead of crashing.
                print("invalid choice")
                continue
            if choice == 1:
                image(conn)
            elif choice == 2:
                send_request(conn, {"command": "ping"})
            elif choice == 3:
                send_request(conn, {"command": "invalid_command"})
            else:
                # Any other number (including 4) ends the session.
                break