niming_igapi/s3/s3helper.py

39 lines
897 B
Python
Raw Normal View History

2024-12-18 00:35:17 +08:00
from typing import Tuple
import os
import sys
import minio
# Name of the bucket every helper in this module reads from (taken from the
# S3_BUCKET environment variable; None when the variable is unset).
S3_BUCKET: str = os.environ.get("S3_BUCKET")

# Shared MinIO client, configured entirely from environment variables.
# NOTE(review): each `.strip()` raises AttributeError at import time when the
# corresponding variable is unset -- confirm that this is the intended failure.
s3 = minio.Minio(
    endpoint=os.environ.get("S3_ENDPOINT").strip(),
    access_key=os.environ.get("S3_ACCESS_KEY").strip(),
    secret_key=os.environ.get("S3_SECRET_KEY").strip(),
    secure=False,
)

# Verify at import time that the configured bucket is reachable and present.
print("[*] Connecting to Minio")
if not s3.bucket_exists(S3_BUCKET):
    print("[X] Where is S3 bucket \"%s\"?" % S3_BUCKET)
    # NOTE(review): exits with status 0 although this is a failure path --
    # confirm whether a non-zero status was intended.
    raise SystemExit(0)
# methods
def solo_file_fetcher(fnhash: str) -> Tuple[dict | None, int]:
    """Fetch a single object from ``S3_BUCKET`` by its object name.

    Args:
        fnhash: Name (key) of the object to fetch from the bucket.

    Returns:
        A ``(payload, err)`` tuple. On success ``payload`` is
        ``{"binary": <response .data>, "mime": <Content-Type header or None>}``
        and ``err`` is ``0``. On any failure ``payload`` is ``None`` and
        ``err`` is ``1``; the error itself is not reported to the caller.
    """
    # Bound before the try so the cleanup below is safe even when
    # get_object() itself raises (previously this path crashed with
    # UnboundLocalError instead of returning the error tuple).
    res = None
    try:
        res = s3.get_object(S3_BUCKET, fnhash)
        # `headers.get` is the equivalent of `getheader()`, which is
        # deprecated in urllib3 2.x.
        mime = res.headers.get("Content-Type")
        return {"binary": res.data, "mime": mime}, 0
    except Exception:
        # Best-effort contract: any failure is reported as err == 1.
        # KeyboardInterrupt / SystemExit are deliberately not swallowed.
        return None, 1
    finally:
        # Always hand the connection back to the pool once a response exists.
        if res is not None:
            res.close()
            res.release_conn()