niming_backend/blueprints/admin/utils.py

67 lines
2.6 KiB
Python
Raw Permalink Normal View History

2024-12-23 02:03:55 +08:00
from functools import wraps
import os
from flask import request, g, abort
import jwt
from utils.misc import error
from utils.platform_consts import PLIST_ROOT
from utils import pgclass, dbhelper
# JWT payload layout: {"id": user.id, "user": user.user, "exp": <expiry timestamp>}
# Authentication / authorisation decorator
def role_required(permreq: list):
    """Build a decorator that guards a view behind JWT login and permissions.

    The wrapped view only runs when all of the following hold:

    * the request carries a ``token`` cookie holding a JWT that decodes
      with the ``JWT_KEY`` environment variable using HS256;
    * the payload contains both ``id`` and ``user``;
    * a ``pgclass.SQLuser`` row matches that ``id`` / ``user`` pair;
    * every permission stored on that row is listed in ``PLIST_ROOT``;
    * every permission in ``permreq`` is held by that row.

    On success the matched user row is stored in ``g.opuser`` and the
    view's own return value is passed through unchanged.

    Args:
        permreq: Permission names the caller must hold.

    Returns:
        A decorator. The decorated function returns either the view's
        result or an ``(error(...), status)`` tuple: 401 for a missing,
        expired or invalid token or an unknown user, 402 for a permission
        failure.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Fetch the signing key and the JWT carried in the cookie.
            key = os.getenv("JWT_KEY", None)
            jwtsession = request.cookies.get("token", None)
            if jwtsession is None:  # no session
                return error("You do not have permission to view this page."), 401
            jwtsession = str(jwtsession)

            # Decode and verify the token.
            try:
                jwtdata = jwt.decode(jwt=jwtsession, key=key, algorithms=["HS256"])
            except jwt.exceptions.ExpiredSignatureError:  # token expired
                return error("Token expired!"), 401
            except jwt.exceptions.InvalidTokenError:
                # Base class of PyJWT's validation errors (DecodeError,
                # bad signature, not-yet-valid token, ...): reject them all
                # with 401 instead of letting some escape unhandled.
                return error("Invalid token!"), 401
            if "id" not in jwtdata or "user" not in jwtdata:  # invalid token (struct)
                return error("Invalid token!"), 401

            # Confirm the account named by the token exists in the database.
            # NOTE(review): the original indentation was lost; this assumes
            # the session only needs to span the lookup — confirm.
            table = pgclass.SQLuser
            with dbhelper.db.getsession() as session:
                res = session.query(table).filter(
                    table.user == jwtdata["user"],
                    table.id == jwtdata["id"],
                ).first()
                if res is None:  # user not found
                    return error("You do not have permission to view this page."), 401
                # Snapshot the permissions while the session is still open.
                held = set(res.permission)

            # NOTE(review): 402 is "Payment Required"; 403 would be the
            # conventional status here. Kept as-is so clients are unaffected.
            # Every permission stored for the user must be a known name.
            if any(p not in PLIST_ROOT for p in held):
                return error("The user has invalid permission."), 402
            # The user must hold every permission this route requires.
            if not set(permreq).issubset(held):
                return error("You do not have permission to view this page."), 402

            # Expose the authenticated user to the view.
            g.opuser = res
            return f(*args, **kwargs)
        return decorated_function
    return decorator
def check_key(type:str, key:str | int) -> str | int:
if type == 'article':
if not (len(key) > 0 and key.isdigit()):
return abort(400)
outkey = int(key) # id
elif type == 'comment':
if not (len(key) > 0):
return abort(400)
outkey = str(key) # sha1
else:
return abort(404)
return outkey