niming_igapi/backend/processor.py
2025-04-26 22:36:57 +00:00

93 lines
2.5 KiB
Python

from typing import Tuple
import os
import io
import magic
from config.config import DEBUG
from backend.ig import IG
from backend.db import dbhelper
from backend.utils import ld_interface
from backend.utils import ld_picturemaker
from backend.utils import fileProcessor
def clean(file_list):
for f in file_list:
try: os.remove(f)
except: pass
# return (errmsg | code, errcode)
def upload(aid:int) -> Tuple[str, int]:
# get article
article = ld_interface.inf.get(index=aid, media=True)
if article is None:
return "Post not found", 1
# multimedia -> tmp file
tmp_path = []
for m in article["content"]["media"]:
# check mime type
mime = magic.Magic(mime=True)
tp = mime.from_buffer(m.read())
# save file
filename, err = fileProcessor.file_saver(tp, m.read())
if err:
clean(tmp_path)
return "Error while saving file", 1
tmp_path.append(filename)
article["content"]["media"] = []
"""
# 抓取檔案
files = []
for k in article["files_hash"]:
f, code = s3helper.solo_file_fetcher(fnhash=k)
if code:
return "File not found", 1
else:
files.append(f)
# 轉出暫存檔案
tmp_path:list = []
for t in files:
filename, err = fileProcessor.file_saver(t.get("mime"), t.get("binary"))
if err: # 如果錯誤
return filename, 1
tmp_path.append(filename)
"""
# 合成文字圖
proma_file = ld_picturemaker.picture_maker.gen(article)
tmp_path = [proma_file] + tmp_path
# 送交 IG 上傳
if not DEBUG:
media = IG.upload_media(article, tmp_path)
if media is None:
return "Upload failed", 1
else:
media = {"code":"fake_data"}
# 刪除檔案
clean(tmp_path)
return media["code"], 0
# return (errmsg, code)
#def remove(code:str) -> Tuple[str, int]:
def remove(aid:int) -> Tuple[str, int]:
# 抓取文章本體 - 叫你刪除的時候可能已經找不到本體了
# article, code = dbhelper.solo_article_fetcher(role="general", key=aid)
# if code != 200:
# return "Post not found", 1
article = dbhelper.solo_article_fetcher(aid=aid) # 從對表資料庫裡面抓igid
if article is None:
return "Post not found", 1
err = IG.delete_media(article["igid"])
if err:
return "Remove failed", 1
return "OK", 0