niming_backend/blueprints/admin.py

213 lines
7.3 KiB
Python
Raw Normal View History

2024-11-18 02:47:25 +08:00
from flask import Blueprint, request, current_app, abort, jsonify, make_response, abort
import jwt, os, time, hashlib, math
from utils import pgclass, setting_loader, logger
from utils.platform_consts import pList, platform_setting_model
from functools import wraps
from sqlalchemy.orm import sessionmaker
2024-11-18 02:47:25 +08:00
admin = Blueprint("admin", __name__)
# jwt = {"id":user.id, "user":user.user, "exp":time.time}
# permission list
# - usermgr (user management, add, remove, edit)
# - article.read
# - article.pend
# - article.del
# - setting.edit
# auth decorator
def role_required(permreq: list):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# get data
key = os.getenv("JWT_KEY", None)
jwtsession = str(request.cookies.get("token", None))
if jwtsession == None: return "You do not have permission to view this page.", 401
try: jwtdata = jwt.decode(jwtsession, key = key, algorithms = ["HS256"])
except jwt.exceptions.ExpiredSignatureError: return "Token expired!", 401
except: return abort(500)
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
res = session.query(table).filter(table.user == jwtdata["user"], table.id == jwtdata["id"]).first()
session.close()
if res is None: return "You do not have permission to view this page.", 401
# permission check
permissionList = res.permission
for p in permreq:
if p not in permissionList: return "You do not have permission to view this page.", 402
# return
return f(*args, **kwargs)
return decorated_function
return decorator
# login
@admin.route("/login", methods=["POST"])
def login():
# args
username = str(request.json["username"])
password = hashlib.sha512(str(request.json["password"]).encode()).hexdigest()
# variables
settings = setting_loader.loadset()
exptime = int(settings["JWT_Valid_Time"])
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
# auth
u = session.query(table).filter(table.user==username, table.password==password).first()
session.close()
if u is None: return "Login Failed", 400
# jwt
key = os.getenv("JWT_KEY", None)
if key is None: return abort(500)
jwtdata = {"id": u.id, "user":username, "exp":str(math.floor(time.time() + exptime))}
jwtdata = jwt.encode(payload = jwtdata, key = str(key), algorithm = "HS256")
# logger
logger.logger(db, "admin", "User:%s logined"%username)
# cookie
r = make_response("Access Granted")
r.set_cookie("token", jwtdata) # , httponly=True)
return r
####################
# User Area #
####################
# list / get / (admin/)me / add / delete
@admin.route("user/list", methods={"GET"})
@role_required([])
def user_list():
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
users = session.query(table).all()
res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ]
return jsonify(res)
@admin.route("user/get/<int:id>", methods=["GET"])
@role_required([])
def user_get(id:int):
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
users = session.query(table).filter(table.id==int(id)).all()
res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ]
return jsonify(res)
@admin.route("me", methods=["GET"])
@role_required([])
def user_me():
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
opuser = jwt.decode(jwt=request.cookies.get("token"), key=os.getenv("JWT_KEY"), algorithms="HS256")
opuser = session.query(table).filter(table.user==opuser["user"],table.id==opuser["id"]).first()
return jsonify({"id":opuser.id, "user":opuser.user, "permission":opuser.permission})
@admin.route("user/add", methods=["POST"])
@role_required(["usermgr"])
def user_add():
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
# user who requested
opuser = jwt.decode(jwt=request.cookies.get("token"), key=os.getenv("JWT_KEY"), algorithms="HS256")
opuser = session.query(table).filter(table.user==opuser["user"],table.id==opuser["id"]).first()
if opuser is None: return "You don't have permission to view this page!", 401
# payload
username = str(request.json["username"])
password = str(request.json["password"])
permission = list(request.json["permission"])
# check username and password
if username == None or len(username) == 0 or password is None or len(password) == 0: return "Invalid Username or Password!", 401
# check permission list
for p in permission:
if p not in pList: return "Invalid Permission", 401
if p not in opuser.permission: return "You don't have the permission: %s"%p, 401
# add
users = session.query(table).filter(table.user==username).first()
if users is None: # check whether the user is exist
pwhash = hashlib.sha512(password.encode()).hexdigest()
session.add(table(user=username, password=pwhash, permission=permission))
session.commit()
logger.logger(db, "admin", "User:%s created a new user:%s"%(opuser.user, username)) # logger
session.close()
return jsonify({"user":username, "permission":permission})
else:
session.close()
return "User is exist!"
@admin.route("user/delete/<int:id>", methods=["DELETE"])
@role_required(["usermgr"])
def user_del(id:int):
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
# user who requested
opuser = jwt.decode(jwt=request.cookies.get("token"), key=os.getenv("JWT_KEY"), algorithms="HS256")
opuser = session.query(table).filter(table.user==opuser["user"],table.id==opuser["id"]).first()
if opuser is None: return "You don't have permission to view this page!", 401
# check root
tguser = session.query(table).filter(table.id==id).first()
if tguser is None: return "User is not exist", 400
if tguser.user == "root": return "You cannot delete user:root", 400
# delete
session.delete(tguser)
session.commit()
logger.logger(db, "admin", "User:%s deleted an user:%s"%(opuser.user, tguser.user)) # logger
session.close()
return "OK", 200
####################
# Article Area #
####################
####################
# Setting Area #
####################
@admin.route("setting/get", methods=["GET"])
@role_required(["setting.edit"])
def setting_get():
return jsonify(setting_loader.loadset()), 200
2024-11-18 02:54:02 +08:00
@admin.route("setting/set", methods=["POST"])
2024-11-18 02:47:25 +08:00
@role_required(["setting.edit"])
def setting_edit():
req = request.json
d = None
for r in req:
d = setting_loader.writeset(r, req.get(r))
if d == 0: return "Failed", 401
return jsonify(d), 200