niming_igapi/s3/s3helper.py
2024-12-17 19:06:40 +00:00

44 lines
996 B
Python

from typing import Tuple
import os
import sys
import logging
import minio
# logging
s3log = logging.getLogger("s3")
s3log.setLevel(level=logging.INFO)
S3_BUCKET:str = os.getenv("S3_BUCKET")
s3 = minio.Minio(endpoint=os.getenv("S3_ENDPOINT").strip(),
access_key=os.getenv("S3_ACCESS_KEY").strip(),
secret_key=os.getenv("S3_SECRET_KEY").strip(),
secure=False)
# check exist
s3log.info("Connecting to Minio")
if not s3.bucket_exists(S3_BUCKET):
s3log.critical("Where is S3 bucket \"%s\"?"%S3_BUCKET)
sys.exit(0)
# methods
def solo_file_fetcher(fnhash:str) -> Tuple[dict | None, int]:
fnd = None
err = 1
try:
# print(fnhash)
res = s3.get_object(S3_BUCKET, fnhash)
mime = res.getheader("Content-Type")
fnd = res.data
err = 0
fnd = {"binary":fnd, "mime":mime}
except:
fnd, err = None, 1
res.close()
res.release_conn()
return fnd, err