import time import os import io from typing import Tuple import subprocess from hashlib import sha512 from PIL import Image from pillow_heif import register_heif_opener import ffmpeg from utils.const import FILE_MINE_TYPE, TMP_DIR from utils.tbProcessor import easyExceptionHandler register_heif_opener() def image_conventer(filename:str, binary: bytes) -> int: try: fio = io.BytesIO(binary) img:Image.Image = Image.open(fio) img = img.convert("RGB") img.save(filename, "JPEG", quality=95) return 0 except Exception as e: easyExceptionHandler(e) return 1 def read_output(pipe, q): """ 用於非阻塞讀取 ffmpeg 的 stdout """ while True: data = pipe.read(4096) if not data: break q.put(data) q.put(None) # 標記輸出結束 def video_conventor(filename:str, oriFormat:str, binary:bytes) -> int: try: tmpfile = filename+"_tmp" # write to tempfile with open(tmpfile, "wb") as f: f.write(binary) # ffmpeg process process:subprocess.Popen = ( ffmpeg .input(tmpfile, format=oriFormat) .output(filename, format='mp4') .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) ) process.wait() # remove tempfile os.remove(tmpfile) return 0 except Exception as e: easyExceptionHandler(e) return 1 def file_writer(filename:str, binary:bytes): with open(filename, "wb") as f: f.write(binary) def file_saver(ftype:str, binary:bytes) -> Tuple[str, int]: """ ftype -> minetype binary -> file binary """ # 獲取副檔名 ext = None for t in FILE_MINE_TYPE: if t == ftype: ext = FILE_MINE_TYPE[t] if ext is None: return "Invalid file type", 1 # 如果不是 IG 本身支援的檔案 -> 轉檔 filename = sha512( str(time.time()).encode() ).hexdigest() # 暫存檔名稱 opt = "" # output file name if not ( ftype == "image/jpg" or ftype == "image/webp" or \ ftype == "video/mp4" ): # 轉圖片 if ftype.startswith("image"): opt = os.path.abspath(os.path.join(TMP_DIR, filename+".jpg")) err = image_conventer(opt, binary) if err: # 發生錯誤 return "File convert error", 1 # 轉影片 elif ftype.startswith("video"): opt = os.path.abspath(os.path.join(TMP_DIR, filename+".mp4")) err = video_conventor(opt, ext, binary) if err: return "File convert error", 1 # 轉檔完成 return opt, 0 else: # 如果是 IG 本身支援的檔案 -> 存檔 opt = os.path.abspath(os.path.join(TMP_DIR, filename+"."+ext)) file_writer(opt, binary) return opt, 0