"""Helpers for storing, fetching and removing objects in one S3 (MinIO) bucket.

The bucket name and connection settings are read from the environment
variables ``S3_BUCKET``, ``S3_ENDPOINT``, ``S3_ACCESS_KEY`` and
``S3_SECRET_KEY``.  Importing this module connects to the endpoint and exits
the process if the configured bucket does not exist.

Every public function reports its outcome as an integer status:
``0`` on success and ``1`` on failure.
"""
from typing import Tuple, List
import os
import hashlib
import secrets
import time
import io
import sys
import logging

import minio
from minio.deleteobjects import DeleteObject

logger = logging.getLogger(__name__)

S3_BUCKET: str = os.getenv("S3_BUCKET")
s3 = minio.Minio(
    endpoint=os.getenv("S3_ENDPOINT"),
    access_key=os.getenv("S3_ACCESS_KEY"),
    secret_key=os.getenv("S3_SECRET_KEY"),
    secure=False,
)

# Refuse to start when the configured bucket is missing.
if not s3.bucket_exists(S3_BUCKET):
    print("[X] Where is S3 bucket \"%s\"?" % S3_BUCKET)
    # NOTE(review): exit status 0 signals success to the calling process even
    # though this is a fatal configuration error; kept as-is for
    # compatibility -- confirm whether a non-zero status is wanted.
    sys.exit(0)


# methods
def _discard_objects(object_names) -> None:
    """Best-effort removal of objects left behind by a failed batch upload."""
    for name in object_names:
        try:
            s3.remove_object(S3_BUCKET, name)
        except Exception:
            logger.exception("could not clean up orphaned object %s", name)


def multi_file_uploader(file_list, file_mines: List[str]) -> Tuple[List[str], int]:
    """Upload several in-memory files to the bucket.

    Each payload is stored under a fresh object name: the SHA-256 hex digest
    of the payload concatenated with the current time and a random token, so
    identical payloads still receive distinct names.

    Args:
        file_list: Iterable of ``bytes`` payloads.
        file_mines: Content types, index-aligned with ``file_list``.

    Returns:
        ``(object_names, 0)`` on success, in upload order.  ``([], 1)`` if
        anything fails (including a missing entry in ``file_mines``); objects
        uploaded before the failure are removed again on a best-effort basis,
        because the caller receives no names to reference them by.
    """
    fidlist: List[str] = []
    try:
        for idx, payload in enumerate(file_list):
            content_type = file_mines[idx]
            # Time + random token make the object name unique per upload.
            nonce = (str(time.time()) + secrets.token_urlsafe(nbytes=16)).encode()
            fnhash = hashlib.sha256(payload + nonce).hexdigest()
            s3.put_object(
                bucket_name=S3_BUCKET,
                object_name=fnhash,
                data=io.BytesIO(payload),
                length=len(payload),
                content_type=content_type,
            )
            fidlist.append(fnhash)
    except Exception:
        logger.exception("multi-file upload failed after %d object(s)", len(fidlist))
        _discard_objects(fidlist)
        return [], 1
    return fidlist, 0


def solo_file_fetcher(fnhash: str) -> Tuple[dict | None, int]:
    """Fetch one object from the bucket.

    Args:
        fnhash: Object name, as returned by ``multi_file_uploader``.

    Returns:
        ``({"binary": <body>, "mime": <Content-Type header>}, 0)`` on
        success, or ``(None, 1)`` if the object could not be fetched.
    """
    res = None
    try:
        res = s3.get_object(S3_BUCKET, fnhash)
        mime = res.getheader("Content-Type")
        return {"binary": res.data, "mime": mime}, 0
    except Exception:
        logger.exception("could not fetch object %s", fnhash)
        return None, 1
    finally:
        # `res` stays None when get_object itself raised; only release a
        # response that was actually opened.
        if res is not None:
            res.close()
            res.release_conn()


def multi_file_remover(file_list) -> int:
    """Remove the named objects from the bucket.

    Args:
        file_list: ``list`` of object names.  Any other type is ignored and
            treated as "nothing to remove".

    Returns:
        ``0`` when every removal call succeeded (or nothing was attempted),
        ``1`` as soon as one removal raises; later names are not attempted.
    """
    try:
        if isinstance(file_list, list):
            for object_name in file_list:
                s3.remove_object(S3_BUCKET, object_name)
        return 0
    except Exception:
        logger.exception("multi-file removal failed")
        return 1