niming_backend/utils/ighelper.py

44 lines
1.3 KiB
Python
Raw Normal View History

2024-12-18 03:07:56 +08:00
from typing import Tuple
import os
import grpc
from protobuf_files import igapi_pb2_grpc
from protobuf_files.igapi_pb2 import Request, Reply
IGAPI_HOST = os.getenv("IGAPI_HOST", None)
def request_account_info() -> Tuple[dict, int]:
with grpc.insecure_channel(IGAPI_HOST) as channel:
stub = igapi_pb2_grpc.IGAPIStub(channel)
res = stub.account_info(Request())
return dict(res.result.items()), res.err
def request_login() -> Tuple[dict, int]:
with grpc.insecure_channel(IGAPI_HOST) as channel:
stub = igapi_pb2_grpc.IGAPIStub(channel)
res = stub.login(Request())
return dict(res.result.items()), res.err
def request_queue() -> dict:
with grpc.insecure_channel(IGAPI_HOST) as channel:
stub = igapi_pb2_grpc.IGAPIStub(channel)
res = stub.queue(Request())
return dict(res.result.items())
def request_upload(aid:int) -> Tuple[dict, int]:
with grpc.insecure_channel(IGAPI_HOST) as channel:
stub = igapi_pb2_grpc.IGAPIStub(channel)
res = stub.upload(Request(code=aid))
return dict(res.result.items()), res.err
def request_delete(aid:int, code:str) -> Tuple[dict, int]:
with grpc.insecure_channel(IGAPI_HOST) as channel:
stub = igapi_pb2_grpc.IGAPIStub(channel)
res = stub.delete(Request(code=aid, args=[code]))
return dict(res.result.items()), res.err