niming_igapi/utils/fileProcessor.py

107 lines
3.0 KiB
Python
Raw Normal View History

2024-11-22 02:14:01 +08:00
import time
import os
import io
from typing import Tuple
import subprocess
from hashlib import sha512
from PIL import Image
from pillow_heif import register_heif_opener
import ffmpeg
from utils.const import FILE_MINE_TYPE, TMP_DIR
from utils.tbProcessor import easyExceptionHandler
# Register pillow_heif's opener with Pillow at import time so that the
# Image.open() call used below can also handle HEIF images.
register_heif_opener()
def image_conventer(filename:str, binary: bytes) -> int:
    """Decode an in-memory image and store it as a JPEG file.

    Args:
        filename: Path the JPEG file is written to.
        binary: Raw bytes of the source image.

    Returns:
        0 on success; 1 if any exception was raised (the exception is
        passed to easyExceptionHandler before returning).
    """
    try:
        source = Image.open(io.BytesIO(binary))
        # JPEG is written from an RGB copy of the decoded image.
        rgb_image: Image.Image = source.convert("RGB")
        rgb_image.save(filename, "JPEG", quality=95)
    except Exception as err:
        easyExceptionHandler(err)
        return 1
    return 0
def read_output(pipe, q):
    """Forward everything readable from *pipe* into the queue *q*.

    Intended for reading ffmpeg's stdout without blocking the caller.
    Data is read in chunks of up to 4096 bytes and each chunk is put on
    the queue. Once a read returns nothing, a single ``None`` is put on
    the queue to mark the end of the output.
    """
    chunk = pipe.read(4096)
    while chunk:
        q.put(chunk)
        chunk = pipe.read(4096)
    # End-of-output marker.
    q.put(None)
def video_conventor(filename:str, oriFormat:str, binary:bytes) -> int:
    """Transcode an in-memory video to an MP4 file with ffmpeg.

    The input bytes are first written to a temporary file named
    ``filename + "_tmp"``, which ffmpeg then reads. The temporary file is
    removed afterwards, whether or not the conversion succeeded.

    Args:
        filename: Path of the MP4 file to produce.
        oriFormat: Format name handed to ffmpeg for the input file.
        binary: Raw bytes of the source video.

    Returns:
        0 on success; 1 if writing, converting or cleaning up failed (the
        exception is passed to easyExceptionHandler before returning).
    """
    # Built outside the try block so the cleanup can always refer to it.
    tmpfile = f"{filename}_tmp"
    try:
        try:
            # Write the input to the temporary file.
            with open(tmpfile, "wb") as f:
                f.write(binary)
            # Start the ffmpeg process.
            process: subprocess.Popen = (
                ffmpeg
                .input(tmpfile, format=oriFormat)
                .output(filename, format='mp4')
                .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
            )
            # communicate() closes stdin and drains stdout/stderr while
            # waiting. A bare wait() can deadlock once ffmpeg has filled
            # the OS pipe buffer with its log output.
            _, stderr = process.communicate()
            if process.returncode != 0:
                # A non-zero exit code means the conversion failed; report
                # the tail of ffmpeg's log through the usual error path.
                detail = (stderr or b"").decode(errors="replace")[-2000:]
                raise RuntimeError(
                    f"ffmpeg exited with code {process.returncode}: {detail}"
                )
        finally:
            # Remove the temporary file even when the conversion failed.
            if os.path.exists(tmpfile):
                os.remove(tmpfile)
        return 0
    except Exception as e:
        easyExceptionHandler(e)
        return 1
def file_writer(filename:str, binary:bytes):
    """Write *binary* to *filename*, creating or truncating the file."""
    with open(filename, mode="wb") as sink:
        sink.write(binary)
def file_saver(ftype:str, binary:bytes) -> Tuple[str, int]:
    """Store an uploaded file in TMP_DIR, converting it first when needed.

    Files of type "image/jpg", "image/webp" or "video/mp4" are written
    unchanged. Any other image type is re-encoded to JPEG and any other
    video type is transcoded to MP4.

    Args:
        ftype: MIME type of the file; must be a key of FILE_MINE_TYPE.
        binary: Raw file content.

    Returns:
        ``(path, 0)`` with the absolute path of the stored file on
        success, or ``(message, 1)`` with a short error message when the
        type is not supported or the conversion failed.

    Raises:
        OSError: Propagated from file_writer when a file that needs no
            conversion cannot be written.
    """
    # Look up the file extension for this MIME type.
    ext = FILE_MINE_TYPE.get(ftype)
    if ext is None:
        return "Invalid file type", 1

    # Temporary file name. Random bytes are mixed into the timestamp so
    # that two uploads handled within the same clock tick cannot end up
    # with the same name and overwrite each other.
    filename = sha512(str(time.time()).encode() + os.urandom(16)).hexdigest()

    # Formats IG supports natively are stored as-is.
    # NOTE(review): the registered MIME type for JPEG is "image/jpeg";
    # confirm that FILE_MINE_TYPE and the callers really use "image/jpg".
    if ftype in ("image/jpg", "image/webp", "video/mp4"):
        opt = os.path.abspath(os.path.join(TMP_DIR, filename + "." + ext))
        file_writer(opt, binary)
        return opt, 0

    # Everything else has to be converted first.
    if ftype.startswith("image"):
        # Convert the image to JPEG.
        opt = os.path.abspath(os.path.join(TMP_DIR, filename + ".jpg"))
        err = image_conventer(opt, binary)
    elif ftype.startswith("video"):
        # Convert the video to MP4.
        opt = os.path.abspath(os.path.join(TMP_DIR, filename + ".mp4"))
        err = video_conventor(opt, ext, binary)
    else:
        # Known MIME type that is neither image nor video: nothing was
        # written, so report an error rather than an empty path.
        return "Invalid file type", 1

    if err:
        return "File convert error", 1
    # Conversion finished.
    return opt, 0