niming_backend/blueprints/admin.py

247 lines
9.1 KiB
Python
Raw Normal View History

2024-11-19 21:22:01 +08:00
import os
import time
import math
import json
import jwt
2024-11-19 22:58:15 +08:00
from flask import Blueprint, request, jsonify, make_response, g
2024-11-19 02:19:25 +08:00
from bcrypt import hashpw, gensalt, checkpw
2024-11-19 21:22:01 +08:00
from functools import wraps
from utils import pgclass, setting_loader, logger
from utils.misc import error
from utils.dbhelper import db, solo_article_fetcher, multi_article_fetcher, solo_file_fetcher, solo_article_remover
from utils.platform_consts import PLIST, PLIST_ROOT
2024-11-18 02:47:25 +08:00
admin = Blueprint("admin", __name__)
# jwt = {"id":user.id, "user":user.user, "exp":time.time}
# auth decorator
def role_required(permreq: list):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
2024-11-19 22:58:15 +08:00
# get data 嘗試解碼jwt
2024-11-18 02:47:25 +08:00
key = os.getenv("JWT_KEY", None)
2024-11-19 02:19:25 +08:00
jwtsession = request.cookies.get("token", None)
2024-11-19 21:22:01 +08:00
if jwtsession == None: return error("You do not have permission to view this page."), 401
2024-11-19 02:19:25 +08:00
jwtsession = str(jwtsession)
try: jwtdata = jwt.decode(jwt = jwtsession, key = key, algorithms = ["HS256"])
2024-11-19 21:22:01 +08:00
except jwt.exceptions.ExpiredSignatureError: return error("Token expired!"), 401
except jwt.exceptions.DecodeError: return error("Invalid token!"), 401
if "id" not in jwtdata or "user" not in jwtdata: return error("Invalid token!"), 401
2024-11-18 02:47:25 +08:00
2024-11-19 22:58:15 +08:00
# db 驗證帳號是否正確
2024-11-18 02:47:25 +08:00
table = pgclass.SQLuser
2024-11-19 21:22:01 +08:00
with db.getsession() as session:
res = session.query(table).filter(table.user == jwtdata["user"], table.id == jwtdata["id"]).first()
if res is None: return error("You do not have permission to view this page."), 401
2024-11-18 02:47:25 +08:00
2024-11-19 22:58:15 +08:00
# permission check 確保用戶有此路徑要求的權限 並且權限名稱皆合法
2024-11-19 02:19:25 +08:00
permissionList = list(set(res.permission))
for p in permissionList: # 檢查用戶JWT是否有不合法的權限名稱
2024-11-19 21:22:01 +08:00
if p not in PLIST_ROOT: return error("The user has invalid permission."), 402
2024-11-19 02:19:25 +08:00
for p in list(set(permreq)):
2024-11-19 21:22:01 +08:00
if p not in permissionList: return error("You do not have permission to view this page."), 402
2024-11-18 02:47:25 +08:00
# return
2024-11-19 22:58:15 +08:00
g.opuser = res
2024-11-18 02:47:25 +08:00
return f(*args, **kwargs)
return decorated_function
return decorator
# login
@admin.route("/login", methods=["POST"])
def login():
# args
2024-11-19 21:22:01 +08:00
if "username" not in request.json or "password" not in request.json: return error("Arguments error"), 400
2024-11-18 02:47:25 +08:00
username = str(request.json["username"])
2024-11-19 02:19:25 +08:00
password = str(request.json["password"])
2024-11-18 02:47:25 +08:00
# variables
settings = setting_loader.loadset()
exptime = int(settings["JWT_Valid_Time"])
# db
table = pgclass.SQLuser
2024-11-19 21:22:01 +08:00
with db.getsession() as session: u = session.query(table).filter(table.user==username).first()
2024-11-18 02:47:25 +08:00
# auth
2024-11-19 21:22:01 +08:00
if u is None: return error("Login Failed"), 401 # 找不到用戶
if not checkpw(password.encode("utf-8"), u.password.encode("utf-8")): return error("Login Failed"), 401 # 密碼沒法跟hash對上
2024-11-18 02:47:25 +08:00
# jwt
key = os.getenv("JWT_KEY", None)
2024-11-19 21:22:01 +08:00
if key is None: return error("JWT_KEY error"), 500
2024-11-19 02:19:25 +08:00
jwtdata = {"id": u.id, "user":username, "exp":int(math.floor(time.time() + exptime))}
2024-11-18 02:47:25 +08:00
jwtdata = jwt.encode(payload = jwtdata, key = str(key), algorithm = "HS256")
# logger
2024-11-19 21:22:01 +08:00
logger.logger("login", "User:%s logined"%username)
2024-11-18 02:47:25 +08:00
# cookie
r = make_response("Access Granted")
2024-11-19 02:19:25 +08:00
r.set_cookie("token", jwtdata)
2024-11-19 21:22:01 +08:00
return r, 200
2024-11-18 02:47:25 +08:00
2024-11-19 02:19:25 +08:00
@admin.route("me", methods=["GET"])
@role_required([])
def user_me():
2024-11-19 22:58:15 +08:00
opuser = g.opuser
2024-11-19 21:22:01 +08:00
return jsonify({"id":opuser.id, "user":opuser.user, "permission":opuser.permission}), 200
2024-11-19 02:19:25 +08:00
2024-11-18 02:47:25 +08:00
####################
# User Area #
####################
2024-11-19 02:19:25 +08:00
# list / get / add / delete
2024-11-19 21:22:01 +08:00
@admin.route("/user/list", methods={"GET"})
2024-11-18 02:47:25 +08:00
@role_required([])
def user_list():
table = pgclass.SQLuser
2024-11-19 21:22:01 +08:00
with db.getsession() as session: users = session.query(table).all()
2024-11-18 02:47:25 +08:00
res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ]
2024-11-19 21:22:01 +08:00
return jsonify(res), 200
2024-11-18 02:47:25 +08:00
2024-11-19 21:22:01 +08:00
@admin.route("/user/<int:id>", methods=["GET"])
2024-11-18 02:47:25 +08:00
@role_required([])
def user_get(id:int):
table = pgclass.SQLuser
2024-11-19 21:22:01 +08:00
with db.getsession() as session: u = session.query(table).filter(table.id==int(id)).first()
if u is None: return error("User not found"), 404
return jsonify({"id":u.id, "user":u.user, "permission":u.permission}), 200
@admin.route("/user/<int:id>", methods=["DELETE"])
@role_required(["usermgr"])
def user_del(id:int):
# db
table = pgclass.SQLuser
with db.getsession() as session:
2024-11-19 22:58:15 +08:00
opuser = g.opuser # user who requested
2024-11-19 21:22:01 +08:00
# check root
tguser = session.query(table).filter(table.id==int(id)).first()
if tguser is None: return error("User is not exist"), 400
if tguser.user == "root": return error("You cannot delete user:root"), 400
# delete
session.delete(tguser)
session.commit()
2024-11-18 02:47:25 +08:00
2024-11-19 21:22:01 +08:00
logger.logger("user.delete", "User:%s deleted an user:%s"%(opuser.user, tguser.user)) # logger
return jsonify({"result":"OK"}), 200
2024-11-18 02:47:25 +08:00
2024-11-19 21:22:01 +08:00
@admin.route("/user", methods=["POST"])
2024-11-18 02:47:25 +08:00
@role_required(["usermgr"])
def user_add():
# db
table = pgclass.SQLuser
2024-11-19 21:22:01 +08:00
with db.getsession() as session:
2024-11-19 02:19:25 +08:00
# user who requested
2024-11-19 22:58:15 +08:00
opuser = g.opuser
2024-11-19 02:19:25 +08:00
# payload
if "username" not in request.json or "password" not in request.json or \
2024-11-19 21:22:01 +08:00
"permission" not in request.json or not(isinstance(request.json["permission"], list)):
return error("Arguments error"), 400
2024-11-19 02:19:25 +08:00
username = str(request.json["username"])
password = str(request.json["password"])
permission = list(set([ str(p) for p in list(request.json["permission"]) ]))
# check username and password
2024-11-19 21:22:01 +08:00
if username == None or len(username) == 0 or password is None or len(password) == 0:
return error("Invalid Username or Password!"), 400
2024-11-19 02:19:25 +08:00
# check permission list
for p in permission:
2024-11-19 21:22:01 +08:00
if p not in PLIST: return error("Invalid Permission"), 400 # 如果添加的權限名稱不合法
if p not in opuser.permission: return error("You don't have the permission: %s"%p), 402 # 如果用戶本身不具有相同權限
2024-11-19 02:19:25 +08:00
# add
users = session.query(table).filter(table.user==username).first()
if users is None: # check whether the user already exist
pwhash = hashpw(password.encode("utf-8"), gensalt()).decode("utf-8")
session.add(table(user=username, password=pwhash, permission=permission))
session.commit()
2024-11-19 21:22:01 +08:00
logger.logger("user.create", "User:%s created a new user:%s"%(opuser.user, username)) # logger
return jsonify({"user":username, "permission":permission}), 200
2024-11-19 02:19:25 +08:00
else:
2024-11-19 21:22:01 +08:00
return error("User already exist!"), 400
2024-11-18 02:47:25 +08:00
####################
# Article Area #
####################
2024-11-19 02:19:25 +08:00
# list / get / pend / delete / fileget
2024-11-19 21:22:01 +08:00
@admin.route("/article/file/<int:id>", methods = ["GET"])
2024-11-19 02:19:25 +08:00
@role_required(["article.read"])
2024-11-19 21:22:01 +08:00
def article_fileget(id:int):
resp, code = solo_file_fetcher("admin", id)
return resp, code
2024-11-19 02:19:25 +08:00
2024-11-19 21:22:01 +08:00
@admin.route('/article/list', methods = ["GET"])
@role_required(["article.read"])
def article_list():
res, code = multi_article_fetcher("admin", request.args.get("start"), request.args.get("count"))
return res, code
2024-11-19 02:19:25 +08:00
2024-11-19 21:22:01 +08:00
@admin.route("/article/<int:id>", methods=["GET"])
2024-11-19 02:19:25 +08:00
@role_required(["article.read"])
def article_read(id:int):
2024-11-19 21:22:01 +08:00
res, code = solo_article_fetcher("admin", id)
return jsonify(res), code
@admin.route("/article/<int:id>", methods=["DELETE"])
2024-11-19 02:19:25 +08:00
@role_required(["article.del"])
def article_del(id:int):
2024-11-19 22:58:15 +08:00
opuser = g.opuser
2024-11-19 02:19:25 +08:00
2024-11-19 21:22:01 +08:00
result, code = solo_article_remover("admin", id=id)
if "error" in result: return jsonify(result), code
2024-11-19 02:19:25 +08:00
2024-11-19 21:22:01 +08:00
logger.logger("article.delete", "User:%s deleted post (id=%d with comments %s): last_status=%s"%(opuser.user, result["id"], result["rcl"], result["mark"]))
return jsonify({"result":"OK"}), 200
@admin.route("/article/<int:id>", methods=["PUT"])
2024-11-19 02:19:25 +08:00
@role_required(["article.pend"])
def article_pend(id:int):
# db
table = pgclass.SQLarticle
2024-11-19 21:22:01 +08:00
with db.getsession() as session:
2024-11-19 02:19:25 +08:00
# 確保文章存在
res = session.query(table).filter(table.id==int(id)).first()
2024-11-19 21:22:01 +08:00
if res is None: return error("Post not found"), 404
2024-11-19 02:19:25 +08:00
# 如果文章已經公開
if res.mark == "visible":
2024-11-19 21:22:01 +08:00
return error("Post is already visible."), 400
2024-11-19 02:19:25 +08:00
elif res.mark == "pending":
res.mark = "visible"
session.commit()
# run IG Post
2024-11-19 21:22:01 +08:00
return jsonify({"result":"OK"}), 200
2024-11-19 02:19:25 +08:00
else:
2024-11-19 21:22:01 +08:00
return error("Post mark error"), 500
2024-11-18 02:47:25 +08:00
####################
# Setting Area #
####################
2024-11-19 02:19:25 +08:00
# get / set
2024-11-19 21:22:01 +08:00
@admin.route("/setting", methods=["GET"])
2024-11-18 02:47:25 +08:00
@role_required(["setting.edit"])
def setting_get():
return jsonify(setting_loader.loadset()), 200
2024-11-19 21:22:01 +08:00
@admin.route("/setting", methods=["POST"])
2024-11-18 02:47:25 +08:00
@role_required(["setting.edit"])
def setting_edit():
2024-11-19 22:58:15 +08:00
opuser = g.opuser
2024-11-19 21:22:01 +08:00
2024-11-18 02:47:25 +08:00
req = request.json
d = None
for r in req:
d = setting_loader.writeset(r, req.get(r))
2024-11-19 21:22:01 +08:00
if d == 0: return error("Failed"), 401
logger.logger("setting.modify", "User:%s modified settings: %s"%(opuser, json.dumps(request.json)))
2024-11-18 02:47:25 +08:00
return jsonify(d), 200