1st commit
This commit is contained in:
commit
0c7fc73c40
10
.gitignore
vendored
Normal file
10
.gitignore
vendored
Normal file
@ -0,0 +1,10 @@
|
||||
.env
|
||||
__pycache__
|
||||
session.json
|
||||
traceback.json
|
||||
test.py
|
||||
|
||||
tmp
|
||||
tmp/*
|
||||
testFiles
|
||||
testFiles/*
|
42
app.py
Normal file
42
app.py
Normal file
@ -0,0 +1,42 @@
|
||||
import os, sys
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from instagrapi import Client
|
||||
# from dotenv import load_dotenv
|
||||
|
||||
from utils import shareclass
|
||||
from ig import IG
|
||||
from db import pgclass
|
||||
from grpc import grpcServer
|
||||
from utils.const import DEBUG
|
||||
|
||||
# load_dotenv()
|
||||
|
||||
# Database
|
||||
PG_HOST = os.environ.get("PG_HOST", None).strip()
|
||||
PG_PORT = os.environ.get("PG_PORT", None).strip()
|
||||
PG_NAME = os.environ.get("PG_NAME", None).strip()
|
||||
PG_USER = os.environ.get("PG_USER", None).strip()
|
||||
PG_PASS = os.environ.get("PG_PASS", None).strip()
|
||||
engine = create_engine('postgresql+psycopg2://%s:%s@%s:%s/%s'%(PG_USER, PG_PASS, PG_HOST, PG_PORT, PG_NAME))
|
||||
pgclass.Base.metadata.create_all(engine)
|
||||
print("[V] Database Connected")
|
||||
|
||||
# IG Login
|
||||
cl = Client()
|
||||
shareclass.Shared(cl, engine) # Shared Class
|
||||
if not DEBUG and not IG.login():
|
||||
sys.exit(0)
|
||||
|
||||
# grpc server should have...
|
||||
# - Get account info (a kind of checkalive)
|
||||
# - Get media info (a kind of checkalive)
|
||||
# - Upload media (預設客戶端給我id)
|
||||
# - Delete media
|
||||
# - Login
|
||||
|
||||
# IG統一保存code
|
||||
|
||||
# run grpc
|
||||
if __name__ == "__main__":
|
||||
grpcServer.serve()
|
61
db/DBHelper.py
Normal file
61
db/DBHelper.py
Normal file
@ -0,0 +1,61 @@
|
||||
from typing import Tuple, Dict
|
||||
|
||||
from db import pgclass
|
||||
from utils import shareclass
|
||||
|
||||
# 獲取單一文章
|
||||
def solo_article_fetcher(key:int) -> Dict | None:
|
||||
table = pgclass.SQLarticle
|
||||
ftab = pgclass.SQLfile
|
||||
resfn = {}
|
||||
|
||||
with shareclass.Shared.db_get_session() as session:
|
||||
# query
|
||||
res = session.query(table).filter(table.id == key, table.mark == "visible").first()
|
||||
if res is None: return None
|
||||
|
||||
# mapping
|
||||
resfn.update({"id": res.id,
|
||||
"ctx": res.ctx,
|
||||
"igid": res.igid,
|
||||
"mark": res.mark,
|
||||
"reference": res.reference,
|
||||
"hash": res.hash,
|
||||
"created_at": res.created_at,
|
||||
"ip": res.ip
|
||||
})
|
||||
|
||||
# file
|
||||
resfn["files"] = [ f[0] for f in session.query(ftab.id).filter(ftab.reference == res.hash).all() ]
|
||||
|
||||
return resfn
|
||||
|
||||
# 獲取檔案
|
||||
def solo_file_fetcher(id:int) -> Dict | None:
|
||||
table = pgclass.SQLarticle
|
||||
ftab = pgclass.SQLfile
|
||||
|
||||
with shareclass.Shared.db_get_session() as session:
|
||||
fres = session.query(ftab).filter(ftab.id == id).first()
|
||||
if fres is None: # 檢查檔案是否存在
|
||||
return None
|
||||
|
||||
article = session.query(table).filter(table.hash == fres.reference, table.mark == 'visible').first()
|
||||
if article is None: # 檢查文章本體是否存在/可以閱覽
|
||||
return None
|
||||
|
||||
# mapping
|
||||
resfn = {
|
||||
"type": fres.type,
|
||||
"binary": fres.binary
|
||||
}
|
||||
|
||||
return resfn
|
||||
|
||||
# 寫入IG狀態
|
||||
def solo_article_updater(id:int, code:str):
|
||||
table = pgclass.SQLarticle
|
||||
with shareclass.Shared.db_get_session() as session:
|
||||
res = session.query(table).filter(table.id == id).first()
|
||||
res.igid = code
|
||||
session.commit()
|
53
db/pgclass.py
Normal file
53
db/pgclass.py
Normal file
@ -0,0 +1,53 @@
|
||||
from sqlalchemy import Column, String, TIMESTAMP, func, BIGINT, LargeBinary, ARRAY
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class SQLarticle(Base):
|
||||
__tablename__ = 'posts'
|
||||
|
||||
id = Column(BIGINT, primary_key=True)
|
||||
created_at = Column(TIMESTAMP(timezone=True), server_default=func.now())
|
||||
hash = Column(String)
|
||||
ctx = Column(String)
|
||||
igid = Column(String)
|
||||
mark = Column(String)
|
||||
ip = Column(String)
|
||||
reference = Column(BIGINT)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<article(id={self.id}, hash={self.hash}, ctx={self.ctx}, igid={self.igid}, mark={self.mark}, created_at={self.created_at}, ip={self.ip}, reference={self.reference})>"
|
||||
|
||||
class SQLlog(Base):
|
||||
__tablename__ = 'logs'
|
||||
|
||||
id = Column(BIGINT, primary_key=True)
|
||||
created_at = Column(TIMESTAMP(timezone=True), server_default=func.now())
|
||||
message = Column(String)
|
||||
source = Column(String)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<log(id={self.id}, created_at={self.created_at}, message={self.message}, source={self.source})>"
|
||||
|
||||
class SQLfile(Base):
|
||||
__tablename__ = 'files'
|
||||
|
||||
id = Column(BIGINT, primary_key=True)
|
||||
created_at = Column(TIMESTAMP(timezone=True), server_default=func.now())
|
||||
type = Column(String)
|
||||
reference = Column(String)
|
||||
binary = Column(LargeBinary)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<file(id={self.id}, created_at={self.created_at}, type={self.type}, binary={self.binary}, reference={self.reference})>"
|
||||
|
||||
class SQLuser(Base):
|
||||
__tablename__ = 'users'
|
||||
|
||||
id = Column(BIGINT, primary_key=True)
|
||||
user = Column(String)
|
||||
password = Column(String) # hash , sha512
|
||||
permission = Column(ARRAY(String))
|
||||
|
||||
def __repr__(self):
|
||||
return f"<user(id={self.id}, user={self.user}, password={self.password}, permission={self.permission})>"
|
BIN
ffmpeg_python-0.2.0-py3-none-any.whl
Normal file
BIN
ffmpeg_python-0.2.0-py3-none-any.whl
Normal file
Binary file not shown.
88
grpc/grpcServer.py
Normal file
88
grpc/grpcServer.py
Normal file
@ -0,0 +1,88 @@
|
||||
from typing import Tuple
|
||||
import os
|
||||
|
||||
from ig import ctxPictuterProma, IG
|
||||
from db import DBHelper
|
||||
from utils import fileProcessor
|
||||
from utils.const import DEBUG
|
||||
|
||||
# returns (errmsg | code, errcode)
|
||||
def upload(aid:int) -> Tuple[str, int]:
|
||||
# 抓取文章本體
|
||||
article = DBHelper.solo_article_fetcher(key = aid)
|
||||
if article is None:
|
||||
return "Post not found", 1
|
||||
|
||||
# 抓取檔案
|
||||
files = [
|
||||
DBHelper.solo_file_fetcher(id = k)
|
||||
for k in article["files"]
|
||||
]
|
||||
if None in files:
|
||||
return "File not found", 1
|
||||
|
||||
# 轉出暫存檔案
|
||||
tmp_path:list = []
|
||||
for t in files:
|
||||
filename, err = fileProcessor.file_saver(t.get("type"), t.get("binary"))
|
||||
if err: # 如果錯誤
|
||||
return filename, 1
|
||||
tmp_path.append(filename)
|
||||
|
||||
# 合成文字圖
|
||||
proma_file = ctxPictuterProma.new_proma(article["ctx"])
|
||||
tmp_path = [proma_file] + tmp_path
|
||||
|
||||
# 送交 IG 上傳
|
||||
if not DEBUG:
|
||||
media = IG.upload_media(article["ctx"], tmp_path)
|
||||
if media is None:
|
||||
return "Upload failed", 1
|
||||
else:
|
||||
media = {"code":"fake_data"}
|
||||
|
||||
# 刪除檔案
|
||||
for t in tmp_path:
|
||||
os.remove(t)
|
||||
|
||||
return media["code"], 0
|
||||
|
||||
|
||||
# return (errmsg, code)
|
||||
def remove(aid:int) -> Tuple[str, int]:
|
||||
# 抓取文章本體
|
||||
article = DBHelper.solo_article_fetcher(key = aid)
|
||||
if article is None:
|
||||
return "Post not found", 1
|
||||
|
||||
err = IG.delete_media(article["igid"])
|
||||
if err:
|
||||
return "Remove failed", 1
|
||||
|
||||
return "OK", 0
|
||||
|
||||
|
||||
def serve():
|
||||
pass
|
||||
|
||||
def _serve():
|
||||
print(IG.account_info())
|
||||
|
||||
pass
|
||||
|
||||
return
|
||||
aid = 57
|
||||
|
||||
msg, err = upload(aid)
|
||||
if err:
|
||||
print(msg)
|
||||
return
|
||||
|
||||
input("Press any key...")
|
||||
|
||||
DBHelper.solo_article_updater(id=aid, code=msg)
|
||||
|
||||
msg, err = remove(aid)
|
||||
if err:
|
||||
print(msg)
|
||||
return
|
23
grpc/protobuf/igapi.proto
Normal file
23
grpc/protobuf/igapi.proto
Normal file
@ -0,0 +1,23 @@
|
||||
syntax = "proto3";
|
||||
|
||||
service IGAPI {
|
||||
rpc login () returns (Reply) {}
|
||||
|
||||
rpc account_info () returns (Reply) {}
|
||||
|
||||
rpc upload (Request) returns (Reply) {}
|
||||
|
||||
rpc delete (Request) returns (Reply) {}
|
||||
|
||||
rpc setting (Request) returns (Reply) {}
|
||||
}
|
||||
|
||||
message Request {
|
||||
int code = 1;
|
||||
repeated string args = 2;
|
||||
}
|
||||
|
||||
message Reply {
|
||||
int err = 1;
|
||||
map<string, string> result = 2;
|
||||
}
|
3
grpc/protobuf/note.txt
Normal file
3
grpc/protobuf/note.txt
Normal file
@ -0,0 +1,3 @@
|
||||
Response:
|
||||
code: int
|
||||
message: str
|
113
ig/IG.py
Normal file
113
ig/IG.py
Normal file
@ -0,0 +1,113 @@
|
||||
import os
|
||||
from typing import List
|
||||
|
||||
from instagrapi import Client
|
||||
|
||||
from utils import shareclass
|
||||
from utils.tbProcessor import easyExceptionHandler
|
||||
from utils.const import DEVICE
|
||||
|
||||
# login
|
||||
def login() -> int:
|
||||
cl:Client = shareclass.Shared.ig_get_client()
|
||||
|
||||
# Env
|
||||
ACCOUNT_USERNAME = os.getenv("ACCOUNT_USERNAME", None).strip()
|
||||
ACCOUNT_PASSWORD = os.getenv("ACCOUNT_PASSWORD", None).strip()
|
||||
|
||||
# session
|
||||
session = None
|
||||
if os.path.exists("./session.json"):
|
||||
session = cl.load_settings("session.json")
|
||||
|
||||
cl.delay_range = [2, 5]
|
||||
# login with sessionid
|
||||
cl.set_device(DEVICE)
|
||||
sessionSuccess = True
|
||||
if session:
|
||||
print("[*] Trying logging in with session")
|
||||
try:
|
||||
cl.set_settings(session)
|
||||
cl.login(ACCOUNT_USERNAME, ACCOUNT_PASSWORD)
|
||||
cl.get_timeline_feed()
|
||||
except:
|
||||
sessionSuccess = False
|
||||
else:
|
||||
sessionSuccess = False
|
||||
|
||||
# login with username and password
|
||||
if not sessionSuccess:
|
||||
print("[*] Trying logging in with username and password")
|
||||
try:
|
||||
old_session = cl.get_settings()
|
||||
cl.set_settings({})
|
||||
cl.set_uuids(old_session["uuids"])
|
||||
cl.login(ACCOUNT_USERNAME, ACCOUNT_PASSWORD)
|
||||
cl.get_timeline_feed()
|
||||
except:
|
||||
print("[X] Cannot log in")
|
||||
return 0
|
||||
|
||||
cl.dump_settings("session.json")
|
||||
|
||||
# return
|
||||
username = cl.account_info().dict()["username"]
|
||||
print("[V] Logged as %s"%username)
|
||||
return 1
|
||||
|
||||
|
||||
# Get account info
|
||||
def account_info() -> dict | None:
|
||||
cl:Client = shareclass.Shared.ig_get_client()
|
||||
|
||||
try:
|
||||
info = cl.account_info().dict()
|
||||
return info
|
||||
except Exception as e:
|
||||
easyExceptionHandler(e)
|
||||
return None
|
||||
|
||||
|
||||
# Get media info
|
||||
def media_info(code:str) -> dict | None:
|
||||
cl:Client = shareclass.Shared.ig_get_client()
|
||||
|
||||
try:
|
||||
pk = cl.media_pk_from_code(code)
|
||||
info = cl.media_info(pk).dict()
|
||||
return info
|
||||
except Exception as e:
|
||||
easyExceptionHandler(e)
|
||||
return None
|
||||
|
||||
|
||||
# Upload media
|
||||
def upload_media(ctx:str, paths:List[str]) -> dict | None:
|
||||
cl:Client = shareclass.Shared.ig_get_client()
|
||||
|
||||
try:
|
||||
# uplaod
|
||||
if len(paths) == 0: return None
|
||||
elif len(paths) == 1:
|
||||
media = cl.photo_upload(path=paths[0], caption=ctx).dict()
|
||||
else:
|
||||
media = cl.photo_upload(path=paths[0], caption=ctx).dict()
|
||||
|
||||
return media
|
||||
except Exception as e:
|
||||
easyExceptionHandler(e)
|
||||
return None
|
||||
|
||||
|
||||
# Delete Media
|
||||
def delete_media(code:str) -> int:
|
||||
cl:Client = shareclass.Shared.ig_get_client()
|
||||
|
||||
try:
|
||||
media_pk = str(cl.media_pk_from_code(code))
|
||||
media_id = cl.media_id(media_pk)
|
||||
cl.media_delete(media_id)
|
||||
return 0
|
||||
except Exception as e:
|
||||
easyExceptionHandler(e)
|
||||
return 1
|
26
ig/ctxPictuterProma.py
Normal file
26
ig/ctxPictuterProma.py
Normal file
@ -0,0 +1,26 @@
|
||||
import time
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
|
||||
from utils.const import PROMA_HEIGHT, PROMA_WIDTH, PROMA_FONT, PROMA_FONTSIZE, TMP_DIR
|
||||
|
||||
def new_proma(ctx:str):
|
||||
img = Image.new(mode="RGB",
|
||||
size=(PROMA_WIDTH, PROMA_HEIGHT),
|
||||
color=(255, 255, 255)) # 靠 沒版型阿
|
||||
|
||||
font = ImageFont.truetype(PROMA_FONT, PROMA_FONTSIZE, encoding='utf-8')
|
||||
|
||||
draw:ImageDraw.ImageDraw = ImageDraw.Draw(img)
|
||||
draw.text(xy=(0, 0),
|
||||
text=ctx,
|
||||
font=font,
|
||||
fill=(0, 0, 0))
|
||||
|
||||
filename = TMP_DIR + hashlib.sha512( str(time.time()).encode() ).hexdigest() + ".jpg"
|
||||
img.save(filename)
|
||||
filename = os.path.abspath(filename)
|
||||
|
||||
return filename
|
7
requirements.txt
Normal file
7
requirements.txt
Normal file
@ -0,0 +1,7 @@
|
||||
instagrapi
|
||||
sqlalchemy
|
||||
protobuf==5.28.3
|
||||
Pillow
|
||||
pillow-heif
|
||||
asyncio
|
||||
psycopg2
|
BIN
resource/OpenSans-Regular.ttf
Normal file
BIN
resource/OpenSans-Regular.ttf
Normal file
Binary file not shown.
1
settings.json
Normal file
1
settings.json
Normal file
@ -0,0 +1 @@
|
||||
{}
|
36
utils/const.py
Normal file
36
utils/const.py
Normal file
@ -0,0 +1,36 @@
|
||||
DEBUG = False
|
||||
|
||||
# fake device
|
||||
DEVICE = {
|
||||
"app_version": "269.0.0.18.75",
|
||||
"android_version": 26,
|
||||
"android_release": "8.0.0",
|
||||
"dpi": "480dpi",
|
||||
"resolution": "1080x1920",
|
||||
"manufacturer": "OnePlus",
|
||||
"device": "Asus",
|
||||
"model": "K00G",
|
||||
"cpu": "qcom",
|
||||
"version_code": "314665256",
|
||||
}
|
||||
|
||||
# type define {mine:ext}
|
||||
FILE_MINE_TYPE = {
|
||||
"image/jpeg": "jpg",
|
||||
"image/pjpeg": "jfif",
|
||||
"image/png": "png",
|
||||
"image/heic": "heic",
|
||||
"image/heif": "heif",
|
||||
"image/webp": "webp",
|
||||
"video/mp4": "mp4",
|
||||
"video/quicktime": "mov",
|
||||
"video/hevc": "hevc",
|
||||
}
|
||||
|
||||
TMP_DIR = "./tmp/"
|
||||
|
||||
# content picture
|
||||
PROMA_WIDTH = 600
|
||||
PROMA_HEIGHT = 600
|
||||
PROMA_FONT = "./resource/OpenSans-Regular.ttf"
|
||||
PROMA_FONTSIZE = 40
|
107
utils/fileProcessor.py
Normal file
107
utils/fileProcessor.py
Normal file
@ -0,0 +1,107 @@
|
||||
import time
|
||||
import os
|
||||
import io
|
||||
from typing import Tuple
|
||||
import subprocess
|
||||
|
||||
from hashlib import sha512
|
||||
from PIL import Image
|
||||
from pillow_heif import register_heif_opener
|
||||
import ffmpeg
|
||||
|
||||
from utils.const import FILE_MINE_TYPE, TMP_DIR
|
||||
from utils.tbProcessor import easyExceptionHandler
|
||||
|
||||
register_heif_opener()
|
||||
|
||||
def image_conventer(filename:str, binary: bytes) -> int:
|
||||
try:
|
||||
fio = io.BytesIO(binary)
|
||||
img:Image.Image = Image.open(fio)
|
||||
img = img.convert("RGB")
|
||||
img.save(filename, "JPEG", quality=95)
|
||||
return 0
|
||||
except Exception as e:
|
||||
easyExceptionHandler(e)
|
||||
return 1
|
||||
|
||||
|
||||
def read_output(pipe, q):
|
||||
""" 用於非阻塞讀取 ffmpeg 的 stdout """
|
||||
while True:
|
||||
data = pipe.read(4096)
|
||||
if not data:
|
||||
break
|
||||
q.put(data)
|
||||
q.put(None) # 標記輸出結束
|
||||
|
||||
|
||||
def video_conventor(filename:str, oriFormat:str, binary:bytes) -> int:
|
||||
try:
|
||||
tmpfile = filename+"_tmp"
|
||||
# write to tempfile
|
||||
with open(tmpfile, "wb") as f:
|
||||
f.write(binary)
|
||||
|
||||
# ffmpeg process
|
||||
process:subprocess.Popen = (
|
||||
ffmpeg
|
||||
.input(tmpfile, format=oriFormat)
|
||||
.output(filename, format='mp4')
|
||||
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
|
||||
)
|
||||
|
||||
process.wait()
|
||||
|
||||
# remove tempfile
|
||||
os.remove(tmpfile)
|
||||
|
||||
return 0
|
||||
except Exception as e:
|
||||
easyExceptionHandler(e)
|
||||
return 1
|
||||
|
||||
|
||||
def file_writer(filename:str, binary:bytes):
|
||||
with open(filename, "wb") as f:
|
||||
f.write(binary)
|
||||
|
||||
|
||||
def file_saver(ftype:str, binary:bytes) -> Tuple[str, int]:
|
||||
"""
|
||||
ftype -> minetype
|
||||
binary -> file binary
|
||||
"""
|
||||
|
||||
# 獲取副檔名
|
||||
ext = None
|
||||
for t in FILE_MINE_TYPE:
|
||||
if t == ftype:
|
||||
ext = FILE_MINE_TYPE[t]
|
||||
if ext is None:
|
||||
return "Invalid file type", 1
|
||||
|
||||
# 如果不是 IG 本身支援的檔案 -> 轉檔
|
||||
filename = sha512( str(time.time()).encode() ).hexdigest() # 暫存檔名稱
|
||||
opt = "" # output file name
|
||||
if not ( ftype == "image/jpg" or ftype == "image/webp" or \
|
||||
ftype == "video/mp4" ):
|
||||
# 轉圖片
|
||||
if ftype.startswith("image"):
|
||||
opt = os.path.abspath(os.path.join(TMP_DIR, filename+".jpg"))
|
||||
err = image_conventer(opt, binary)
|
||||
if err: # 發生錯誤
|
||||
return "File convert error", 1
|
||||
# 轉影片
|
||||
elif ftype.startswith("video"):
|
||||
opt = os.path.abspath(os.path.join(TMP_DIR, filename+".mp4"))
|
||||
err = video_conventor(opt, ext, binary)
|
||||
if err:
|
||||
return "File convert error", 1
|
||||
|
||||
# 轉檔完成
|
||||
return opt, 0
|
||||
else: # 如果是 IG 本身支援的檔案 -> 存檔
|
||||
opt = os.path.abspath(os.path.join(TMP_DIR, filename+"."+ext))
|
||||
file_writer(opt, binary)
|
||||
return opt, 0
|
16
utils/settingLoader.py
Normal file
16
utils/settingLoader.py
Normal file
@ -0,0 +1,16 @@
|
||||
import json
|
||||
|
||||
def loadset():
|
||||
with open("./settings.json", "r", encoding = "utf-8") as f:
|
||||
d = json.load(f)
|
||||
return d
|
||||
|
||||
def writeset(name:str, value):
|
||||
# 寫入
|
||||
d:dict = loadset()
|
||||
d[name] = value
|
||||
|
||||
with open("./settings.json", "w", encoding = "utf-8") as f:
|
||||
json.dump(d, f, ensure_ascii=False)
|
||||
|
||||
return d
|
19
utils/shareclass.py
Normal file
19
utils/shareclass.py
Normal file
@ -0,0 +1,19 @@
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
class Shared:
|
||||
_client = None # For instagram
|
||||
_engine = None # For engine
|
||||
|
||||
@classmethod
|
||||
def __init__(cls, cl, eng):
|
||||
cls._client = cl
|
||||
cls._engine = eng
|
||||
|
||||
@classmethod
|
||||
def ig_get_client(cls):
|
||||
return cls._client
|
||||
|
||||
@classmethod
|
||||
def db_get_session(cls):
|
||||
Session = sessionmaker(bind=cls._engine)
|
||||
return Session()
|
51
utils/tbProcessor.py
Normal file
51
utils/tbProcessor.py
Normal file
@ -0,0 +1,51 @@
|
||||
import json
|
||||
import traceback
|
||||
import os
|
||||
|
||||
FILENAME = "./traceback.json"
|
||||
|
||||
def prechecker():
|
||||
if not os.path.exists(FILENAME):
|
||||
with open(FILENAME, "w", encoding = "utf-8") as f:
|
||||
json.dump({"err":{}, "id":0}, f, ensure_ascii=False)
|
||||
|
||||
def load():
|
||||
prechecker()
|
||||
with open(FILENAME, "r", encoding = "utf-8") as f:
|
||||
d = json.load(f)
|
||||
return d
|
||||
|
||||
def debug_info_from_exception(exc) -> dict:
|
||||
exc_type = type(exc).__name__
|
||||
exc_message = str(exc)
|
||||
exc_traceback = traceback.format_exception(type(exc), exc, exc.__traceback__)
|
||||
|
||||
debug_info = {
|
||||
"Exception_type": str(exc_type),
|
||||
"Exception_message": str(exc_message),
|
||||
"Trackback": str(exc_traceback)
|
||||
}
|
||||
|
||||
# debug
|
||||
for s in exc_traceback: print(s)
|
||||
|
||||
return debug_info
|
||||
|
||||
def write(e:Exception):
|
||||
d:dict = load()
|
||||
|
||||
eid = d["id"]
|
||||
debug_info = debug_info_from_exception(e)
|
||||
d["err"][str(eid)] = debug_info
|
||||
d["id"] += 1
|
||||
|
||||
with open(FILENAME, "w", encoding = "utf-8") as f:
|
||||
json.dump(d, f, ensure_ascii=False)
|
||||
|
||||
return eid
|
||||
|
||||
def easyExceptionHandler(e:Exception):
|
||||
exc_type = type(e).__name__
|
||||
exc_message = str(e)
|
||||
exc_saved_id = write(e)
|
||||
print(f"[X] Exception id {exc_saved_id} : {exc_type} : {exc_message}")
|
Loading…
Reference in New Issue
Block a user