niming_backend/utils/s3helper.py

67 lines
1.7 KiB
Python

from typing import Tuple, List
import os
import hashlib
import secrets
import time
import io
import sys
import minio
from minio.deleteobjects import DeleteObject
# Bucket name and client settings are read from the environment at import time.
S3_BUCKET = os.getenv("S3_BUCKET")
s3 = minio.Minio(
    endpoint=os.getenv("S3_ENDPOINT"),
    access_key=os.getenv("S3_ACCESS_KEY"),
    secret_key=os.getenv("S3_SECRET_KEY"),
    secure=False,
)

# Refuse to start when the configured bucket does not exist.
if not s3.bucket_exists(S3_BUCKET):
    print(f'Where is S3 bucket "{S3_BUCKET}"?')
    # A missing bucket is a fatal configuration error, so exit with a
    # non-zero status; status 0 would tell supervisors the start-up succeeded.
    sys.exit(1)
# methods
def _rollback_uploads(object_names: List[str]) -> None:
    """Best-effort removal of objects stored by a batch that later failed."""
    if not object_names:
        return
    try:
        # Per the MinIO Python SDK documentation, remove_objects() returns a
        # lazy iterator of errors; it must be consumed for deletes to be sent.
        for _ in s3.remove_objects(
            S3_BUCKET, [DeleteObject(name) for name in object_names]
        ):
            pass
    except Exception:
        # Cleanup must never mask the original upload failure.
        pass


def multi_file_uploader(file_list, file_mines: List[str]) -> Tuple[List[str], int]:
    """Upload every payload in *file_list* to the configured bucket.

    Each payload is stored under a generated name: the SHA-256 hex digest of
    the payload followed by the current time and a random URL-safe token.
    The content type of each object is the entry of *file_mines* at the same
    position as the payload.

    Args:
        file_list: Iterable of payloads. Each item is concatenated with
            ``bytes`` and wrapped in ``io.BytesIO``, so it must be bytes-like.
        file_mines: Content types, positionally matching *file_list*.
            Surplus entries are ignored.

    Returns:
        ``(object_names, 0)`` when every payload was stored, otherwise
        ``([], 1)``. On failure, objects already stored by this call are
        removed again (best effort) because the caller receives no names
        through which it could reference them.
    """
    uploaded: List[str] = []
    try:
        payloads = list(file_list)
        # Detect a too-short content-type list before anything is uploaded,
        # instead of failing half-way through the batch.
        if payloads and len(file_mines) < len(payloads):
            return [], 1
        for payload, mime in zip(payloads, file_mines):
            salt = (str(time.time()) + secrets.token_urlsafe(nbytes=16)).encode()
            object_name = hashlib.sha256(payload + salt).hexdigest()
            s3.put_object(
                bucket_name=S3_BUCKET,
                object_name=object_name,
                data=io.BytesIO(payload),
                length=len(payload),
                content_type=mime,
            )
            uploaded.append(object_name)
        return uploaded, 0
    except Exception:
        _rollback_uploads(uploaded)
        return [], 1
def solo_file_fetcher(fnhash:str) -> Tuple[dict | None, int]:
fnd = None
err = 1
try:
res = s3.get_object(S3_BUCKET, fnhash)
mime = res.getheader("Content-Type")
fnd = res.data
err = 0
fnd = {"binary":fnd, "mime":mime}
except:
fnd, err = None, 1
res.close()
res.release_conn()
return fnd, err
def multi_file_remover(file_list) -> int:
    """Delete the named objects from the configured bucket.

    Args:
        file_list: Iterable of object names to delete.

    Returns:
        ``0`` when the request completed without any reported error, ``1``
        when the call raised or at least one per-object error was reported.
    """
    try:
        errors = s3.remove_objects(
            S3_BUCKET, [DeleteObject(name) for name in file_list]
        )
        # Per the MinIO Python SDK documentation, remove_objects() returns a
        # lazy iterator of errors: nothing is sent until it is iterated, so
        # it must be drained for the deletion to happen at all.
        failures = list(errors)
    except Exception:
        return 1
    return 1 if failures else 0