niming_backend/utils/dbhelper.py
2024-11-19 15:29:01 +00:00

143 lines
5.1 KiB
Python

from typing import Tuple, Dict, List
from flask import make_response, Response, jsonify
from sqlalchemy.orm import sessionmaker
from sqlalchemy import desc
from utils import pgclass
from utils.misc import error
class db:
    """Class-level holder for the SQLAlchemy engine.

    The engine is stored on the class itself, so calling ``db(engine)`` once
    configures it for every later ``db.getsession()`` call.
    """

    # Engine handed to every new session; stays None until ``db(engine)`` runs.
    _engine = None

    @classmethod
    def __init__(cls, engine):
        """Store *engine* on the class rather than on the new instance."""
        cls._engine = engine

    @classmethod
    def getsession(cls):
        """Return a new session bound to the stored engine."""
        return sessionmaker(bind=cls._engine)()
# Supported roles: general, owner, admin
# Fetch a single article
def solo_article_fetcher(role: str, key) -> Tuple[Dict, int]:  # admin, owner, general
    """Fetch one article together with its comment ids and file ids.

    Args:
        role: ``"owner"`` looks the article up by hash, ``"admin"`` by id,
            and ``"general"`` by id restricted to rows marked ``"visible"``.
        key: The article hash (owner) or the article id (admin / general).

    Returns:
        ``(payload, 200)`` on success, where ``payload`` holds ``id``,
        ``ctx``, ``igid``, ``mark``, ``reference``, ``comment`` (ids of the
        articles that reference this one) and ``files`` (ids of the attached
        files). Admins additionally get ``ip``; owners get ``hash``.
        ``({"error": "Post not found"}, 404)`` when nothing matches or when
        *key* is ``None``.

    Raises:
        ValueError: If *role* is not one of the three supported roles.
    """
    if role not in ("owner", "admin", "general"):
        raise ValueError(f"Unsupported role: {role!r}")
    # SQLAlchemy renders `column == None` as IS NULL, so a missing key must
    # never reach the query: it could match an unrelated row.
    if key is None:
        return {"error": "Post not found"}, 404

    table = pgclass.SQLarticle
    ftab = pgclass.SQLfile

    with db.getsession() as session:
        # Locate the article according to the caller's role.
        if role == "owner":
            query = session.query(table).filter(table.hash == key)
        elif role == "admin":
            query = session.query(table).filter(table.id == key)
        else:  # general
            query = session.query(table).filter(table.id == key, table.mark == "visible")
        res = query.first()
        if res is None:
            return {"error": "Post not found"}, 404

        # Fields shared by every role, then the role-specific extras.
        resfn = {
            "id": res.id,
            "ctx": res.ctx,
            "igid": res.igid,
            "mark": res.mark,
            "reference": res.reference,
        }
        if role == "admin":
            resfn["ip"] = res.ip
        elif role == "owner":
            resfn["hash"] = res.hash

        # Comments are articles whose `reference` is this article's id;
        # general readers only see the visible ones.
        comments = session.query(table.id).filter(table.reference == int(res.id))
        if role == "general":
            comments = comments.filter(table.mark == "visible")
        resfn["comment"] = [c[0] for c in comments.all()]

        # Files are linked to the article through its hash.
        resfn["files"] = [
            f[0] for f in session.query(ftab.id).filter(ftab.reference == res.hash).all()
        ]
        return resfn, 200
# Fetch a list of articles
def multi_article_fetcher(role: str, start: str, count: str) -> "Tuple[Response, int]":  # general, admin
    """Return one page of top-level articles, newest id first.

    Args:
        role: ``"general"`` lists only articles marked ``"visible"``;
            ``"admin"`` lists every article and also exposes ``ip``.
        start: Number of articles to skip, as a string of decimal digits.
        count: Maximum number of articles to return, as a string of decimal
            digits.

    Returns:
        ``(jsonify(list), 200)`` where each entry holds ``id``, ``ctx``,
        ``igid``, ``created_at``, ``mark`` and ``files`` (plus ``ip`` for
        admins), or ``(error("Arguments error"), 400)`` when *start* or
        *count* is missing or not a non-negative decimal integer.

    Raises:
        ValueError: If *role* is neither ``"general"`` nor ``"admin"``.
    """
    # str.isdecimal() rather than str.isdigit(): isdigit() also accepts
    # characters such as "²" that int() refuses, which would turn bad input
    # into an unhandled exception instead of a 400.
    if start is None or count is None or \
            not start.isdecimal() or not count.isdecimal():
        return error("Arguments error"), 400
    if role not in ("general", "admin"):
        raise ValueError(f"Unsupported role: {role!r}")

    offset = int(start)
    limit = int(count)

    table = pgclass.SQLarticle
    ftab = pgclass.SQLfile
    resfn = []

    with db.getsession() as session:
        # Top-level articles are the ones that reference no other article.
        query = session.query(table).filter(table.reference == None)
        if role == "general":
            query = query.filter(table.mark == "visible")
        rows = query.order_by(desc(table.id)).offset(offset).limit(limit).all()

        for r in rows:
            rup = {"id": r.id, "ctx": r.ctx, "igid": r.igid, "created_at": r.created_at, "mark": r.mark}
            if role == "admin":
                rup["ip"] = r.ip  # admins additionally see the poster's ip
            # Files are linked to the article through its hash.
            rup["files"] = [
                f[0] for f in session.query(ftab.id).filter(ftab.reference == r.hash).all()
            ]
            resfn.append(rup)

    return jsonify(resfn), 200
# Delete an article
def solo_article_remover(role: str, hash: str = None, id: int = None) -> Tuple[Dict, int]:  # admin, general
    """Delete an article, its comments, and the files attached to any of them.

    Args:
        role: ``"admin"`` selects the article by *id*; ``"general"`` selects
            it by *hash*.
        hash: Article hash, used when *role* is ``"general"``.
        id: Article id, used when *role* is ``"admin"``.

    Returns:
        ``({"id": ..., "mark": ..., "rcl": [...]}, 200)`` on success, where
        ``rcl`` lists the ids of the comments that were removed with the
        article. ``({"error": "Post not found"}, 404)`` when nothing matches
        or when the key required by *role* was not supplied.

    Raises:
        ValueError: If *role* is neither ``"admin"`` nor ``"general"``.
    """
    if role == "admin":
        key = id
    elif role == "general":
        key = hash
    else:
        raise ValueError(f"Unsupported role: {role!r}")
    # SQLAlchemy renders `column == None` as IS NULL; without this guard a
    # missing key could select, and then delete, an unrelated row.
    if key is None:
        return {"error": "Post not found"}, 404

    table = pgclass.SQLarticle
    ftab = pgclass.SQLfile

    with db.getsession() as session:
        # Locate the article itself.
        column = table.id if role == "admin" else table.hash
        res = session.query(table).filter(column == key).first()
        if res is None:
            return {"error": "Post not found"}, 404

        # Build the response before deleting anything, so it never depends
        # on reading attributes of an object that has already been removed.
        rcl = []
        result = {"id": res.id, "mark": res.mark, "rcl": rcl}

        # Files attached to the article itself.
        session.query(ftab).filter(ftab.reference == res.hash).delete()

        # Comments are articles whose `reference` is this article's id.
        for c in session.query(table).filter(table.reference == res.id).all():
            rcl.append(c.id)
            # Remove each comment's files, then the comment.
            session.query(ftab).filter(ftab.reference == c.hash).delete()
            session.delete(c)

        # Finally the article itself.
        session.delete(res)
        session.commit()
        return result, 200
# Fetch a file
def solo_file_fetcher(role: str, id: int) -> "Tuple[Response, int]":  # general, admin
    """Return a stored file as a download response.

    Args:
        role: ``"general"`` may only fetch files whose article is marked
            ``"visible"``; ``"admin"`` may fetch any file whose article
            exists.
        id: Id of the file row to fetch.

    Returns:
        ``(response, 200)`` carrying the file's binary content, with
        ``Content-Type`` taken from the file row and a ``Content-Disposition``
        of ``attachment; filename=file<id>``.
        ``(error("File not found"), 404)`` when the file does not exist or
        its article is missing or not viewable for *role*.

    Raises:
        ValueError: If *role* is neither ``"general"`` nor ``"admin"``.
    """
    if role not in ("general", "admin"):
        raise ValueError(f"Unsupported role: {role!r}")

    table = pgclass.SQLarticle
    ftab = pgclass.SQLfile

    with db.getsession() as session:
        fres = session.query(ftab).filter(ftab.id == id).first()
        if fres is None:  # no such file
            return error("File not found"), 404

        # The file's `reference` holds the hash of the article it belongs to.
        owner = session.query(table).filter(table.hash == fres.reference)
        if role == "general":
            owner = owner.filter(table.mark == 'visible')
        if owner.first() is None:  # article missing or not viewable
            return error("File not found"), 404

        resp = make_response(fres.binary)
        resp.headers.set("Content-Type", fres.type)
        resp.headers.set("Content-Disposition", f"attachment; filename=file{fres.id}")
        return resp, 200