141 lines
5.0 KiB
Python
141 lines
5.0 KiB
Python
import os
|
|
import time
|
|
import math
|
|
|
|
import jwt
|
|
from flask import Blueprint, request, jsonify, make_response, g
|
|
from bcrypt import hashpw, gensalt, checkpw
|
|
|
|
from utils import pgclass, setting_loader, logger, dbhelper
|
|
from utils.misc import error
|
|
from utils.platform_consts import PLIST
|
|
from blueprints.admin.utils import role_required
|
|
|
|
# prefix: /admin/user
|
|
bl_admin_user = Blueprint("admin_user", __name__)
|
|
|
|
# login
|
|
@bl_admin_user.route("/login", methods=["POST"])
|
|
def login():
|
|
# args
|
|
if "username" not in request.json or "password" not in request.json: return error("Arguments error"), 400
|
|
username = str(request.json["username"])
|
|
password = str(request.json["password"])
|
|
|
|
# variables
|
|
settings = setting_loader.loadset()
|
|
exptime = int(settings["JWT_Valid_Time"])
|
|
|
|
# db
|
|
table = pgclass.SQLuser
|
|
with dbhelper.db.getsession() as session:
|
|
u = session.query(table).filter(table.user==username).first()
|
|
# auth
|
|
if u is None: return error("Login Failed"), 401 # 找不到用戶
|
|
if not checkpw(password.encode("utf-8"), u.password.encode("utf-8")): return error("Login Failed"), 401 # 密碼沒法跟hash對上
|
|
|
|
# jwt
|
|
key = os.getenv("JWT_KEY", None)
|
|
if key is None: return error("JWT_KEY error"), 500
|
|
jwtdata = {"id": u.id, "user":username, "exp":int(math.floor(time.time() + exptime))}
|
|
jwtdata = jwt.encode(payload = jwtdata, key = str(key), algorithm = "HS256")
|
|
|
|
# logger
|
|
logger.logger("login", "User:%s logined"%username)
|
|
|
|
# cookie
|
|
r = make_response("Access Granted")
|
|
r.set_cookie("token", jwtdata)
|
|
return r, 200
|
|
|
|
|
|
# check who i am
|
|
@bl_admin_user.route("/me", methods=["GET"])
|
|
@role_required([])
|
|
def user_me():
|
|
opuser = g.opuser
|
|
return jsonify({"id":opuser.id, "user":opuser.user, "permission":opuser.permission}), 200
|
|
|
|
|
|
####################
|
|
# User Area #
|
|
####################
|
|
# list users
|
|
@bl_admin_user.route("/list", methods={"GET"})
|
|
@role_required([])
|
|
def user_list():
|
|
table = pgclass.SQLuser
|
|
with dbhelper.db.getsession() as session: users = session.query(table).all()
|
|
res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ]
|
|
return jsonify(res), 200
|
|
|
|
|
|
# show an user's info
|
|
@bl_admin_user.route("/<int:id>", methods=["GET"])
|
|
@role_required([])
|
|
def user_get(id:int):
|
|
table = pgclass.SQLuser
|
|
with dbhelper.db.getsession() as session: u = session.query(table).filter(table.id==int(id)).first()
|
|
if u is None: return error("User not found"), 404
|
|
return jsonify({"id":u.id, "user":u.user, "permission":u.permission}), 200
|
|
|
|
|
|
# delete an user
|
|
@bl_admin_user.route("/<int:id>", methods=["DELETE"])
|
|
@role_required(["usermgr"])
|
|
def user_del(id:int):
|
|
# db
|
|
table = pgclass.SQLuser
|
|
|
|
with dbhelper.db.getsession() as session:
|
|
opuser = g.opuser # user who requested
|
|
|
|
# check root
|
|
tguser = session.query(table).filter(table.id==int(id)).first()
|
|
if tguser is None: return error("User is not exist"), 400
|
|
if tguser.user == "root": return error("You cannot delete user:root"), 400
|
|
|
|
# delete
|
|
session.delete(tguser)
|
|
session.commit()
|
|
|
|
logger.logger("user.delete", "User:%s deleted an user:%s"%(opuser.user, tguser.user)) # logger
|
|
return jsonify({"result":"OK"}), 200
|
|
|
|
|
|
# new user
|
|
@bl_admin_user.route("/", methods=["POST"])
|
|
@role_required(["usermgr"])
|
|
def user_add():
|
|
# db
|
|
table = pgclass.SQLuser
|
|
with dbhelper.db.getsession() as session:
|
|
# user who requested
|
|
opuser = g.opuser
|
|
|
|
# payload
|
|
if "username" not in request.json or "password" not in request.json or \
|
|
"permission" not in request.json or not(isinstance(request.json["permission"], list)):
|
|
return error("Arguments error"), 400
|
|
username = str(request.json["username"])
|
|
password = str(request.json["password"])
|
|
permission = list(set([ str(p) for p in list(request.json["permission"]) ]))
|
|
# check username and password
|
|
if username == None or len(username) == 0 or password is None or len(password) == 0:
|
|
return error("Invalid Username or Password!"), 400
|
|
# check permission list
|
|
for p in permission:
|
|
if p not in PLIST: return error("Invalid Permission"), 400 # 如果添加的權限名稱不合法
|
|
if p not in opuser.permission: return error("You don't have the permission: %s"%p), 402 # 如果用戶本身不具有相同權限
|
|
|
|
# add
|
|
users = session.query(table).filter(table.user==username).first()
|
|
if users is None: # check whether the user already exist
|
|
pwhash = hashpw(password.encode("utf-8"), gensalt()).decode("utf-8")
|
|
session.add(table(user=username, password=pwhash, permission=permission))
|
|
session.commit()
|
|
logger.logger("user.create", "User:%s created a new user:%s"%(opuser.user, username)) # logger
|
|
return jsonify({"user":username, "permission":permission}), 200
|
|
else:
|
|
return error("User already exist!"), 400
|