niming_backend/utils/dbhelper.py
2024-12-22 18:03:55 +00:00

400 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Tuple, Dict, List
from datetime import datetime
import time
import secrets
import hashlib
import os
from flask import make_response, Response, request
from sqlalchemy.orm import sessionmaker
from sqlalchemy import desc, update, Engine, text, delete
import pytz
from utils import pgclass, setting_loader, s3helper, logger, ighelper
from utils.misc import error
from protobuf_files import niming_pb2
class DB:
    """Process-wide holder for the SQLAlchemy engine and its session factory.

    All state lives on the class itself: ``DB(engine)`` stores the engine as a
    class attribute (``__init__`` is deliberately a classmethod), and
    ``DB.getsession()`` hands out a new session bound to that engine.
    """
    # Engine shared by every session; None until DB(engine) has been called.
    _engine: "Engine | None" = None
    # Cached sessionmaker and the engine it was built for. The factory is
    # created once and reused instead of being rebuilt on every getsession().
    _session_factory = None
    _factory_engine = None

    @classmethod
    def __init__(cls, engine):
        """Register *engine* as the engine used by all later sessions."""
        cls._engine = engine

    @classmethod
    def getsession(cls):
        """Return a new session bound to the registered engine."""
        # Rebuild the factory only when none exists yet or the engine changed.
        if cls._session_factory is None or cls._factory_engine is not cls._engine:
            cls._session_factory = sessionmaker(bind=cls._engine)
            cls._factory_engine = cls._engine
        return cls._session_factory()
# Module-wide DB handle. It starts as None; every helper in this module calls
# db.getsession(), so it must be assigned before any of them is used.
db:DB = None
# Time zone name read from the TIMEZONE environment variable (None when the
# variable is unset). It is passed to pytz.timezone() when timestamping comments.
TIMEZONE:str = os.getenv("TIMEZONE")
# Upload a single article
def solo_article_uploader(content:str, file_list, fmimes:List[str]) -> Tuple[int, str]:
    """Store a new article with its attachments and return ``(id, hash)``.

    The article hash is the SHA-256 of the content, the current time and a
    random token. Attachments are sent through
    ``s3helper.multi_file_uploader``; the meta, article and mark rows are then
    inserted in a single transaction. The mark is "pending" when the
    "Check_Before_Post" setting is truthy, otherwise "visible"; in the latter
    case the post is also handed to ``ighelper.request_upload``.

    Args:
        content: Article text.
        file_list: Attachments, passed unchanged to the S3 helper.
        fmimes: MIME types, passed unchanged to the S3 helper.

    Returns:
        ``(article_id, hash)`` on success, ``(0, "")`` when the file upload,
        the database insert or the IG upload request fails.
    """
    # settings
    opt = setting_loader.loadset()
    chk_before_post = opt["Check_Before_Post"]

    # hash (time and a random token make it unique even for identical content)
    seed = content + str(time.time()) + str(secrets.token_urlsafe(nbytes=16))
    post_hash = hashlib.sha256(seed.encode()).hexdigest()

    # IP of the requester
    ip = request.remote_addr

    # the IG id is not known yet at creation time
    igid = None

    # mark
    mark = "pending" if chk_before_post else "visible"

    # posting
    article = pgclass.SQLarticle
    article_mark = pgclass.SQLmark
    article_metadata = pgclass.SQLmeta
    result_id = 0
    with db.getsession() as session:
        try:
            # file processor
            fnlist, err = s3helper.multi_file_uploader(file_list, fmimes)
            if err:
                return 0, ""

            # db processor (meta, article, mark)
            metaa = article_metadata(ip=ip, igid=igid, hash=post_hash)
            posta = article(content=content, hash=post_hash, file_list=fnlist)
            marka = article_mark(hash=post_hash, mark=mark)
            session.add(metaa)
            session.add(posta)
            session.add(marka)

            # commit
            session.commit()
            result_id = int(posta.id)
        except Exception:
            # Only ordinary errors are turned into a failure result;
            # SystemExit / KeyboardInterrupt are allowed to propagate.
            # NOTE(review): files already uploaded are not removed here, so a
            # failed insert may leave them behind in storage - confirm.
            session.rollback()
            return 0, ""

    # logger
    logger.logger("newpost", "New post (id=%d): %s"%(result_id, mark))

    # ig posting
    if not chk_before_post: # no review required
        result, err = ighelper.request_upload(result_id)
        # NOTE(review): the article row is already committed at this point,
        # yet the caller is told the upload failed - confirm this is intended.
        if err or result["result"] == "Canceled delete post request":
            return 0, ""

    return result_id, post_hash
# Upload a single comment
def solo_comment_uploader(content:str, ref:int) -> Tuple[int | str, str]:
    """Append a comment to article *ref* and return ``(sha1, hash)``.

    The comment is stored as one more element of the article's
    ``comment_list`` and gets its own mark row keyed by the SHA-256 hash. The
    mark is "pending" when the "Check_Before_Post" setting is truthy,
    otherwise "visible".

    Args:
        content: Comment text.
        ref: Id of the article the comment belongs to.

    Returns:
        ``(sha1, hash)`` on success; ``(0, "")`` when the article does not
        exist or the database operation fails.
    """
    # settings
    opt = setting_loader.loadset()
    chk_before_post = opt["Check_Before_Post"]

    # hashes (time and a random token make them unique per comment)
    seed = content + str(time.time()) + str(secrets.token_urlsafe(nbytes=16))
    comment_hash = hashlib.sha256(seed.encode()).hexdigest()
    sha1 = hashlib.sha1(seed.encode()).hexdigest()

    # IP of the requester
    ip = request.remote_addr

    # mark
    mark = "pending" if chk_before_post else "visible"

    # posting
    article = pgclass.SQLarticle
    article_mark = pgclass.SQLmark
    with db.getsession() as session:
        try:
            # article processor
            cda = {
                "content":content,
                "ip":ip,
                "hash":comment_hash,
                "created_at":datetime.now(pytz.timezone(TIMEZONE)),
                "sha1":sha1
            }
            updated = session.execute(
                update(article)
                .where(article.id == ref)
                .values(comment_list=article.comment_list + [cda])
            )
            # No row matched: the referenced article does not exist. Without
            # this check an orphan mark would be inserted and success reported.
            if updated.rowcount == 0:
                session.rollback()
                return 0, ""

            # mark processor
            marka = article_mark(hash=comment_hash,
                                 mark=mark)
            session.add(marka)

            # commit
            session.commit()

            # logger
            logger.logger("newcomment", "New comment %s points to %d: %s"%(sha1, ref, mark))

            return sha1, comment_hash
        except Exception:
            session.rollback()
            return 0, ""
# role (general) (owner) (admin)
# Fetch a single article
def solo_article_fetcher(role:str, key:int, hash:str=None) -> Tuple[Dict, int]: # admin, owner, general
    """Fetch one article and the sha1 list of its comments.

    Visibility depends on *role*:
      * "general": the article must be marked visible; only visible comments
        are listed.
      * "owner": both *key* and *hash* must match; the mark is not checked.
      * "admin": only *key* must match; ``ip``, ``mark`` and ``hash`` are
        added to the result.

    Args:
        role: "general", "owner" or "admin".
        key: Article id.
        hash: Article hash, used only for the "owner" role.

    Returns:
        ``(article_dict, 200)`` when found, otherwise ``({}, 404)``. An
        unrecognised role also yields ``({}, 404)``.
    """
    role_filters = {
        "owner": "WHERE posts.id = :id AND posts.hash = :hash",    # id + hash, mark not checked
        "admin": "WHERE posts.id = :id",                           # id only, mark not checked
        "general": "WHERE posts.id=:id AND pmark.mark='visible'",  # id, must be visible
    }
    where_clause = role_filters.get(role)
    if where_clause is None:
        # Fail closed: an unknown role must never reach the database with an
        # unfiltered query (no WHERE clause would return the first post).
        return {}, 404

    with db.getsession() as session:
        # article fetch
        stmt = (
            "SELECT posts.id, posts.content, posts.file_list, meta.igid, posts.hash, meta.ip, pmark.mark "
            "FROM posts "
            "INNER JOIN mark AS pmark ON posts.hash=pmark.hash "
            "INNER JOIN article_meta AS meta ON posts.hash=meta.hash "
            + where_clause
        )
        res = session.execute(text(stmt), {"id":key, "hash":hash}).first()
        if res is None:
            return {}, 404

        # comment fetch
        stmt = (
            "SELECT c.sha1 "
            "FROM posts "
            "INNER JOIN unnest(posts.comment_list) AS c ON c=ANY(posts.comment_list) "
            "INNER JOIN mark AS cmark ON c.hash=cmark.hash "
            "WHERE posts.id=:id"
        )
        if role == "general": # only sha1 of visible comments
            stmt += " AND cmark.mark='visible'"
        cres = session.execute(text(stmt), {"id":res[0]}).all()

        # mapping
        one = {
            "id": res[0],
            "content": res[1],
            "igid": res[3],
        }
        if res[2]: # files
            one["files_hash"] = res[2]
        # The original guarded this with the post hash (res[4]), which is
        # non-empty for every stored post, so the key was effectively always
        # present (possibly as an empty list). Kept so callers see no change.
        one["comments_hash"] = [ c[0] for c in cres ]

        if role == "admin":
            one["ip"] = res[5]
            one["mark"] = res[6]
            one["hash"] = res[4]

        return one, 200
# role (general) (owner) (admin)
# Fetch a single comment
def solo_comment_fetcher(role:str, key:str, hash:str=None) -> Tuple[Dict, int]: # admin, owner, general
    """Fetch one comment by its sha1.

    Visibility depends on *role*:
      * "general": both the parent article and the comment must be visible.
      * "owner": the comment hash must equal *hash*; marks are not checked.
      * "admin": no extra check; ``ip``, ``mark`` and ``hash`` are added.

    Args:
        role: "general", "owner" or "admin".
        key: sha1 of the comment.
        hash: Comment hash, used only for the "owner" role.

    Returns:
        ``(comment_dict, 200)`` when found, otherwise ``({}, 404)``. An
        unrecognised role also yields ``({}, 404)`` (it used to raise
        UnboundLocalError).
    """
    if role not in ("general", "owner", "admin"):
        # Fail closed instead of crashing on an unassigned result variable.
        return {}, 404

    with db.getsession() as session:
        # query
        stmt = (
            "SELECT posts.id AS parent_id, posts.hash AS parent_hash, pmark.mark AS parent_mark, cmark.mark AS comment_mark, c.* "
            "FROM posts "
            "INNER JOIN unnest(posts.comment_list) AS c ON c=ANY(posts.comment_list) "
            "JOIN mark AS pmark ON posts.hash=pmark.hash "
            "JOIN mark AS cmark ON c.hash=cmark.hash "
            "WHERE c.sha1=:sha1 "
        )
        params = {'sha1':key}
        if role == "general":
            # general users: parent article and comment must both be visible
            stmt += "AND pmark.mark='visible' AND cmark.mark='visible'"
        elif role == "owner":
            # owner: sha1 plus sha256 must match, marks are not checked
            stmt += "AND c.hash=:hash"
            params['hash'] = hash
        # admin: lookup by sha1 only, nothing else is checked
        arta = session.execute(text(stmt), params).first()
        if arta is None:
            return {}, 404

        # mapping
        # Columns 0-3 are the aliases selected above. Columns 4+ come from
        # c.* - used here as content(4), ip(5), hash(6), sha1(8); column 7 is
        # presumably created_at - confirm against the composite type.
        one = {
            "content": arta[4],
            "sha1": arta[8]
        }

        if role == "admin":
            one["ip"] = arta[5]
            one["mark"] = arta[3]
            one["hash"] = arta[6]

        return one, 200
# Fetch a page of articles
def multi_article_fetcher(role:str, page:str, count:int) -> Tuple[bytes, int]: # general, admin
    """Return one page of articles as a serialized protobuf message.

    Articles are ordered by descending id; page ``p`` skips ``p * count`` rows
    and returns at most *count* rows. For the "admin" role the response type
    is ``AdminFetchPostResponse`` and each post also carries ``hash``, ``ip``
    and ``mark``; every other role gets ``FetchPostResponse``. Only the
    "general" role is restricted to visible articles.

    Args:
        role: "general" or "admin".
        page: Zero-based page number as a string of decimal digits.
        count: Page size.

    Returns:
        ``(serialized_response, 200)``, or ``(b"", 400)`` when *page* is
        missing or is not a non-negative decimal integer.
    """
    # checker
    # isdecimal() instead of isdigit(): isdigit() also accepts characters such
    # as "²" that int() cannot parse, which used to raise ValueError.
    if not isinstance(page, str) or not page.isdecimal():
        return b"", 400
    offset = int(page)*count

    # proto
    if role == "admin":
        pcl = niming_pb2.AdminFetchPostResponse
    else:
        pcl = niming_pb2.FetchPostResponse
    resfn = pcl()

    # db
    article = pgclass.SQLarticle
    article_meta = pgclass.SQLmeta
    article_mark = pgclass.SQLmark
    with db.getsession() as session:
        # query
        res = session.query(article.id, article.content, article.file_list, article_meta.igid, article.hash, article_meta.ip, article_mark.mark)
        res = res.join(article_meta, article_meta.hash==article.hash)
        res = res.join(article_mark, article_mark.hash==article.hash)
        if role == "general":
            res = res.filter(article_mark.mark == "visible")
        res = res.order_by(desc(article.id)).offset(offset).limit(count).all()

        # mapping
        for r in res:
            one = pcl.Message(
                id = r[0],
                content = r[1],
                igid = r[3],
            )
            if r[2]: # files
                one.files_hash.extend(r[2])
            if role == "admin": # admins additionally get hash, ip and mark
                one.hash = r[4]
                one.ip = r[5]
                one.mark = r[6]
            resfn.posts.append(one)

        return resfn.SerializeToString(), 200
# Delete a single article
def solo_article_remover(role:str, hash:str=None, id:int=None, opuser:str=None) -> Tuple[Dict, int]: # admin, owner
    """Delete an article, its marks, its comments' marks, its files and its IG post.

    Args:
        role: "admin" (lookup by *id* only) or "owner" (*id* and *hash* must
            both match).
        hash: Article hash, used only for the "owner" role.
        id: Article id.
        opuser: Operator name, used only in the admin log line.

    Returns:
        ``({"id": ..., "mark": ...}, 200)`` on success; ``({}, 404)`` when the
        article is not found or *role* is not recognised; ``({}, 500)`` when
        file removal or the IG delete request fails.
    """
    if role not in ("admin", "owner"):
        # Fail closed. Previously an unknown role left the lookup as an
        # un-executed query object that was never None, so the code went on
        # to delete whichever post the unfiltered query returned first.
        return {}, 404

    article = pgclass.SQLarticle
    article_mark = pgclass.SQLmark
    article_meta = pgclass.SQLmeta
    with db.getsession() as session:
        # fetch the article
        query = session.query(article.id, article.hash, article_mark.mark, article.file_list, article_meta.igid) \
                       .join(article_mark, article.hash==article_mark.hash) \
                       .join(article_meta, article.hash==article_meta.hash)
        if role == "admin":
            query = query.filter(article.id == id)
        else: # owner
            query = query.filter(article.id == id, article.hash == hash)
        pres = query.first()
        if pres is None: # the article does not exist
            return {}, 404

        # fetch the hashes of the article's comments
        stmt="SELECT c.hash as chash " \
            +"FROM posts, unnest(posts.comment_list) AS c " \
            +"WHERE posts.id = :id"
        cres = session.execute(text(stmt), {'id':pres[0]}).all()

        # delete the article
        session.execute(delete(article).where(article.hash == pres[1]))

        # delete marks (article & comments)
        # NOTE(review): the article_meta row is not deleted here - confirm
        # whether it is kept on purpose or removed by the database.
        session.execute(delete(article_mark).where(article_mark.hash == pres[1]))
        for c in cres:
            session.execute(delete(article_mark).where(article_mark.hash == c[0]))

        # delete files
        err = s3helper.multi_file_remover(pres[3])
        if err:
            # Undo the pending deletes explicitly rather than relying on the
            # session being closed.
            session.rollback()
            return {}, 500

        session.commit()

    # delete the IG post
    igid = pres[4]
    if igid:
        result, err = ighelper.request_delete(aid=pres[0], code=igid)
        # error check
        if err or result["result"] == "Canceled upload post request":
            return {}, 500

    # logger
    logtype = "article.delete" if role == "admin" else "delpost"
    loguser = "User:%s "%opuser if role == "admin" else ""
    logger.logger(logtype, loguser+"Delete post (id=%d): last_status=%s"
                  %(int(pres[0]), str(pres[2])))
    return {"id":pres[0], "mark":pres[2]}, 200
# Delete a single comment
def solo_comment_remover(role:str, hash:str=None, sha1:str=None, opuser:str=None) -> Tuple[Dict, int]:
    """Remove a comment from its parent article and delete its mark.

    Args:
        role: "admin" (lookup by *sha1* only) or "owner" (*sha1* and *hash*
            must both match).
        hash: Comment hash, used only for the "owner" role.
        sha1: sha1 of the comment.
        opuser: Operator name, used only in the admin log line.

    Returns:
        ``({"sha1": ..., "mark": ...}, 200)`` on success (``mark`` is None
        when the comment had no mark row); ``({}, 404)`` when the comment is
        not found or *role* is not recognised.
    """
    if role not in ("admin", "owner"):
        # Fail closed instead of raising UnboundLocalError further down.
        return {}, 404

    article_mark = pgclass.SQLmark
    with db.getsession() as session:
        # fetch the comment
        stmt="SELECT posts.id AS parent, c.sha1, c.hash " \
            +"FROM posts, unnest(posts.comment_list) AS c "
        params = {'sha1':sha1}
        if role == "admin":
            stmt += "WHERE c.sha1 = :sha1"
        else: # owner
            stmt += "WHERE c.sha1 = :sha1 AND c.hash = :hash"
            params['hash'] = hash
        cres = session.execute(text(stmt), params).first()
        if cres is None: # the comment does not exist
            return {}, 404

        # delete the comment
        # The UPDATE is limited to the parent post. Without the outer WHERE
        # it rewrote comment_list on every row of the table.
        stmt="UPDATE posts " \
            +"SET comment_list = ARRAY(" \
            +"SELECT c " \
            +"FROM unnest(comment_list) AS c " \
            +"WHERE (c.sha1, c.hash) != (:sha1, :hash)" \
            +") " \
            +"WHERE posts.id = :parent"
        session.execute(text(stmt), {'sha1':cres[1], 'hash':cres[2], 'parent':cres[0]})

        # delete the comment's mark (read it first so it can be reported)
        mark = session.query(article_mark.mark).filter(article_mark.hash == cres[2]).first()
        # A missing mark row must not crash after the commit.
        last_status = mark[0] if mark is not None else None
        session.execute(delete(article_mark).where(article_mark.hash == cres[2]))

        session.commit()

        logtype = "comment.delete" if role == "admin" else "delcomment"
        loguser = "User:%s "%opuser if role == "admin" else ""
        logger.logger(logtype, loguser+"Delete comment (sha1=%s): last_status=%s"
                      %(cres[1], str(last_status)))

        return {"sha1":cres[1], "mark":last_status}, 200
# Fetch a file
def solo_file_fetcher(role:str, fnhash:str) -> Tuple[Response, int]: # general, admin
    """Return the stored file *fnhash* as a download response.

    The file is served only when some article lists it in ``file_list``; for
    the "general" role that article must also be marked visible. The bytes
    and MIME type come from ``s3helper.solo_file_fetcher``.

    Returns:
        ``(response, 200)`` with Content-Type and an attachment
        Content-Disposition set, or ``(error("File not found"), 404)`` when no
        matching article exists or the storage fetch reports an error.
    """
    query_parts = [
        "SELECT posts.id FROM posts ",
        "INNER JOIN mark ON posts.hash=mark.hash ",
        "WHERE :fnhash=ANY (posts.file_list) ",
    ]
    if role == "general":
        query_parts.append("AND mark.mark = 'visible'")
    lookup = text("".join(query_parts))

    with db.getsession() as session:
        owner_row = session.execute(lookup, {'fnhash':fnhash}).first()
        # the owning article must exist and be viewable by this role
        if owner_row is None:
            return error("File not found"), 404

        # fetch file
        stored, fetch_err = s3helper.solo_file_fetcher(fnhash)
        if fetch_err:
            return error("File not found"), 404

        resp = make_response(stored["binary"])
        headers = resp.headers
        headers.set("Content-Type", stored["mime"])
        headers.set("Content-Disposition", f"attachment; filename=file_{fnhash}")
        return resp, 200