330 lines
13 KiB
Python
330 lines
13 KiB
Python
from flask import Blueprint, request, current_app, abort, jsonify, make_response, abort
|
|
import jwt, os, time, math
|
|
from utils import pgclass, setting_loader, logger
|
|
from utils.platform_consts import pList, pList_root
|
|
from functools import wraps
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy import desc
|
|
from bcrypt import hashpw, gensalt, checkpw
|
|
|
|
admin = Blueprint("admin", __name__)
|
|
|
|
# jwt = {"id":user.id, "user":user.user, "exp":time.time}
|
|
|
|
# auth decorator
|
|
def role_required(permreq: list):
    """Build a decorator that guards a view behind a JWT login and permissions.

    The wrapped view only runs when all of the following hold:

    * the request carries a ``token`` cookie that decodes as an HS256 JWT
      signed with the ``JWT_KEY`` environment variable;
    * the token has ``id`` and ``user`` claims matching a ``pgclass.SQLuser`` row;
    * every permission stored on that row is listed in ``pList_root``;
    * the row holds every permission named in ``permreq``.

    Args:
        permreq: Permission names the caller must hold. An empty list only
            requires a valid login.

    Returns:
        A decorator. The decorated view returns ``(message, 401)`` for a
        missing, expired or invalid token or an unknown user, ``(message, 402)``
        for a permission problem, and otherwise whatever the view returns.
        A missing ``JWT_KEY`` aborts the request with HTTP 500.
    """
    # The requirement never changes after decoration, so build the set once.
    required = set(permreq)

    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # get data
            key = os.getenv("JWT_KEY", None)
            jwtsession = request.cookies.get("token", None)
            if jwtsession is None:
                return "You do not have permission to view this page.", 401
            if key is None:
                # Without a signing key no token can be verified; fail
                # explicitly instead of letting the JWT library raise.
                abort(500)

            try:
                jwtdata = jwt.decode(jwt=str(jwtsession), key=key, algorithms=["HS256"])
            except jwt.exceptions.ExpiredSignatureError:
                return "Token expired!", 401
            except jwt.exceptions.InvalidTokenError:
                # Base class of every decode/validation failure, so malformed,
                # badly signed or otherwise rejected tokens all end up here.
                return "Invalid token!", 401
            if "id" not in jwtdata or "user" not in jwtdata:
                return "Invalid token!", 401

            # db: the context manager closes the session even if the query raises
            db = current_app.shared_resource.engine
            Session = sessionmaker(bind=db)
            table = pgclass.SQLuser
            with Session() as session:
                res = session.query(table).filter(
                    table.user == jwtdata["user"], table.id == jwtdata["id"]
                ).first()
                if res is None:
                    return "You do not have permission to view this page.", 401
                granted = set(res.permission)

            # permission check
            # NOTE(review): 402 is "Payment Required"; 403 would be the usual
            # status here. Kept unchanged because clients may depend on it.
            # Reject users whose stored permissions contain an unknown name.
            if any(p not in pList_root for p in granted):
                return "The user has invalid permission.", 402
            if not required <= granted:
                return "You do not have permission to view this page.", 402

            return f(*args, **kwargs)
        return decorated_function
    return decorator
|
|
# get operator
|
|
def getopuser(session, cookie):
    """Resolve the requesting user ("operator") from a JWT cookie value.

    Args:
        session: SQLAlchemy session used to look the user up.
        cookie: Value of the ``token`` cookie; it is passed through ``str()``
            before decoding.

    Returns:
        A 2-tuple. On success ``(user_row, None)``, where ``user_row`` is the
        matching ``pgclass.SQLuser`` row or ``None`` if no row matches the
        token's ``user`` and ``id`` claims. On failure ``(message, 401)``,
        which callers return directly as the HTTP response.
    """
    table = pgclass.SQLuser
    try:
        claims = jwt.decode(jwt=str(cookie), key=os.getenv("JWT_KEY"), algorithms=["HS256"])
    except jwt.exceptions.ExpiredSignatureError:
        return "Token expired!", 401
    except jwt.exceptions.InvalidTokenError:
        # Base class of all token validation errors (DecodeError included),
        # so no rejected token escapes as an unhandled exception.
        return "Invalid token!", 401
    if "id" not in claims or "user" not in claims:
        return "Invalid token!", 401

    opuser = session.query(table).filter(
        table.user == claims["user"], table.id == claims["id"]
    ).first()
    return opuser, None
|
|
|
|
# login
|
|
@admin.route("/login", methods=["POST"])
def login():
    """Authenticate a user and hand back a JWT in the ``token`` cookie.

    Expects a JSON object with ``username`` and ``password``. The password is
    checked against the stored bcrypt hash. On success an HS256 token with
    ``id``, ``user`` and ``exp`` claims is issued; its lifetime in seconds is
    the ``JWT_Valid_Time`` setting.

    Returns:
        ``"Access Granted"`` with the cookie set, ``("Arguments error", 400)``
        for a malformed body, or ``("Login Failed", 401)`` for an unknown user
        or wrong password. Aborts with HTTP 500 when ``JWT_KEY`` is not set.
    """
    # args: the body must be a JSON object; null, strings and lists are rejected
    payload = request.json
    if not isinstance(payload, dict) or "username" not in payload or "password" not in payload:
        return "Arguments error", 400
    username = str(payload["username"])
    password = str(payload["password"])

    # variables
    settings = setting_loader.loadset()
    exptime = int(settings["JWT_Valid_Time"])

    # db
    db = current_app.shared_resource.engine
    Session = sessionmaker(bind=db)
    table = pgclass.SQLuser
    with Session() as session:
        u = session.query(table).filter(table.user == username).first()

    # auth: the same message for both failures avoids revealing which one occurred
    if u is None:
        return "Login Failed", 401  # user not found
    if not checkpw(password.encode("utf-8"), u.password.encode("utf-8")):
        return "Login Failed", 401  # password does not match the stored hash

    # jwt
    key = os.getenv("JWT_KEY", None)
    if key is None:
        abort(500)
    claims = {"id": u.id, "user": username, "exp": int(math.floor(time.time() + exptime))}
    token = jwt.encode(payload=claims, key=str(key), algorithm="HS256")

    # logger
    logger.logger(db, "login", "User:%s logined" % username)

    # cookie
    # NOTE(review): the cookie is set without httponly/secure/samesite flags;
    # consider adding them once it is confirmed no client script reads it.
    r = make_response("Access Granted")
    r.set_cookie("token", token)
    return r
|
|
|
|
@admin.route("me", methods=["GET"])
@role_required([])
def user_me():
    """Return the logged-in user's id, user name and permission list as JSON.

    Returns:
        The JSON object on success, the ``(message, status)`` pair produced by
        ``getopuser`` when the token is rejected, or a 402 response when the
        token is valid but no matching user row exists.
    """
    db = current_app.shared_resource.engine
    Session = sessionmaker(bind=db)
    with Session() as session:
        opuser, err = getopuser(session, request.cookies.get("token"))
        if err is not None:
            return opuser, err
        # The row can be gone by now (e.g. deleted after the decorator's
        # check); answer like the other handlers instead of crashing.
        if opuser is None:
            return "You don't have permission to view this page!", 402
        return jsonify({"id": opuser.id, "user": opuser.user, "permission": opuser.permission})
|
|
|
|
####################
|
|
# User Area #
|
|
####################
|
|
# list / get / add / delete
|
|
@admin.route("user/list", methods={"GET"})
@role_required([])
def user_list():
    """Return all users as a JSON array of ``{id, user, permission}`` objects."""
    engine = current_app.shared_resource.engine
    user_tab = pgclass.SQLuser
    with sessionmaker(bind=engine)() as session:
        rows = session.query(user_tab).all()

    # Only the public fields are exposed; the password hash is left out.
    listing = []
    for row in rows:
        listing.append({"id": row.id, "user": row.user, "permission": row.permission})
    return jsonify(listing)
|
|
|
|
@admin.route("user/get/<int:id>", methods=["GET"])
@role_required([])
def user_get(id:int):
    """Return the user with the given id as a JSON array.

    The array holds one ``{id, user, permission}`` object per matching row and
    is empty when no user has that id.
    """
    engine = current_app.shared_resource.engine
    user_tab = pgclass.SQLuser
    with sessionmaker(bind=engine)() as session:
        matches = session.query(user_tab).filter(user_tab.id == int(id)).all()

    found = []
    for row in matches:
        found.append({"id": row.id, "user": row.user, "permission": row.permission})
    return jsonify(found)
|
|
|
|
@admin.route("user/add", methods=["POST"])
@role_required(["usermgr"])
def user_add():
    """Create a user from a JSON body with ``username``, ``password`` and ``permission``.

    ``permission`` must be a list; duplicates are dropped. Every requested
    permission must be listed in ``pList`` and must also be held by the
    requesting user, so an operator cannot grant rights they lack themselves.
    The password is stored as a bcrypt hash.

    Returns:
        JSON ``{user, permission}`` on success; ``(message, 400)`` for a
        malformed body, empty credentials, an unknown permission or an
        already existing user name; ``(message, 402)`` when the operator
        cannot be resolved or lacks a requested permission.
    """
    # db
    db = current_app.shared_resource.engine
    Session = sessionmaker(bind=db)
    table = pgclass.SQLuser
    with Session() as session:
        # user who requested
        opuser, err = getopuser(session, request.cookies.get("token"))
        if err is not None:
            return opuser, err
        if opuser is None:
            return "You don't have permission to view this page!", 402

        # payload: must be a JSON object carrying all three fields
        payload = request.json
        if (
            not isinstance(payload, dict)
            or "username" not in payload
            or "password" not in payload
            or not isinstance(payload.get("permission"), list)
        ):
            return "Arguments error", 400
        username = str(payload["username"])
        password = str(payload["password"])
        permission = list({str(p) for p in payload["permission"]})

        # check username and password
        if not username or not password:
            return "Invalid Username or Password!", 400

        # check permission list
        for p in permission:
            if p not in pList:
                return "Invalid Permission", 400  # permission name is not valid
            if p not in opuser.permission:
                # the operator does not hold this permission themselves
                return "You don't have the permission: %s" % p, 402

        # add: refuse to create a second user with the same name
        if session.query(table).filter(table.user == username).first() is not None:
            # Report the failure with an error status; the message text is
            # unchanged, but the request no longer answers 200.
            return "User already exist!", 400

        pwhash = hashpw(password.encode("utf-8"), gensalt()).decode("utf-8")
        # Read the operator name before commit so the log line does not
        # depend on reloading ORM state afterwards.
        operator_name = opuser.user
        session.add(table(user=username, password=pwhash, permission=permission))
        session.commit()
        logger.logger(db, "user.create", "User:%s created a new user:%s" % (operator_name, username))  # logger
        return jsonify({"user": username, "permission": permission})
|
|
|
|
@admin.route("user/delete/<int:id>", methods=["DELETE"])
@role_required(["usermgr"])
def user_del(id:int):
    """Delete the user with the given id; the user named ``root`` cannot be deleted.

    Returns:
        ``("OK", 200)`` on success; ``(message, 400)`` when the target does
        not exist or is ``root``; ``(message, 402)`` when the operator cannot
        be resolved; or the ``(message, status)`` pair from ``getopuser`` when
        the token is rejected.
    """
    # db
    db = current_app.shared_resource.engine
    Session = sessionmaker(bind=db)
    table = pgclass.SQLuser

    with Session() as session:
        # user who requested
        opuser, err = getopuser(session, request.cookies.get("token"))
        if err is not None:
            return opuser, err
        if opuser is None:
            return "You don't have permission to view this page!", 402

        # check root
        tguser = session.query(table).filter(table.id == int(id)).first()
        if tguser is None:
            return "User is not exist", 400
        if tguser.user == "root":
            return "You cannot delete user:root", 400

        # Capture both names before committing: the log line then does not
        # rely on reading ORM attributes after the rows changed state.
        operator_name = opuser.user
        target_name = tguser.user

        # delete
        session.delete(tguser)
        session.commit()

    logger.logger(db, "user.delete", "User:%s deleted an user:%s" % (operator_name, target_name))  # logger
    return "OK", 200
|
|
|
|
####################
|
|
# Article Area #
|
|
####################
|
|
# list / get / pend / delete / fileget
|
|
|
|
@admin.route('article/list', methods = ["GET"])
@role_required(["article.read"])
def article_list():
    """List top-level articles (rows whose ``reference`` is NULL), newest id first.

    Query parameters:
        start: Number of rows to skip (non-negative integer).
        count: Maximum number of rows to return (non-negative integer).

    Returns:
        ``(json, 200)`` where ``json`` is an array of objects with ``id``,
        ``ctx``, ``igid``, ``created_at``, ``mark``, ``ip`` and ``files`` (the
        ids of ``SQLfile`` rows whose ``reference`` equals the article's
        ``hash``), or ``("Arguments error", 400)`` for missing or non-numeric
        parameters.
    """
    # variables
    start_arg = request.args.get("start")
    count_arg = request.args.get("count")
    # isdecimal() only accepts characters int() can parse; isdigit() also
    # accepts ones such as superscript digits, which made int() raise.
    if (
        start_arg is None
        or count_arg is None
        or not start_arg.isdecimal()
        or not count_arg.isdecimal()
    ):
        return "Arguments error", 400
    rst = int(start_arg)
    count = int(count_arg)

    # db
    db = current_app.shared_resource.engine
    Session = sessionmaker(bind=db)
    table = pgclass.SQLarticle
    ftab = pgclass.SQLfile

    # Build the result inside the session: the per-article file lookups
    # previously ran after the ``with`` block had already closed it.
    with Session() as session:
        rows = (
            session.query(table)
            .order_by(desc(table.id))
            .filter(table.reference.is_(None))
            .offset(rst)
            .limit(count)
            .all()
        )
        # mapping
        res = [
            {
                "id": r.id, "ctx": r.ctx, "igid": r.igid, "created_at": r.created_at,
                "mark": r.mark, "ip": r.ip,
                "files": [f[0] for f in session.query(ftab.id).filter(ftab.reference == r.hash).all()],
            }
            for r in rows
        ]

    return jsonify(res), 200
|
|
|
|
@admin.route("article/get/<int:id>", methods=["GET"])
@role_required(["article.read"])
def article_read(id:int):
    """Return one article with the ids of its files and comments.

    Returns:
        A one-element JSON array holding ``id``, ``ctx``, ``igid``,
        ``created_at``, ``mark``, ``reference``, ``ip``, ``files`` (ids of
        ``SQLfile`` rows whose ``reference`` equals the article's ``hash``)
        and ``comment`` (ids of articles whose ``reference`` equals this
        article's id), or ``("Post not found", 400)`` when the id is unknown.
    """
    db = current_app.shared_resource.engine
    Session = sessionmaker(bind=db)
    table = pgclass.SQLarticle
    ftab = pgclass.SQLfile

    # All queries run inside the session; the file and comment lookups used to
    # run after the ``with`` block had closed it.
    with Session() as session:
        # get ctx
        res = session.query(table).filter(table.id == int(id)).first()
        if res is None:
            # Unknown id: answer like article_pend instead of crashing on None.
            return "Post not found", 400

        # mapping
        resfn = {
            "id": res.id, "ctx": res.ctx, "igid": res.igid, "created_at": res.created_at,
            "mark": res.mark, "reference": res.reference, "ip": res.ip,
            "files": [f[0] for f in session.query(ftab.id).filter(ftab.reference == res.hash).all()],
            "comment": [c[0] for c in session.query(table.id).filter(table.reference == res.id).all()],
        }
    return jsonify([resfn])
|
|
|
|
@admin.route("article/delete/<int:id>", methods=["DELETE"])
@role_required(["article.del"])
def article_del(id:int):
    """Delete an article together with its files, its comments and their files.

    Comments are the articles whose ``reference`` equals this article's id;
    files are the ``SQLfile`` rows whose ``reference`` equals an article's
    ``hash``. Everything is removed in a single commit, and the deletion is
    logged.

    Returns:
        ``("OK", 200)`` on success; ``("Post not found", 400)`` for an unknown
        id; ``(message, 402)`` when the operator cannot be resolved; or the
        ``(message, status)`` pair from ``getopuser`` when the token is rejected.
    """
    # db
    db = current_app.shared_resource.engine
    Session = sessionmaker(bind=db)
    table = pgclass.SQLarticle
    ftab = pgclass.SQLfile

    with Session() as session:
        opuser, err = getopuser(session, request.cookies.get("token"))
        if err is not None:
            return opuser, err
        if opuser is None:
            return "You don't have permission to view this page!", 402

        res = session.query(table).filter(table.id == id).first()  # the article itself
        if res is None:
            # Unknown id: report it instead of crashing on None below.
            return "Post not found", 400

        # Capture what the log line needs before anything is deleted or
        # committed, so it does not read ORM attributes afterwards.
        operator_name = opuser.user
        post_id = res.id
        last_mark = res.mark

        session.query(ftab).filter(ftab.reference == res.hash).delete()  # its files

        rcl = []
        # comments
        for c in session.query(table).filter(table.reference == post_id).all():
            rcl.append(c.id)
            # files attached to the comment
            session.query(ftab).filter(ftab.reference == c.hash).delete()
            # the comment itself
            session.delete(c)

        # the article itself
        session.delete(res)
        # commit
        session.commit()

    # logger
    logger.logger(
        db,
        "article.delete",
        "User:%s deleted post (id=%d with comments %s): last_status=%s" % (operator_name, post_id, str(rcl), last_mark),
    )
    return "OK", 200
|
|
|
|
@admin.route("article/pend/<int:id>", methods=["PATCH"])
@role_required(["article.pend"])
def article_pend(id:int):
    """Approve a pending article by switching its ``mark`` from "pending" to "visible".

    Returns:
        ``("OK", 200)`` after the change is committed; ``(message, 400)`` when
        the article does not exist or is already visible; ``(message, 500)``
        when ``mark`` holds any other value.
    """
    engine = current_app.shared_resource.engine
    article_tab = pgclass.SQLarticle
    with sessionmaker(bind=engine)() as session:
        # make sure the article exists
        post = session.query(article_tab).filter(article_tab.id == int(id)).first()
        if post is None:
            return "Post not found", 400

        status = post.mark
        # the article is already public
        if status == "visible":
            return "Post is already visible.", 400
        # anything other than "pending" is an unexpected state
        if status != "pending":
            return "Post mark error", 500

        post.mark = "visible"
        session.commit()

        # run IG Post

        return "OK", 200
|
|
|
|
@admin.route("article/file/<int:id>", methods = ["GET"])
@role_required(["article.read"])
def article_fileget(id:int):
    """Send the stored file with the given id as an attachment.

    The file is only served when an article whose ``hash`` equals the file's
    ``reference`` exists. The response body is the file's ``binary`` column,
    ``Content-Type`` is its ``type`` column and the download name is
    ``file<id>``.

    Returns:
        The file response, or ``("File not found", 400)`` when either the
        file or its owning article is missing.
    """
    engine = current_app.shared_resource.engine
    article_tab = pgclass.SQLarticle
    file_tab = pgclass.SQLfile

    with sessionmaker(bind=engine)() as session:
        stored = session.query(file_tab).filter(file_tab.id == id).first()
        # check that the file exists
        if stored is None:
            return "File not found", 400

        owner = session.query(article_tab).filter(article_tab.hash == stored.reference).first()
        # check that the owning article exists
        if owner is None:
            return "File not found", 400

        download = make_response(stored.binary)
        download.headers.set("Content-Type", stored.type)
        download.headers.set("Content-Disposition", f"attachment; filename=file{stored.id}")
        return download
|
|
|
|
####################
|
|
# Setting Area #
|
|
####################
|
|
# get / set
|
|
@admin.route("setting/get", methods=["GET"])
@role_required(["setting.edit"])
def setting_get():
    """Return the settings produced by ``setting_loader.loadset()`` as JSON with status 200."""
    current_settings = setting_loader.loadset()
    return jsonify(current_settings), 200
|
|
|
|
@admin.route("setting/set", methods=["POST"])
@role_required(["setting.edit"])
def setting_edit():
    """Write every key/value pair of the JSON body through ``setting_loader.writeset``.

    Pairs are written one at a time in body order. A ``writeset`` result of
    ``0`` is treated as failure and stops processing; pairs written before it
    are not rolled back here.

    Returns:
        ``(json, 200)`` with the result of the last ``writeset`` call (JSON
        ``null`` for an empty body), ``("Failed", 401)`` when a write reports
        failure, or ``("Arguments error", 400)`` when the body is not a JSON
        object.
    """
    req = request.json
    # Lists, strings and null have no key/value pairs to write; reject them
    # instead of failing on the first lookup.
    if not isinstance(req, dict):
        return "Arguments error", 400

    d = None
    for name, value in req.items():
        d = setting_loader.writeset(name, value)
        # NOTE(review): assumes the failure check belongs inside the loop;
        # the source's indentation was lost, so confirm against the original.
        if d == 0:
            return "Failed", 401
    return jsonify(d), 200