67 lines
2.6 KiB
Python
67 lines
2.6 KiB
Python
|
from functools import wraps
|
||
|
import os
|
||
|
|
||
|
from flask import request, g, abort
|
||
|
import jwt
|
||
|
|
||
|
from utils.misc import error
|
||
|
from utils.platform_consts import PLIST_ROOT
|
||
|
from utils import pgclass, dbhelper
|
||
|
|
||
|
# jwt = {"id":user.id, "user":user.user, "exp":time.time}
|
||
|
|
||
|
# auth decorator
|
||
|
def role_required(permreq: list):
    """Build a decorator that restricts a Flask view to authorised users.

    The wrapped view is called only when all of the following hold:

    * the request has a ``token`` cookie that decodes as an HS256 JWT signed
      with the ``JWT_KEY`` environment variable;
    * the token carries ``id`` and ``user`` claims that match a row of
      ``pgclass.SQLuser``;
    * every permission stored on that row is listed in ``PLIST_ROOT``;
    * the row holds every permission named in ``permreq``.

    On success the matched user row is stored in ``g.opuser`` and the view is
    invoked with its original arguments.

    Args:
        permreq: Permission names the user must hold to reach the view.

    Returns:
        A decorator.  When a check fails the decorated view returns
        ``(error(message), status)`` instead of calling the view: 401 for a
        missing, expired or invalid token or an unknown user, 402 for a
        permission failure, and 500 when ``JWT_KEY`` is not configured.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Read the session token from the request cookies.
            jwtsession = request.cookies.get("token")
            if jwtsession is None:  # no session
                return error("You do not have permission to view this page."), 401

            # Fail closed when the signing key is missing or empty rather
            # than handing an unusable key to the JWT library.
            key = os.getenv("JWT_KEY")
            if not key:
                return error("Server configuration error."), 500

            # Decode and verify the token.
            try:
                jwtdata = jwt.decode(jwt=str(jwtsession), key=key, algorithms=["HS256"])
            except jwt.exceptions.ExpiredSignatureError:  # token expired
                return error("Token expired!"), 401
            except jwt.exceptions.InvalidTokenError:
                # Base class of PyJWT's validation errors: covers malformed
                # tokens and bad signatures as well as failures such as a
                # disallowed algorithm, which would otherwise escape as an
                # unhandled exception.
                return error("Invalid token!"), 401
            if "id" not in jwtdata or "user" not in jwtdata:  # invalid token (struct)
                return error("Invalid token!"), 401

            # Confirm that the account named in the token exists.
            table = pgclass.SQLuser
            with dbhelper.db.getsession() as session:
                res = session.query(table).filter(
                    table.user == jwtdata["user"],
                    table.id == jwtdata["id"],
                ).first()
            if res is None:  # user not found
                return error("You do not have permission to view this page."), 401

            # Permission check: every permission stored for the user must be
            # a known name, and the user must hold everything this route
            # requires.
            # NOTE(review): 402 is "Payment Required"; 403 looks like the
            # intended status, but it is kept in case clients depend on it.
            granted = set(res.permission)
            if any(p not in PLIST_ROOT for p in granted):
                return error("The user has invalid permission."), 402
            if any(p not in granted for p in permreq):
                return error("You do not have permission to view this page."), 402

            # Expose the authenticated user to the view.
            g.opuser = res
            return f(*args, **kwargs)
        return decorated_function
    return decorator
|
||
|
|
||
|
|
||
|
def check_key(type:str, key:str | int) -> str | int:
|
||
|
if type == 'article':
|
||
|
if not (len(key) > 0 and key.isdigit()):
|
||
|
return abort(400)
|
||
|
outkey = int(key) # id
|
||
|
elif type == 'comment':
|
||
|
if not (len(key) > 0):
|
||
|
return abort(400)
|
||
|
outkey = str(key) # sha1
|
||
|
else:
|
||
|
return abort(404)
|
||
|
|
||
|
return outkey
|