"""gRPC front end that exposes the ``backend.api`` functions as the IGAPI service."""
import asyncio
import logging

import grpc

from backend import api
from frontend.grpc.protobuf import igapi_pb2_grpc, igapi_pb2
from frontend.grpc.protobuf.igapi_pb2 import Request, Reply

# configuration
PORT = 50050

# logging
grpclog = logging.getLogger("frontend.grpc")
grpclog.setLevel(level=logging.INFO)


def _outcome_reply(res, err) -> Reply:
    """Build a ``Reply`` from a backend ``(res, err)`` pair.

    ``err`` is only used as a truthy failure flag; ``res`` is the payload
    that is sent back in both cases, under the key ``"error"`` on failure
    and ``"result"`` on success.
    """
    if err:
        return Reply(err=1, result={"error": res})
    return Reply(err=0, result={"result": res})


# object
# NOTE: think about what happens when many requests are issued at the same
# time -- with asyncio this could end up getting the (IG) account banned.
class IGAPI_Server(igapi_pb2_grpc.IGAPIServicer):
    """Servicer implementing the IGAPI RPCs on top of ``backend.api``.

    Every handler answers with a ``Reply`` whose ``err`` field is ``0`` on
    success and ``1`` on failure, and whose ``result`` field carries the
    payload. The backend functions are called without ``await``.
    """

    async def account_info(self, request: Request, context) -> Reply:
        """Return username, full name and email of the current account.

        Replies with ``err=1`` when ``api.IG_account_info()`` yields a falsy
        value.
        """
        grpclog.info("Request: account_info")
        account = api.IG_account_info()
        if not account:
            return Reply(
                err=1,
                result={"error": "api.IG_account_info returned None"},
            )

        # Only forward the fields the front end is meant to see.
        result = {
            "username": account["username"],
            "full_name": account["full_name"],
            "email": account["email"],
        }
        return Reply(err=0, result=result)

    async def login(self, request: Request, context) -> Reply:
        """Log in through ``api.IG_login()``.

        A truthy return value from the backend is treated as the error and
        is forwarded to the caller unchanged.
        """
        grpclog.info("Request: login")
        err = api.IG_login()
        if err:
            return Reply(err=1, result={"error": err})
        return Reply(err=0, result={"result": "Login Successed"})

    async def upload(self, request: Request, context) -> Reply:
        """Call ``api.upload`` with ``request.code`` and report the outcome."""
        grpclog.info("Request: upload")
        aid = request.code
        res, err = api.upload(aid)
        return _outcome_reply(res, err)

    async def delete(self, request: Request, context) -> Reply:
        """Call ``api.delete`` with ``request.code`` and report the outcome."""
        grpclog.info("Request: delete")
        aid = request.code
        res, err = api.delete(aid)
        return _outcome_reply(res, err)

    async def queue(self, request: Request, context) -> Reply:
        """Return whatever ``api.BACKEND_queue()`` yields as the result."""
        grpclog.info("Request: queue")
        reply = api.BACKEND_queue()
        return Reply(err=0, result=reply)

    async def setting(self, request: Request, context) -> Reply:
        """Placeholder RPC: not implemented yet, always replies ``err=1``."""
        # not done
        grpclog.info("Request: setting")
        return Reply(err=1, result={"error": "Not Done"})

    # get igid with article id


# start server
async def serve() -> None:
    """Start the gRPC server on ``PORT`` and block until it terminates."""
    server = grpc.aio.server()
    igapi_pb2_grpc.add_IGAPIServicer_to_server(IGAPI_Server(), server)
    server.add_insecure_port(f"[::]:{PORT}")
    await server.start()
    # Lazy %-formatting: the emitted text is the same as before.
    grpclog.info("gRPC Server listening on 0.0.0.0:%s", PORT)
    await server.wait_for_termination()


# entry point
def main():
    """Run the server until it terminates.

    ``asyncio.run`` creates a fresh event loop, runs ``serve()`` on it and
    closes the loop afterwards, so the loop is not leaked when the server
    stops or an exception propagates.
    """
    asyncio.run(serve())