import os import time import math import jwt from flask import Blueprint, request, jsonify, make_response, g from bcrypt import hashpw, gensalt, checkpw from utils import pgclass, setting_loader, logger, dbhelper from utils.misc import error from utils.platform_consts import PLIST from blueprints.admin.utils import role_required # prefix: /admin/user bl_admin_user = Blueprint("admin_user", __name__) # login @bl_admin_user.route("/login", methods=["POST"]) def login(): # args if "username" not in request.json or "password" not in request.json: return error("Arguments error"), 400 username = str(request.json["username"]) password = str(request.json["password"]) # variables settings = setting_loader.loadset() exptime = int(settings["JWT_Valid_Time"]) # db table = pgclass.SQLuser with dbhelper.db.getsession() as session: u = session.query(table).filter(table.user==username).first() # auth if u is None: return error("Login Failed"), 401 # 找不到用戶 if not checkpw(password.encode("utf-8"), u.password.encode("utf-8")): return error("Login Failed"), 401 # 密碼沒法跟hash對上 # jwt key = os.getenv("JWT_KEY", None) if key is None: return error("JWT_KEY error"), 500 jwtdata = {"id": u.id, "user":username, "exp":int(math.floor(time.time() + exptime))} jwtdata = jwt.encode(payload = jwtdata, key = str(key), algorithm = "HS256") # logger logger.logger("login", "User:%s logined"%username) # cookie r = make_response("Access Granted") r.set_cookie("token", jwtdata) return r, 200 # check who i am @bl_admin_user.route("/me", methods=["GET"]) @role_required([]) def user_me(): opuser = g.opuser return jsonify({"id":opuser.id, "user":opuser.user, "permission":opuser.permission}), 200 #################### # User Area # #################### # list users @bl_admin_user.route("/list", methods={"GET"}) @role_required([]) def user_list(): table = pgclass.SQLuser with dbhelper.db.getsession() as session: users = session.query(table).all() res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ] return jsonify(res), 200 # show an user's info @bl_admin_user.route("/", methods=["GET"]) @role_required([]) def user_get(id:int): table = pgclass.SQLuser with dbhelper.db.getsession() as session: u = session.query(table).filter(table.id==int(id)).first() if u is None: return error("User not found"), 404 return jsonify({"id":u.id, "user":u.user, "permission":u.permission}), 200 # delete an user @bl_admin_user.route("/", methods=["DELETE"]) @role_required(["usermgr"]) def user_del(id:int): # db table = pgclass.SQLuser with dbhelper.db.getsession() as session: opuser = g.opuser # user who requested # check root tguser = session.query(table).filter(table.id==int(id)).first() if tguser is None: return error("User is not exist"), 400 if tguser.user == "root": return error("You cannot delete user:root"), 400 # delete session.delete(tguser) session.commit() logger.logger("user.delete", "User:%s deleted an user:%s"%(opuser.user, tguser.user)) # logger return jsonify({"result":"OK"}), 200 # new user @bl_admin_user.route("/", methods=["POST"]) @role_required(["usermgr"]) def user_add(): # db table = pgclass.SQLuser with dbhelper.db.getsession() as session: # user who requested opuser = g.opuser # payload if "username" not in request.json or "password" not in request.json or \ "permission" not in request.json or not(isinstance(request.json["permission"], list)): return error("Arguments error"), 400 username = str(request.json["username"]) password = str(request.json["password"]) permission = list(set([ str(p) for p in list(request.json["permission"]) ])) # check username and password if username == None or len(username) == 0 or password is None or len(password) == 0: return error("Invalid Username or Password!"), 400 # check permission list for p in permission: if p not in PLIST: return error("Invalid Permission"), 400 # 如果添加的權限名稱不合法 if p not in opuser.permission: return error("You don't have the permission: %s"%p), 402 # 如果用戶本身不具有相同權限 # add users = session.query(table).filter(table.user==username).first() if users is None: # check whether the user already exist pwhash = hashpw(password.encode("utf-8"), gensalt()).decode("utf-8") session.add(table(user=username, password=pwhash, permission=permission)) session.commit() logger.logger("user.create", "User:%s created a new user:%s"%(opuser.user, username)) # logger return jsonify({"user":username, "permission":permission}), 200 else: return error("User already exist!"), 400