niming_igapi/db/dbhelper.py
2024-12-17 19:06:40 +00:00

152 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Tuple, Dict
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Engine, text, update
from s3 import s3helper
from db import pgclass
class DB:
    """Process-wide holder for the SQLAlchemy engine and its session factory.

    ``__init__`` is a classmethod, so ``DB(engine)`` stores the engine on the
    class itself and every ``DB.getsession()`` call shares it.
    """

    _engine = None
    # Built lazily by getsession() and reused; reset when the engine changes.
    _session_factory = None

    @classmethod
    def __init__(cls, engine):
        """Register ``engine`` as the engine used for all later sessions."""
        cls._engine: Engine = engine
        # Drop any factory that was bound to a previously registered engine.
        cls._session_factory = None

    @classmethod
    def getsession(cls):
        """Return a new session bound to the registered engine."""
        # sessionmaker is a reusable factory; create it once instead of
        # rebuilding it for every session.
        if cls._session_factory is None:
            cls._session_factory = sessionmaker(bind=cls._engine)
        return cls._session_factory()


# Module-level handle used by the helpers in this file via db.getsession();
# it is None here, so it must be assigned before those helpers are called.
db: DB = None
# role (general) (owner) (admin)
# Fetch a single article
def solo_article_fetcher(role:str, key:int, hash:str=None) -> Tuple[Dict, int]: # admin, owner, general
    """Fetch one article by id, filtered according to the caller's role.

    Args:
        role: "owner" (id and hash must both match), "admin" (id only) or
            "general" (id only, and the post's mark must be 'visible').
        key: posts.id of the article.
        hash: posts.hash of the article; only referenced by the "owner" query.

    Returns:
        ``(article, 200)`` on success, or ``({}, 404)`` when no row matches or
        ``role`` is not one of the three known roles. The article dict holds
        "id", "content", "igid", "mark" and "hash", plus "files_hash" when the
        post's file list is non-empty, "comments_hash" (list of comment sha1
        values; for "general" only comments marked 'visible') and, for
        admins, "ip".
    """
    where_by_role = {
        # owner: id and hash must match, no visibility check
        "owner": "WHERE posts.id = :id AND posts.hash = :hash",
        # admin: id is enough, no visibility check
        "admin": "WHERE posts.id = :id",
        # general: id must match and the post must be visible
        "general": "WHERE posts.id=:id AND pmark.mark='visible'",
    }
    where_clause = where_by_role.get(role)
    if where_clause is None:
        # With no WHERE clause the query would ignore `key` and hand back an
        # arbitrary post, so an unknown role is treated as "not found".
        return {}, 404

    with db.getsession() as session:
        # article fetch
        article_stmt = (
            "SELECT posts.id, posts.content, posts.file_list, meta.igid, posts.hash, meta.ip, pmark.mark "
            "FROM posts "
            "INNER JOIN mark AS pmark ON posts.hash=pmark.hash "
            "INNER JOIN article_meta AS meta ON posts.hash=meta.hash "
        ) + where_clause
        res = session.execute(text(article_stmt), {"id":key, "hash":hash}).first()
        if res is None:
            return {}, 404

        # Column order follows the SELECT list above.
        post_id, content, file_list, igid, post_hash, ip, mark = res

        # comment fetch
        comment_stmt = (
            "SELECT c.sha1 "
            "FROM posts "
            "INNER JOIN unnest(posts.comment_list) AS c ON c=ANY(posts.comment_list) "
            "INNER JOIN mark AS cmark ON c.hash=cmark.hash "
            "WHERE posts.id=:id"
        )
        if role == "general":  # general users only get visible comments
            comment_stmt += " AND cmark.mark='visible'"
        comment_rows = session.execute(text(comment_stmt), {"id":post_id}).all()

        # mapping
        one = {
            "id": post_id,
            "content": content,
            "igid": igid,
        }
        if file_list:  # files
            one["files_hash"] = file_list
        # NOTE(review): this tests the post hash (column 4), not the comment
        # rows, so "comments_hash" is set whenever the hash is non-empty, even
        # with zero comments. Behaviour kept as-is; confirm whether
        # `comment_rows` was the intended condition.
        if post_hash:
            one["comments_hash"] = [row[0] for row in comment_rows]
        if role == "admin":
            one["ip"] = ip
        one["mark"] = mark
        one["hash"] = post_hash
        return one, 200
# role (general) (owner) (admin)
# Fetch a single comment - may end up unused
def solo_comment_fetcher(role:str, key:str, hash:str=None) -> Tuple[Dict, int]: # admin, owner, general
    """Fetch one comment by its sha1, filtered according to the caller's role.

    Args:
        role: "general" (parent post and comment must both be marked
            'visible'), "owner" (the comment's hash must equal ``hash``) or
            "admin" (no extra filter).
        key: sha1 of the comment.
        hash: hash of the comment; only used for the "owner" role.

    Returns:
        ``(comment, 200)`` on success, or ``({}, 404)`` when no row matches or
        ``role`` is not one of the three known roles. The comment dict holds
        "content", "sha1", "mark" and "hash", plus "ip" for admins.
    """
    if role not in ("general", "owner", "admin"):
        # Previously an unknown role never ran the query and then crashed with
        # UnboundLocalError; report it as "not found" instead.
        return {}, 404

    # query
    stmt = (
        "SELECT posts.id AS parent_id, posts.hash AS parent_hash, pmark.mark AS parent_mark, cmark.mark AS comment_mark, c.* "
        "FROM posts "
        "INNER JOIN unnest(posts.comment_list) AS c ON c=ANY(posts.comment_list) "
        "JOIN mark AS pmark ON posts.hash=pmark.hash "
        "JOIN mark AS cmark ON c.hash=cmark.hash "
        "WHERE c.sha1=:sha1 "
    )
    params = {'sha1':key}
    if role == "general":
        # general users: both the parent post and the comment must be visible
        stmt += "AND pmark.mark='visible' AND cmark.mark='visible'"
    elif role == "owner":
        # author: sha1 plus matching hash, no visibility check
        stmt += "AND c.hash=:hash"
        params['hash'] = hash
    # admin: sha1 lookup only, no extra check

    with db.getsession() as session:
        arta = session.execute(text(stmt), params).first()
        if arta is None:
            return {}, 404

        # mapping
        # Columns 0-3 are the aliased columns in the SELECT list; columns 4+
        # come from the `c.*` expansion (presumably content, ip, hash, ...,
        # sha1 - verify against the comment type definition).
        one = {
            "content": arta[4],
            "sha1": arta[8]
        }
        if role == "admin":
            one["ip"] = arta[5]
        one["mark"] = arta[3]
        one["hash"] = arta[6]
        return one, 200
# Fetch a file - for IG
def solo_file_fetcher(role:str, fnhash:str) -> Tuple[dict, int]: # general, admin
    """Fetch a stored file after checking that some post references it.

    Args:
        role: "general" additionally requires the referencing post to be
            marked 'visible'; any other value skips that filter.
        fnhash: file hash looked up in ``posts.file_list`` and then passed to
            ``s3helper.solo_file_fetcher``.

    Returns:
        ``(file, 200)`` on success, or ``({}, 404)`` when no matching post
        exists or the s3 helper reports an error.
    """
    query = (
        "SELECT posts.id FROM posts "
        "INNER JOIN mark ON posts.hash=mark.hash "
        "WHERE :fnhash=ANY (posts.file_list) "
    )
    if role == "general":
        query += "AND mark.mark = 'visible'"

    with db.getsession() as session:
        # Make sure the owning article exists / may be viewed.
        owning_post = session.execute(text(query), {'fnhash':fnhash}).first()
        if owning_post is None:
            return {}, 404

        # fetch file
        payload, fetch_err = s3helper.solo_file_fetcher(fnhash)
        if fetch_err:
            return {}, 404
        return payload, 200
# Fill in igid
def solo_article_set_igid(id:int, igid:str) -> int:
    """Set the ``igid`` value on the meta row of article ``id``.

    Args:
        id: posts.id of the article to update.
        igid: value written to the ``igid`` column.

    Returns:
        0 on success; 1 when the article cannot be fetched or the update /
        commit fails (the transaction is rolled back in that case).
    """
    # get hash
    article, code = solo_article_fetcher(role="admin", key=id)
    if code != 200:
        return 1
    article_hash = article["hash"]

    # edit igid
    article_meta = pgclass.SQLmeta
    with db.getsession() as session:
        try:
            stmt = update(article_meta).where(article_meta.hash==article_hash).values(igid=igid)
            session.execute(stmt)
            # Commit only when the UPDATE succeeded, and keep it inside the
            # try so a commit failure is also reported as error code 1.
            session.commit()
        except Exception:
            # Best-effort contract: report failure via the return code, but
            # leave the session clean instead of committing a failed
            # transaction.
            session.rollback()
            return 1
    return 0