"""gRPC front end exposing the backend Instagram API over an asyncio server."""

import asyncio
import logging

import grpc

from backend import api
from frontend.grpc.protobuf import igapi_pb2_grpc, igapi_pb2
from frontend.grpc.protobuf.igapi_pb2 import Request, Reply

# configuration
PORT = 50050

# logging
grpclog = logging.getLogger("frontend.grpc")
grpclog.setLevel(level=logging.INFO)


# object
# Consider: if many requests are issued at the same time, asyncio might end up
# getting the (IG) account banned.
# NOTE(review): every backend call below is invoked without ``await``; if those
# calls block, they block the event loop and requests are effectively handled
# one at a time - confirm whether that is the intended throttling.
class IGAPI_Server(igapi_pb2_grpc.IGAPIServicer):
    """Servicer that forwards each RPC to the matching ``backend.api`` call.

    Every handler returns a ``Reply`` whose ``err`` is 0 on success and 1 on
    failure, with the payload (or an ``"error"`` entry) in ``result``.
    """

    async def account_info(self, request: Request, context) -> Reply:
        """Return username, full name and email of the account.

        Replies with ``err=1`` when ``api.IG_account_info()`` returns a falsy
        value.
        """
        grpclog.info("Request: account_info")

        account = api.IG_account_info()
        if not account:
            return Reply(
                err=1,
                result={"error": "api.IG_account_info returned None"},
            )

        result = {
            "username": account["username"],
            "full_name": account["full_name"],
            "email": account["email"],
        }
        return Reply(err=0, result=result)

    async def login(self, request: Request, context) -> Reply:
        """Log in through ``api.IG_login()``; a truthy return is the error."""
        grpclog.info("Request: login")

        err = api.IG_login()
        if err:
            return Reply(err=1, result={"error": err})
        return Reply(err=0, result={"result": "Login Successed"})

    async def upload(self, request: Request, context) -> Reply:
        """Upload the item identified by ``request.id``.

        ``api.upload`` returns a ``(res, err)`` pair; when ``err`` is truthy,
        ``res`` is sent back under the ``"error"`` key.
        """
        grpclog.info("Request: upload")

        res, err = api.upload(request.id)
        if err:
            return Reply(err=1, result={"error": res})
        return Reply(err=0, result={"result": res})

    async def delete(self, request: Request, context) -> Reply:
        """Delete the item identified by ``request.id``.

        ``api.delete`` returns a ``(res, err)`` pair; when ``err`` is truthy,
        ``res`` is sent back under the ``"error"`` key.
        """
        grpclog.info("Request: delete")

        res, err = api.delete(request.id)
        if err:
            return Reply(err=1, result={"error": res})
        return Reply(err=0, result={"result": res})

    async def queue(self, request: Request, context) -> Reply:
        """Return whatever ``api.BACKEND_queue()`` reports, always ``err=0``."""
        grpclog.info("Request: queue")

        return Reply(err=0, result=api.BACKEND_queue())

    # async def setting(self, request: Request, context) -> Reply:
    #     # not done
    #     grpclog.info("Request: setting")
    #     return Reply(err=1, result={"error": "Not Done"})

    # search: to be reworked
    async def search(self, request: Request, context) -> Reply:
        """Look up an entry by ``request.igid`` or, failing that, ``request.id``.

        A non-empty ``igid`` takes precedence. On success the reply carries
        the backend record with ``"id"`` converted to ``str`` plus a
        ``"query"`` entry naming which key was used (``"igid"`` or ``"id"``).
        Replies with ``err=1`` when the backend returns ``None``.
        """
        grpclog.info("Request: search")

        # search - if igid exists, use it first
        if request.igid:
            query_type = "igid"
            found = api.BACKEND_search(aid=None, igid=request.igid)
        else:
            query_type = "id"
            found = api.BACKEND_search(aid=request.id, igid=None)

        # return
        if found is None:
            return Reply(err=1, result={"error": "Not found"})

        # Build a fresh dict instead of writing into the object the backend
        # handed back, so the reply shaping cannot leak into backend state.
        result = {**found, "id": str(found["id"]), "query": query_type}
        return Reply(err=0, result=result)


# start server
async def serve() -> None:
    """Start the gRPC server on ``PORT`` and block until it terminates."""
    server = grpc.aio.server()
    igapi_pb2_grpc.add_IGAPIServicer_to_server(IGAPI_Server(), server)
    server.add_insecure_port(f"[::]:{PORT}")

    await server.start()
    grpclog.info("gRPC Server listening on 0.0.0.0:%s", PORT)
    await server.wait_for_termination()


# entry point
def main():
    """Run the server until it terminates.

    ``asyncio.run`` creates a fresh event loop for ``serve()`` and closes it
    afterwards, so no loop is left open once the server stops.
    """
    asyncio.run(serve())