diff --git a/app.py b/app.py index b6da20a..66984bf 100644 --- a/app.py +++ b/app.py @@ -1,42 +1,37 @@ -import os, sys +import os +import sys +import asyncio +import threading from sqlalchemy import create_engine from instagrapi import Client # from dotenv import load_dotenv -from utils import shareclass from ig import IG -from db import pgclass -from grpc import grpcServer +from db import dbhelper +from db.pgclass import Base +from grpcServer import grpcServer, anoth from utils.const import DEBUG # load_dotenv() +if DEBUG: + print("[*] ===== DEBUG MODE =====") + # Database PG_HOST = os.environ.get("PG_HOST", None).strip() -PG_PORT = os.environ.get("PG_PORT", None).strip() -PG_NAME = os.environ.get("PG_NAME", None).strip() -PG_USER = os.environ.get("PG_USER", None).strip() -PG_PASS = os.environ.get("PG_PASS", None).strip() -engine = create_engine('postgresql+psycopg2://%s:%s@%s:%s/%s'%(PG_USER, PG_PASS, PG_HOST, PG_PORT, PG_NAME)) -pgclass.Base.metadata.create_all(engine) -print("[V] Database Connected") +print("[*] Connecting to Database") +dbhelper.db = dbhelper.DB(create_engine(PG_HOST)) +Base.metadata.create_all(dbhelper.db._engine) # IG Login -cl = Client() -shareclass.Shared(cl, engine) # Shared Class +IG.init(Client()) if not DEBUG and not IG.login(): sys.exit(0) -# grpc server should have... -# - Get account info (a kind of checkalive) -# - Get media info (a kind of checkalive) -# - Upload media (預設客戶端給我id) -# - Delete media -# - Login - -# IG統一保存code - # run grpc if __name__ == "__main__": - grpcServer.serve() + # upload / delete processor + threading.Thread(target=anoth.run).start() + # grpc main + asyncio.get_event_loop().run_until_complete(grpcServer.serve()) diff --git a/db/DBHelper.py b/db/DBHelper.py deleted file mode 100644 index d466a8f..0000000 --- a/db/DBHelper.py +++ /dev/null @@ -1,61 +0,0 @@ -from typing import Tuple, Dict - -from db import pgclass -from utils import shareclass - -# 獲取單一文章 -def solo_article_fetcher(key:int) -> Dict | None: - table = pgclass.SQLarticle - ftab = pgclass.SQLfile - resfn = {} - - with shareclass.Shared.db_get_session() as session: - # query - res = session.query(table).filter(table.id == key, table.mark == "visible").first() - if res is None: return None - - # mapping - resfn.update({"id": res.id, - "ctx": res.ctx, - "igid": res.igid, - "mark": res.mark, - "reference": res.reference, - "hash": res.hash, - "created_at": res.created_at, - "ip": res.ip - }) - - # file - resfn["files"] = [ f[0] for f in session.query(ftab.id).filter(ftab.reference == res.hash).all() ] - - return resfn - -# 獲取檔案 -def solo_file_fetcher(id:int) -> Dict | None: - table = pgclass.SQLarticle - ftab = pgclass.SQLfile - - with shareclass.Shared.db_get_session() as session: - fres = session.query(ftab).filter(ftab.id == id).first() - if fres is None: # 檢查檔案是否存在 - return None - - article = session.query(table).filter(table.hash == fres.reference, table.mark == 'visible').first() - if article is None: # 檢查文章本體是否存在/可以閱覽 - return None - - # mapping - resfn = { - "type": fres.type, - "binary": fres.binary - } - - return resfn - -# 寫入IG狀態 -def solo_article_updater(id:int, code:str): - table = pgclass.SQLarticle - with shareclass.Shared.db_get_session() as session: - res = session.query(table).filter(table.id == id).first() - res.igid = code - session.commit() \ No newline at end of file diff --git a/db/dbhelper.py b/db/dbhelper.py new file mode 100644 index 0000000..841c5a3 --- /dev/null +++ b/db/dbhelper.py @@ -0,0 +1,152 @@ +from typing import Tuple, Dict + +from sqlalchemy.orm import sessionmaker +from sqlalchemy import Engine, text, update + +from s3 import s3helper +from db import pgclass + +class DB: + _engine = None + + @classmethod + def __init__(cls, engine): + cls._engine:Engine = engine + + @classmethod + def getsession(cls): + Session = sessionmaker(bind=cls._engine) + return Session() + +db:DB = None + + +# role (general) (owner) (admin) +# 獲取單一文章 +def solo_article_fetcher(role:str, key:int, hash:str=None) -> Tuple[Dict, int]: # admin, owner, general + with db.getsession() as session: + # article fetch + stmt="SELECT posts.id, posts.content, posts.file_list, meta.igid, posts.hash, meta.ip, pmark.mark " \ + +"FROM posts " \ + +"INNER JOIN mark AS pmark ON posts.hash=pmark.hash " \ + +"INNER JOIN article_meta AS meta ON posts.hash=meta.hash " + if role == "owner": # 驗證id/hash,可以看到本體(無驗證) + stmt += "WHERE posts.id = :id AND posts.hash = :hash" + elif role == "admin": # 驗證id,可以看到本體(無驗證) + stmt += "WHERE posts.id = :id" + elif role == "general": # 驗證id,可以看到本體(visible) + stmt += "WHERE posts.id=:id AND pmark.mark='visible'" + result = session.execute(text(stmt), {"id":key, "hash":hash}) + res = result.first() + if res is None: + return {}, 404 + + # comment fetch + stmt="SELECT c.sha1 " \ + +"FROM posts " \ + +"INNER JOIN unnest(posts.comment_list) AS c ON c=ANY(posts.comment_list) " \ + +"INNER JOIN mark AS cmark ON c.hash=cmark.hash " \ + +"WHERE posts.id=:id" + if role == "general": # 留言sha1(visible) + stmt+=" AND cmark.mark='visible'" + result = session.execute(text(stmt), {"id":res[0]}) + cres = result.all() + + # mapping + one = { + "id": res[0], + "content": res[1], + "igid": res[3], + } + if res[2]: # files + one["files_hash"] = res[2] + if res[4]: # comments + one["comments_hash"] = [ c[0] for c in cres ] + + if role == "admin": + one["ip"] = res[5] + one["mark"] = res[6] + one["hash"] = res[4] + + return one, 200 + + +# role (general) (owner) (admin) +# 獲取單一留言 - 可能不會用到 +def solo_comment_fetcher(role:str, key:str, hash:str=None) -> Tuple[Dict, int]: # admin, owner, general + with db.getsession() as session: + # query + stmt="SELECT posts.id AS parent_id, posts.hash AS parent_hash, pmark.mark AS parent_mark, cmark.mark AS comment_mark, c.* " \ + +"FROM posts " \ + +"INNER JOIN unnest(posts.comment_list) AS c ON c=ANY(posts.comment_list) " \ + +"JOIN mark AS pmark ON posts.hash=pmark.hash " \ + +"JOIN mark AS cmark ON c.hash=cmark.hash " \ + +"WHERE c.sha1=:sha1 " + if role == "general": + # 對一般用戶,sha1查詢,確保本體跟留言可見 + stmt += "AND pmark.mark='visible' AND cmark.mark='visible'" + arta = session.execute(text(stmt), {'sha1':key}).first() + elif role == "owner": + # 對發文者,sha1查詢,sha256查詢,不設檢查 + stmt += "AND c.hash=:hash" + arta = session.execute(text(stmt), {'sha1':key, 'hash':hash}).first() + elif role == "admin": + # 對管理員,sha1查詢,不設檢查 + arta = session.execute(text(stmt), {'sha1':key}).first() + if arta is None: + return {}, 404 + + # mapping + one = { + "content": arta[4], + "sha1": arta[8] + } + + if role == "admin": + one["ip"] = arta[5] + one["mark"] = arta[3] + one["hash"] = arta[6] + + return one, 200 + + +# 獲取檔案 - for IG +def solo_file_fetcher(role:str, fnhash:str) -> Tuple[dict, int]: # general, admin + with db.getsession() as session: + arta="SELECT posts.id FROM posts " \ + +"INNER JOIN mark ON posts.hash=mark.hash " \ + +"WHERE :fnhash=ANY (posts.file_list) " + if role == "general": + arta += "AND mark.mark = 'visible'" + arta = session.execute(text(arta), {'fnhash':fnhash}).first() + if arta is None: # 檢查文章本體是否存在/可以閱覽 + return {}, 404 + + # fetch file + f, err = s3helper.solo_file_fetcher(fnhash) + if err: + return {}, 404 + return f, 200 + + +# 填入 igid +def solo_article_set_igid(id:int, igid:str) -> int: + # get hash + article, code = solo_article_fetcher(role="admin", key=id) + if code != 200: + return 1 + hash = article["hash"] + # print(hash) + + # edit igid + err = 0 + article_meta = pgclass.SQLmeta + with db.getsession() as session: + try: + stmt = update(article_meta).where(article_meta.hash==hash).values(igid=igid) + session.execute(stmt) + except Exception as e: + print(e) + err = 1 + session.commit() + return err diff --git a/db/pgclass.py b/db/pgclass.py index 59f5883..740a113 100644 --- a/db/pgclass.py +++ b/db/pgclass.py @@ -1,23 +1,54 @@ -from sqlalchemy import Column, String, TIMESTAMP, func, BIGINT, LargeBinary, ARRAY +from sqlalchemy import Column, String, TIMESTAMP, func, BIGINT, ARRAY from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy_utils.types.pg_composite import CompositeType +from sqlalchemy.ext.mutable import MutableList Base = declarative_base() +CompositeType.cache_ok = False + +comment_type = CompositeType( + 'comment', + [ + Column('content', String), + Column('ip', String), + Column('hash', String), + Column('created_at', TIMESTAMP), + Column("sha1", String) + ] +) + + +# post class SQLarticle(Base): __tablename__ = 'posts' id = Column(BIGINT, primary_key=True) - created_at = Column(TIMESTAMP(timezone=True), server_default=func.now()) + content = Column(String) + file_list = Column(ARRAY(String)) hash = Column(String) - ctx = Column(String) + comment_list = Column(MutableList.as_mutable(ARRAY(comment_type))) + + +# post metadata +class SQLmeta(Base): + __tablename__ = 'article_meta' + + hash = Column(String, primary_key=True) + created_at = Column(TIMESTAMP(timezone=True), server_default=func.now()) igid = Column(String) - mark = Column(String) ip = Column(String) - reference = Column(BIGINT) - def __repr__(self): - return f"" +# post mark +class SQLmark(Base): + __tablename__ = 'mark' + + hash = Column(String, primary_key=True) + mark = Column(String) + + +# logs class SQLlog(Base): __tablename__ = 'logs' @@ -26,21 +57,8 @@ class SQLlog(Base): message = Column(String) source = Column(String) - def __repr__(self): - return f"" - -class SQLfile(Base): - __tablename__ = 'files' - - id = Column(BIGINT, primary_key=True) - created_at = Column(TIMESTAMP(timezone=True), server_default=func.now()) - type = Column(String) - reference = Column(String) - binary = Column(LargeBinary) - - def __repr__(self): - return f"" +# user class SQLuser(Base): __tablename__ = 'users' @@ -48,6 +66,3 @@ class SQLuser(Base): user = Column(String) password = Column(String) # hash , sha512 permission = Column(ARRAY(String)) - - def __repr__(self): - return f"" \ No newline at end of file diff --git a/grpc/grpcServer.py b/grpc/grpcServer.py deleted file mode 100644 index 74323b3..0000000 --- a/grpc/grpcServer.py +++ /dev/null @@ -1,29 +0,0 @@ -from grpc.postProcessor import upload, remove - - -def serve(): - pass - -""" # for testing -def _serve(): - print(IG.account_info()) - - pass - - return - aid = 57 - - msg, err = upload(aid) - if err: - print(msg) - return - - input("Press any key...") - - DBHelper.solo_article_updater(id=aid, code=msg) - - msg, err = remove(aid) - if err: - print(msg) - return -""" \ No newline at end of file diff --git a/grpc/postProcessor.py b/grpc/postProcessor.py deleted file mode 100644 index e7fa168..0000000 --- a/grpc/postProcessor.py +++ /dev/null @@ -1,62 +0,0 @@ -from typing import Tuple -import os - -from ig import ctxPictuterProma, IG -from db import DBHelper -from utils import fileProcessor -from utils.const import DEBUG - -# returns (errmsg | code, errcode) -def upload(aid:int) -> Tuple[str, int]: - # 抓取文章本體 - article = DBHelper.solo_article_fetcher(key = aid) - if article is None: - return "Post not found", 1 - - # 抓取檔案 - files = [ - DBHelper.solo_file_fetcher(id = k) - for k in article["files"] - ] - if None in files: - return "File not found", 1 - - # 轉出暫存檔案 - tmp_path:list = [] - for t in files: - filename, err = fileProcessor.file_saver(t.get("type"), t.get("binary")) - if err: # 如果錯誤 - return filename, 1 - tmp_path.append(filename) - - # 合成文字圖 - proma_file = ctxPictuterProma.new_proma(article["ctx"]) - tmp_path = [proma_file] + tmp_path - - # 送交 IG 上傳 - if not DEBUG: - media = IG.upload_media(article["ctx"], tmp_path) - if media is None: - return "Upload failed", 1 - else: - media = {"code":"fake_data"} - - # 刪除檔案 - for t in tmp_path: - os.remove(t) - - return media["code"], 0 - - -# return (errmsg, code) -def remove(aid:int) -> Tuple[str, int]: - # 抓取文章本體 - article = DBHelper.solo_article_fetcher(key = aid) - if article is None: - return "Post not found", 1 - - err = IG.delete_media(article["igid"]) - if err: - return "Remove failed", 1 - - return "OK", 0 \ No newline at end of file diff --git a/grpc/protobuf/note.txt b/grpc/protobuf/note.txt deleted file mode 100644 index 1a0366a..0000000 --- a/grpc/protobuf/note.txt +++ /dev/null @@ -1,3 +0,0 @@ -Response: - code: int - message: str \ No newline at end of file diff --git a/grpcServer/anoth.py b/grpcServer/anoth.py new file mode 100644 index 0000000..d9087cf --- /dev/null +++ b/grpcServer/anoth.py @@ -0,0 +1,47 @@ +import time +import random + +from grpcServer import postProcessor +from utils.ThreadSafeOrderedDict import ThreadSafeOrderedDict +from utils.const import ANOTH_INTERVAL_MIN, ANOTH_INTERVAL_MAX +from db import dbhelper + +task = ThreadSafeOrderedDict() + +def task_round(): + t = task.popitem(last=False) + if not t: # 沒任務 + print("[*] No task in queue") + return + + aid = t[1]["aid"] + type = t[0].split("-")[0] + print("[*] Task %s(target_aid=%d)"%(type, aid)) + + if type == "upload": # upload + msg, err = postProcessor.upload(aid) + elif type == "delete": + code = t[1]["code"] + msg, err = postProcessor.remove(code) + else: + msg, err = "Invalid task type %s"%type, 1 + + if err: + print("[X] Task failed: %s"%msg) + elif type == "upload": + dberr = dbhelper.solo_article_set_igid(id=aid, igid=msg) + if dberr: + print("[X] Task %s(target_aid=%d): Set igid failed"%(type, aid)) + + print("[*] Task Done") + return + + +def run(): + print("[*] Upload/Delete Processor Started") + while True: + task_round() + + sleep = random.randint(ANOTH_INTERVAL_MIN, ANOTH_INTERVAL_MAX) + print("[*] Next Round After %ds"%sleep) + time.sleep(sleep) diff --git a/grpcServer/grpcServer.py b/grpcServer/grpcServer.py new file mode 100644 index 0000000..1f519ac --- /dev/null +++ b/grpcServer/grpcServer.py @@ -0,0 +1,132 @@ +# from concurrent import futures + +import grpc +from cachetools import cached, TTLCache + +from ig import IG +from db import dbhelper +from utils.const import GRPC_ACCINFO_CACHE, GRPC_RELOGIN_LIMIT +from grpcServer import anoth +from grpcServer.protobuf import igapi_pb2_grpc +from grpcServer.protobuf.igapi_pb2 import Request, Reply + +# call account info / login +cache_accinfo = TTLCache(maxsize=1, ttl=GRPC_ACCINFO_CACHE) +@cached(cache_accinfo) +def call_IG_account_info(): + result = IG.account_info() + return result + +cache_login = TTLCache(maxsize=1, ttl=GRPC_RELOGIN_LIMIT) +@cached(cache_login) +def call_IG_login(): + result = IG.login() + return result + + +# object +# 考慮一下如果同時發起多的請求,asyncio可能會搞到被ban號(IG) +class IGAPI_Server(igapi_pb2_grpc.IGAPIServicer): + async def account_info(self, request: Request, context) -> Reply: + print("[*] Request: account_info") + account = call_IG_account_info() + if account: + result = { + "username":account["username"], + "full_name":account["full_name"], + "email":account["email"] + } + return Reply(err=0, result=result) + else: + return Reply(err=1, result={"error":"IG.account_info returned None"}) + + + async def login(self, request: Request, context) -> Reply: + print("[*] Request: login") + if len(cache_login): # cache has not expired + print("[*] Login: Cooldown") + return Reply(err=1, result={"error":"Cooldown"}) + else: + login = call_IG_login() + if login: + return Reply(err=0, result={"result":"Login Successed"}) + else: + return Reply(err=1, result={"error":"Login Failed"}) + + + async def upload(self, request: Request, context) -> Reply: + print("[*] Request: upload") + aid = request.code + + # 檢查 - 可見 + article, code = dbhelper.solo_article_fetcher(role="general", key=aid) # visible -> post + if code != 200: + return Reply(err=1, result={"error":"Post not found"}) + + # 檢查 - 已經在 Queue 內 + if anoth.task["upload-"+str(aid)]: + return Reply(err=1, result={"error":"Request is already in queue"}) + + # 檢查 - Queue 內有請求刪除同目標 + if anoth.task["delete-"+str(aid)]: + anoth.task.pop("delete-"+str(aid)) + return Reply(err=0, result={"result":"Canceled delete post request"}) + + # 檢查 - 已經上傳過 + if article["igid"]: + return Reply(err=1, result={"error":"Already Posted"}) + + # put into queue + anoth.task["upload-"+str(aid)] = {"aid":aid} + + return Reply(err=0, result={"result":"Put into queue"}) + + + async def delete(self, request: Request, context) -> Reply: + print("[*] Request: delete") + # article id + aid = request.code + # igid from args + if request.args: + igid = request.args[0] + else: + return Reply(err=1, result={"error":"Invalid Arguments"}) + + # 檢查 - 已經在 Queue 內 + if anoth.task["delete-"+str(aid)]: + return Reply(err=1, result={"error":"Request is already in queue"}) + + # 檢查 - Queue 內有請求上傳同目標 + if anoth.task["upload-"+str(aid)]: + anoth.task.pop("upload-"+str(aid)) + return Reply(err=0, result={"result":"Canceled upload post request"}) + + # put into queue + anoth.task["delete-"+str(aid)] = {"aid":aid, "code":igid} + + return Reply(err=0, result={"result":"Put into queue"}) + + + async def queue(self, request:Request, context) -> Reply: + print("[*] Request: queue") + t = anoth.task.items() + reply = { _[0]:str(_[1]["aid"]) for _ in t } + return Reply(err=0, result=reply) + + + async def setting(self, request:Request, context) -> Reply: + # not done + print("[*] Request: setting") + return Reply(err=1, result={"error":"Not Done"}) + + +# start server +async def serve() -> None: + server = grpc.aio.server() + igapi_pb2_grpc.add_IGAPIServicer_to_server( + IGAPI_Server(), server + ) + server.add_insecure_port("[::]:50051") + await server.start() + print("[*] gRPC Server listening on 0.0.0.0:50051") + await server.wait_for_termination() diff --git a/grpcServer/postProcessor.py b/grpcServer/postProcessor.py new file mode 100644 index 0000000..fb214fa --- /dev/null +++ b/grpcServer/postProcessor.py @@ -0,0 +1,64 @@ +from typing import Tuple +import os + +from ig import contentPictuterProma, IG +from db import dbhelper +from utils import fileProcessor +from utils.const import DEBUG +from s3 import s3helper + +# return (errmsg | code, errcode) +def upload(aid:int) -> Tuple[str, int]: + # 抓取文章本體 + article, code = dbhelper.solo_article_fetcher(role="general", key=aid) # visible -> post + if code != 200: + return "Post not found", 1 + + # 抓取檔案 + files = [] + for k in article["files_hash"]: + f, code = s3helper.solo_file_fetcher(fnhash=k) + if code: + return "File not found", 1 + else: + files.append(f) + + # 轉出暫存檔案 + tmp_path:list = [] + for t in files: + filename, err = fileProcessor.file_saver(t.get("mime"), t.get("binary")) + if err: # 如果錯誤 + return filename, 1 + tmp_path.append(filename) + + # 合成文字圖 + proma_file = contentPictuterProma.new_proma(article["content"]) + tmp_path = [proma_file] + tmp_path + + # 送交 IG 上傳 + if not DEBUG: + media = IG.upload_media(article["content"], tmp_path) + if media is None: + return "Upload failed", 1 + else: + media = {"code":"fake_data"} + + # 刪除檔案 + for t in tmp_path: + os.remove(t) + + return media["code"], 0 + + +# return (errmsg, code) +def remove(code:str) -> Tuple[str, int]: + # 抓取文章本體 - 叫你刪除的時候可能已經找不到本體了 + # article, code = dbhelper.solo_article_fetcher(role="general", key=aid) + # if code != 200: + # return "Post not found", 1 + + err = IG.delete_media(code) + if err: + return "Remove failed", 1 + + return "OK", 0 diff --git a/grpc/protobuf/igapi.proto b/grpcServer/protobuf/igapi.proto similarity index 55% rename from grpc/protobuf/igapi.proto rename to grpcServer/protobuf/igapi.proto index ff47db3..eeff066 100644 --- a/grpc/protobuf/igapi.proto +++ b/grpcServer/protobuf/igapi.proto @@ -1,23 +1,25 @@ syntax = "proto3"; service IGAPI { - rpc login () returns (Reply) {} + rpc login (Request) returns (Reply) {} - rpc account_info () returns (Reply) {} + rpc account_info (Request) returns (Reply) {} rpc upload (Request) returns (Reply) {} rpc delete (Request) returns (Reply) {} rpc setting (Request) returns (Reply) {} + + rpc queue (Request) returns (Reply) {} } message Request { - int code = 1; - repeated string args = 2; + int64 code = 1; + repeated string args = 2; } message Reply { - int err = 1; + int64 err = 1; map result = 2; } \ No newline at end of file diff --git a/grpcServer/protobuf/igapi_pb2.py b/grpcServer/protobuf/igapi_pb2.py new file mode 100644 index 0000000..9c290bc --- /dev/null +++ b/grpcServer/protobuf/igapi_pb2.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: igapi.proto +# Protobuf Python Version: 5.28.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 28, + 1, + '', + 'igapi.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bigapi.proto\"%\n\x07Request\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x03\x12\x0c\n\x04\x61rgs\x18\x02 \x03(\t\"g\n\x05Reply\x12\x0b\n\x03\x65rr\x18\x01 \x01(\x03\x12\"\n\x06result\x18\x02 \x03(\x0b\x32\x12.Reply.ResultEntry\x1a-\n\x0bResultEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x32\xc0\x01\n\x05IGAPI\x12\x1b\n\x05login\x12\x08.Request\x1a\x06.Reply\"\x00\x12\"\n\x0c\x61\x63\x63ount_info\x12\x08.Request\x1a\x06.Reply\"\x00\x12\x1c\n\x06upload\x12\x08.Request\x1a\x06.Reply\"\x00\x12\x1c\n\x06\x64\x65lete\x12\x08.Request\x1a\x06.Reply\"\x00\x12\x1d\n\x07setting\x12\x08.Request\x1a\x06.Reply\"\x00\x12\x1b\n\x05queue\x12\x08.Request\x1a\x06.Reply\"\x00\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'igapi_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_REPLY_RESULTENTRY']._loaded_options = None + _globals['_REPLY_RESULTENTRY']._serialized_options = b'8\001' + _globals['_REQUEST']._serialized_start=15 + _globals['_REQUEST']._serialized_end=52 + _globals['_REPLY']._serialized_start=54 + _globals['_REPLY']._serialized_end=157 + _globals['_REPLY_RESULTENTRY']._serialized_start=112 + _globals['_REPLY_RESULTENTRY']._serialized_end=157 + _globals['_IGAPI']._serialized_start=160 + _globals['_IGAPI']._serialized_end=352 +# @@protoc_insertion_point(module_scope) diff --git a/grpcServer/protobuf/igapi_pb2_grpc.py b/grpcServer/protobuf/igapi_pb2_grpc.py new file mode 100644 index 0000000..00ad5ee --- /dev/null +++ b/grpcServer/protobuf/igapi_pb2_grpc.py @@ -0,0 +1,312 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from grpcServer.protobuf import igapi_pb2 as igapi__pb2 + +GRPC_GENERATED_VERSION = '1.68.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in igapi_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class IGAPIStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.login = channel.unary_unary( + '/IGAPI/login', + request_serializer=igapi__pb2.Request.SerializeToString, + response_deserializer=igapi__pb2.Reply.FromString, + _registered_method=True) + self.account_info = channel.unary_unary( + '/IGAPI/account_info', + request_serializer=igapi__pb2.Request.SerializeToString, + response_deserializer=igapi__pb2.Reply.FromString, + _registered_method=True) + self.upload = channel.unary_unary( + '/IGAPI/upload', + request_serializer=igapi__pb2.Request.SerializeToString, + response_deserializer=igapi__pb2.Reply.FromString, + _registered_method=True) + self.delete = channel.unary_unary( + '/IGAPI/delete', + request_serializer=igapi__pb2.Request.SerializeToString, + response_deserializer=igapi__pb2.Reply.FromString, + _registered_method=True) + self.setting = channel.unary_unary( + '/IGAPI/setting', + request_serializer=igapi__pb2.Request.SerializeToString, + response_deserializer=igapi__pb2.Reply.FromString, + _registered_method=True) + self.queue = channel.unary_unary( + '/IGAPI/queue', + request_serializer=igapi__pb2.Request.SerializeToString, + response_deserializer=igapi__pb2.Reply.FromString, + _registered_method=True) + + +class IGAPIServicer(object): + """Missing associated documentation comment in .proto file.""" + + def login(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def account_info(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def upload(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def delete(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def setting(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def queue(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_IGAPIServicer_to_server(servicer, server): + rpc_method_handlers = { + 'login': grpc.unary_unary_rpc_method_handler( + servicer.login, + request_deserializer=igapi__pb2.Request.FromString, + response_serializer=igapi__pb2.Reply.SerializeToString, + ), + 'account_info': grpc.unary_unary_rpc_method_handler( + servicer.account_info, + request_deserializer=igapi__pb2.Request.FromString, + response_serializer=igapi__pb2.Reply.SerializeToString, + ), + 'upload': grpc.unary_unary_rpc_method_handler( + servicer.upload, + request_deserializer=igapi__pb2.Request.FromString, + response_serializer=igapi__pb2.Reply.SerializeToString, + ), + 'delete': grpc.unary_unary_rpc_method_handler( + servicer.delete, + request_deserializer=igapi__pb2.Request.FromString, + response_serializer=igapi__pb2.Reply.SerializeToString, + ), + 'setting': grpc.unary_unary_rpc_method_handler( + servicer.setting, + request_deserializer=igapi__pb2.Request.FromString, + response_serializer=igapi__pb2.Reply.SerializeToString, + ), + 'queue': grpc.unary_unary_rpc_method_handler( + servicer.queue, + request_deserializer=igapi__pb2.Request.FromString, + response_serializer=igapi__pb2.Reply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'IGAPI', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('IGAPI', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class IGAPI(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def login(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/IGAPI/login', + igapi__pb2.Request.SerializeToString, + igapi__pb2.Reply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def account_info(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/IGAPI/account_info', + igapi__pb2.Request.SerializeToString, + igapi__pb2.Reply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def upload(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/IGAPI/upload', + igapi__pb2.Request.SerializeToString, + igapi__pb2.Reply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def delete(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/IGAPI/delete', + igapi__pb2.Request.SerializeToString, + igapi__pb2.Reply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def setting(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/IGAPI/setting', + igapi__pb2.Request.SerializeToString, + igapi__pb2.Reply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def queue(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/IGAPI/queue', + igapi__pb2.Request.SerializeToString, + igapi__pb2.Reply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/ig/IG.py b/ig/IG.py index bdc32af..fc5b9e1 100644 --- a/ig/IG.py +++ b/ig/IG.py @@ -3,14 +3,19 @@ from typing import List from instagrapi import Client -from utils import shareclass from utils.tbProcessor import easyExceptionHandler from utils.const import DEVICE +cl:Client = None + +# init +def init(askcl:Client) -> None: + global cl + cl = askcl + + # login def login() -> int: - cl:Client = shareclass.Shared.ig_get_client() - # Env ACCOUNT_USERNAME = os.getenv("ACCOUNT_USERNAME", None).strip() ACCOUNT_PASSWORD = os.getenv("ACCOUNT_PASSWORD", None).strip() @@ -58,8 +63,7 @@ def login() -> int: # Get account info def account_info() -> dict | None: - cl:Client = shareclass.Shared.ig_get_client() - + print("[*] IG: Fetching account info") try: info = cl.account_info().dict() return info @@ -70,8 +74,6 @@ def account_info() -> dict | None: # Get media info def media_info(code:str) -> dict | None: - cl:Client = shareclass.Shared.ig_get_client() - try: pk = cl.media_pk_from_code(code) info = cl.media_info(pk).dict() @@ -82,16 +84,14 @@ def media_info(code:str) -> dict | None: # Upload media -def upload_media(ctx:str, paths:List[str]) -> dict | None: - cl:Client = shareclass.Shared.ig_get_client() - +def upload_media(content:str, paths:List[str]) -> dict | None: try: # uplaod if len(paths) == 0: return None elif len(paths) == 1: - media = cl.photo_upload(path=paths[0], caption=ctx).dict() + media = cl.photo_upload(path=paths[0], caption=content).dict() else: - media = cl.photo_upload(path=paths[0], caption=ctx).dict() + media = cl.photo_upload(path=paths[0], caption=content).dict() return media except Exception as e: @@ -101,8 +101,6 @@ def upload_media(ctx:str, paths:List[str]) -> dict | None: # Delete Media def delete_media(code:str) -> int: - cl:Client = shareclass.Shared.ig_get_client() - try: media_pk = str(cl.media_pk_from_code(code)) media_id = cl.media_id(media_pk) diff --git a/ig/ctxPictuterProma.py b/ig/contentPictuterProma.py similarity index 80% rename from ig/ctxPictuterProma.py rename to ig/contentPictuterProma.py index 1b419ab..1fbb895 100644 --- a/ig/ctxPictuterProma.py +++ b/ig/contentPictuterProma.py @@ -6,21 +6,23 @@ from PIL import Image, ImageDraw, ImageFont from utils.const import PROMA_HEIGHT, PROMA_WIDTH, PROMA_FONT, PROMA_FONTSIZE, TMP_DIR -def new_proma(ctx:str): +def new_proma(content:str): + # 靠 版型在哪 img = Image.new(mode="RGB", size=(PROMA_WIDTH, PROMA_HEIGHT), - color=(255, 255, 255)) # 靠 沒版型阿 + color=(255, 255, 255)) font = ImageFont.truetype(PROMA_FONT, PROMA_FONTSIZE, encoding='utf-8') draw:ImageDraw.ImageDraw = ImageDraw.Draw(img) draw.text(xy=(0, 0), - text=ctx, + text=content, font=font, fill=(0, 0, 0)) + # 存檔 filename = TMP_DIR + hashlib.sha512( str(time.time()).encode() ).hexdigest() + ".jpg" img.save(filename) filename = os.path.abspath(filename) - return filename \ No newline at end of file + return filename diff --git a/requirements.txt b/requirements.txt index 351c394..20d495d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,11 @@ instagrapi sqlalchemy +sqlalchemy_utils protobuf==5.28.3 Pillow pillow-heif asyncio -psycopg2 \ No newline at end of file +psycopg2 +grpcio +minio +cachetools \ No newline at end of file diff --git a/s3/s3helper.py b/s3/s3helper.py new file mode 100644 index 0000000..18c084c --- /dev/null +++ b/s3/s3helper.py @@ -0,0 +1,38 @@ +from typing import Tuple +import os +import sys + +import minio + +S3_BUCKET:str = os.getenv("S3_BUCKET") + +s3 = minio.Minio(endpoint=os.getenv("S3_ENDPOINT").strip(), + access_key=os.getenv("S3_ACCESS_KEY").strip(), + secret_key=os.getenv("S3_SECRET_KEY").strip(), + secure=False) + +# check exist +print("[*] Connecting to Minio") +if not s3.bucket_exists(S3_BUCKET): + print("[X] Where is S3 bucket \"%s\"?"%S3_BUCKET) + sys.exit(0) + + +# methods +def solo_file_fetcher(fnhash:str) -> Tuple[dict | None, int]: + fnd = None + err = 1 + try: + # print(fnhash) + res = s3.get_object(S3_BUCKET, fnhash) + mime = res.getheader("Content-Type") + fnd = res.data + + err = 0 + fnd = {"binary":fnd, "mime":mime} + except: + fnd, err = None, 1 + + res.close() + res.release_conn() + return fnd, err diff --git a/utils/ThreadSafeOrderedDict.py b/utils/ThreadSafeOrderedDict.py new file mode 100644 index 0000000..e611572 --- /dev/null +++ b/utils/ThreadSafeOrderedDict.py @@ -0,0 +1,47 @@ +from collections import OrderedDict +from threading import RLock + +class ThreadSafeOrderedDict: + def __init__(self): + self.lock = RLock() + self.data = OrderedDict() + + def __setitem__(self, key, value): + with self.lock: + self.data[key] = value + + def __getitem__(self, key): + with self.lock: + if key in self.data: + return self.data[key] + return None + + def remove(self, key): + with self.lock: + if key in self.data: + del self.data[key] + + def move_to_end(self, key, last=True): + with self.lock: + if key in self.data: + self.data.move_to_end(key, last=last) + + def pop(self, key): + with self.lock: + if key in self.data: + return self.data.pop(key) + return None + + def popitem(self, last:bool=True): + with self.lock: + if len(self.data): + return self.data.popitem(last) + return None + + def items(self): + with self.lock: + return self.data.items() + + def __repr__(self): + with self.lock: + return repr(self.data) diff --git a/utils/const.py b/utils/const.py index f4b4cac..67834da 100644 --- a/utils/const.py +++ b/utils/const.py @@ -30,7 +30,22 @@ FILE_MINE_TYPE = { TMP_DIR = "./tmp/" # content picture +# tmp solve PROMA_WIDTH = 600 PROMA_HEIGHT = 600 +# i have no template +PROMA_PATH = "" +# done +PROMA_FONTSIZE = 40 PROMA_FONT = "./resource/OpenSans-Regular.ttf" -PROMA_FONTSIZE = 40 \ No newline at end of file + +# gRRC IG cache time +GRPC_ACCINFO_CACHE = 5*60 +GRPC_RELOGIN_LIMIT = 10*60 + +# gRPC upload/delete interval (for anoth.py) +ANOTH_INTERVAL_MIN = 2*60 +ANOTH_INTERVAL_MAX = 5*60 +# for testing +# ANOTH_INTERVAL_MIN = 10 +# ANOTH_INTERVAL_MAX = 30 diff --git a/utils/fileProcessor.py b/utils/fileProcessor.py index 24f27c7..dbb5257 100644 --- a/utils/fileProcessor.py +++ b/utils/fileProcessor.py @@ -104,4 +104,4 @@ def file_saver(ftype:str, binary:bytes) -> Tuple[str, int]: else: # 如果是 IG 本身支援的檔案 -> 存檔 opt = os.path.abspath(os.path.join(TMP_DIR, filename+"."+ext)) file_writer(opt, binary) - return opt, 0 \ No newline at end of file + return opt, 0 diff --git a/utils/shareclass.py b/utils/shareclass.py deleted file mode 100644 index f5640d9..0000000 --- a/utils/shareclass.py +++ /dev/null @@ -1,19 +0,0 @@ -from sqlalchemy.orm import sessionmaker - -class Shared: - _client = None # For instagram - _engine = None # For engine - - @classmethod - def __init__(cls, cl, eng): - cls._client = cl - cls._engine = eng - - @classmethod - def ig_get_client(cls): - return cls._client - - @classmethod - def db_get_session(cls): - Session = sessionmaker(bind=cls._engine) - return Session() \ No newline at end of file