This commit is contained in:
p23 2024-11-19 13:22:01 +00:00
parent 06bb6bf008
commit 252cc5b5d9
10 changed files with 393 additions and 396 deletions

69
app.py
View File

@ -1,20 +1,16 @@
import os
from flask import Flask, jsonify from flask import Flask, jsonify
# from dotenv import load_dotenv
# load_dotenv()
import os, hashlib
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from utils.pgclass import Base, SQLuser
from utils.platform_consts import pList_root
from utils.setting_loader import loadset, typechecker
from utils import logger
from bcrypt import checkpw, gensalt, hashpw from bcrypt import checkpw, gensalt, hashpw
# blueprints from sqlalchemy import create_engine
from utils import pgclass, setting_loader, logger, dbhelper
from utils.platform_consts import PLIST_ROOT
from blueprints.article import article from blueprints.article import article
from blueprints.log import log from blueprints.log import log
from blueprints.admin import admin from blueprints.admin import admin
# Global Variables # env
PG_HOST = os.getenv("PG_HOST", None) PG_HOST = os.getenv("PG_HOST", None)
PG_PORT = os.getenv("PG_PORT", None) PG_PORT = os.getenv("PG_PORT", None)
PG_NAME = os.getenv("PG_NAME", None) PG_NAME = os.getenv("PG_NAME", None)
@ -25,46 +21,40 @@ PLATFORM_ROOT_PASSWORD = os.getenv("PLATFORM_ROOT_PASSWORD", None)
# env checker # env checker
errmsg = [] errmsg = []
if JWT_KEY is None or len(JWT_KEY) == 0: errmsg.append("Invalid JWT_KEY") if JWT_KEY is None or len(JWT_KEY) == 0:
if PLATFORM_ROOT_PASSWORD is None or len(PLATFORM_ROOT_PASSWORD) == 0: errmsg.append("Invalid PLATFORM_ROOT_PASSWORD") errmsg.append("Invalid JWT_KEY")
if PLATFORM_ROOT_PASSWORD is None or len(PLATFORM_ROOT_PASSWORD) == 0:
errmsg.append("Invalid PLATFORM_ROOT_PASSWORD")
if len(errmsg): if len(errmsg):
print(f"Env check failed: {errmsg}") print(f"Env check failed: {errmsg}")
exit(0) exit(0)
# Postgresql
engine = create_engine('postgresql+psycopg2://%s:%s@%s:%s/%s'%(PG_USER, PG_PASS, PG_HOST, PG_PORT, PG_NAME))
pgclass.Base.metadata.create_all(engine)
dbhelper.db(engine)
# settings checker # settings checker
settings = loadset() settings = setting_loader.loadset()
for s in settings: for s in settings:
if not typechecker(s, settings.get(s)): if not setting_loader.typechecker(s, settings.get(s)):
print("Settings.json data type check failed: %s"%s) print("Settings.json data type check failed: %s"%s)
exit(0) exit(0)
# Postgresql
engine = create_engine('postgresql+psycopg2://%s:%s@%s:%s/%s'%(PG_USER, PG_PASS, PG_HOST, PG_PORT, PG_NAME))
Base.metadata.create_all(engine)
# root checker # root checker
pwhash = hashpw(PLATFORM_ROOT_PASSWORD.encode("utf-8"), gensalt()).decode("utf-8") pwhash = hashpw(PLATFORM_ROOT_PASSWORD.encode("utf-8"), gensalt()).decode("utf-8") # if needed, new password
rootperm = pList_root with dbhelper.db.getsession() as session:
Session = sessionmaker(bind=engine) root = session.query(pgclass.SQLuser).filter(pgclass.SQLuser.user=="root").first()
session = Session() if root is None: # 沒有root
root = session.query(SQLuser).filter(SQLuser.user=="root").first() session.add(pgclass.SQLuser(user="root",password=pwhash, permission=PLIST_ROOT))
if (root is None): elif (not checkpw(PLATFORM_ROOT_PASSWORD.encode("utf-8"), root.password.encode("utf-8"))) or root.permission != PLIST_ROOT:
session.add(SQLuser(user="root",password=pwhash, permission=rootperm))
elif ((not checkpw(PLATFORM_ROOT_PASSWORD.encode("utf-8"), root.password.encode("utf-8"))) or root.permission != rootperm):
session.delete(root) session.delete(root)
session.add(SQLuser(user="root",password=pwhash, permission=rootperm)) session.add(pgclass.SQLuser(user="root",password=pwhash, permission=PLIST_ROOT))
session.commit() session.commit()
session.close()
# shared class
class shared():
def __init__(self, engine):
self.engine = engine
sh = shared(engine)
# flask app # flask app
app = Flask(__name__) app = Flask(__name__)
app.config["SECRET_KEY"] = os.urandom(64) app.config["SECRET_KEY"] = os.urandom(64)
app.shared_resource = sh
# register blueprints # register blueprints
app.register_blueprint(article, url_prefix = "/article") app.register_blueprint(article, url_prefix = "/article")
@ -72,18 +62,13 @@ app.register_blueprint(log , url_prefix = "/log")
app.register_blueprint(admin , url_prefix = "/admin") app.register_blueprint(admin , url_prefix = "/admin")
# logger # logger
logger.logger(engine, "server.start", "Server is running") logger.logger("server.start", "Server is running")
# index # index
@app.route("/", methods = ["GET", "POST"]) @app.route("/", methods = ["GET", "POST"])
def index(): def index():
return "Hello! World!<br>Shirakami Fubuki: cutest fox!!!" return "Hello! World!<br>Shirakami Fubuki: cutest fox!!!"
# global error handler
# @app.errorhandler(Exception)
# def handle_exception(e):
# return jsonify({"error": "Internal server error"}), 500
# app run # app run
if __name__ == "__main__": if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False) app.run(host="0.0.0.0", port=5000, debug=False)

View File

@ -1,11 +1,17 @@
from flask import Blueprint, request, current_app, abort, jsonify, make_response, abort import os
import jwt, os, time, math import time
from utils import pgclass, setting_loader, logger import math
from utils.platform_consts import pList, pList_root import json
from functools import wraps
from sqlalchemy.orm import sessionmaker import jwt
from sqlalchemy import desc from flask import Blueprint, request, jsonify, make_response
from bcrypt import hashpw, gensalt, checkpw from bcrypt import hashpw, gensalt, checkpw
from functools import wraps
from utils import pgclass, setting_loader, logger
from utils.misc import error
from utils.dbhelper import db, solo_article_fetcher, multi_article_fetcher, solo_file_fetcher, solo_article_remover
from utils.platform_consts import PLIST, PLIST_ROOT
admin = Blueprint("admin", __name__) admin = Blueprint("admin", __name__)
@ -19,28 +25,25 @@ def role_required(permreq: list):
# get data # get data
key = os.getenv("JWT_KEY", None) key = os.getenv("JWT_KEY", None)
jwtsession = request.cookies.get("token", None) jwtsession = request.cookies.get("token", None)
if jwtsession == None: return "You do not have permission to view this page.", 401 if jwtsession == None: return error("You do not have permission to view this page."), 401
jwtsession = str(jwtsession) jwtsession = str(jwtsession)
try: jwtdata = jwt.decode(jwt = jwtsession, key = key, algorithms = ["HS256"]) try: jwtdata = jwt.decode(jwt = jwtsession, key = key, algorithms = ["HS256"])
except jwt.exceptions.ExpiredSignatureError: return "Token expired!", 401 except jwt.exceptions.ExpiredSignatureError: return error("Token expired!"), 401
except jwt.exceptions.DecodeError: return "Invalid token!", 401 except jwt.exceptions.DecodeError: return error("Invalid token!"), 401
if "id" not in jwtdata or "user" not in jwtdata: return "Invalid token!", 401 if "id" not in jwtdata or "user" not in jwtdata: return error("Invalid token!"), 401
# db # db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser table = pgclass.SQLuser
with db.getsession() as session:
res = session.query(table).filter(table.user == jwtdata["user"], table.id == jwtdata["id"]).first() res = session.query(table).filter(table.user == jwtdata["user"], table.id == jwtdata["id"]).first()
session.close() if res is None: return error("You do not have permission to view this page."), 401
if res is None: return "You do not have permission to view this page.", 401
# permission check # permission check
permissionList = list(set(res.permission)) permissionList = list(set(res.permission))
for p in permissionList: # 檢查用戶JWT是否有不合法的權限名稱 for p in permissionList: # 檢查用戶JWT是否有不合法的權限名稱
if p not in pList_root: return "The user has invalid permission.", 402 if p not in PLIST_ROOT: return error("The user has invalid permission."), 402
for p in list(set(permreq)): for p in list(set(permreq)):
if p not in permissionList: return "You do not have permission to view this page.", 402 if p not in permissionList: return error("You do not have permission to view this page."), 402
# return # return
return f(*args, **kwargs) return f(*args, **kwargs)
@ -51,9 +54,9 @@ def getopuser(session, cookie):
table = pgclass.SQLuser table = pgclass.SQLuser
jwtsession = str(cookie) jwtsession = str(cookie)
try: opuser = jwt.decode(jwt = jwtsession, key = os.getenv("JWT_KEY"), algorithms = ["HS256"]) try: opuser = jwt.decode(jwt = jwtsession, key = os.getenv("JWT_KEY"), algorithms = ["HS256"])
except jwt.exceptions.ExpiredSignatureError: return "Token expired!", 401 except jwt.exceptions.ExpiredSignatureError: return error("Token expired!"), 401
except jwt.exceptions.DecodeError: return "Invalid token!", 401 except jwt.exceptions.DecodeError: return error("Invalid token!"), 401
if "id" not in opuser or "user" not in opuser: return "Invalid token!", 401 if "id" not in opuser or "user" not in opuser: return error("Invalid token!"), 401
opuser = session.query(table).filter(table.user==opuser["user"],table.id==opuser["id"]).first() opuser = session.query(table).filter(table.user==opuser["user"],table.id==opuser["id"]).first()
return opuser, None return opuser, None
@ -61,7 +64,7 @@ def getopuser(session, cookie):
@admin.route("/login", methods=["POST"]) @admin.route("/login", methods=["POST"])
def login(): def login():
# args # args
if "username" not in request.json or "password" not in request.json: return "Arguments error", 400 if "username" not in request.json or "password" not in request.json: return error("Arguments error"), 400
username = str(request.json["username"]) username = str(request.json["username"])
password = str(request.json["password"]) password = str(request.json["password"])
@ -70,88 +73,101 @@ def login():
exptime = int(settings["JWT_Valid_Time"]) exptime = int(settings["JWT_Valid_Time"])
# db # db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLuser table = pgclass.SQLuser
with Session() as session: u = session.query(table).filter(table.user==username).first() with db.getsession() as session: u = session.query(table).filter(table.user==username).first()
# auth # auth
if u is None: return "Login Failed", 401 # 找不到用戶 if u is None: return error("Login Failed"), 401 # 找不到用戶
if not checkpw(password.encode("utf-8"), u.password.encode("utf-8")): return "Login Failed", 401 # 密碼沒法跟hash對上 if not checkpw(password.encode("utf-8"), u.password.encode("utf-8")): return error("Login Failed"), 401 # 密碼沒法跟hash對上
# jwt # jwt
key = os.getenv("JWT_KEY", None) key = os.getenv("JWT_KEY", None)
if key is None: return abort(500) if key is None: return error("JWT_KEY error"), 500
jwtdata = {"id": u.id, "user":username, "exp":int(math.floor(time.time() + exptime))} jwtdata = {"id": u.id, "user":username, "exp":int(math.floor(time.time() + exptime))}
jwtdata = jwt.encode(payload = jwtdata, key = str(key), algorithm = "HS256") jwtdata = jwt.encode(payload = jwtdata, key = str(key), algorithm = "HS256")
# logger # logger
logger.logger(db, "login", "User:%s logined"%username) logger.logger("login", "User:%s logined"%username)
# cookie # cookie
r = make_response("Access Granted") r = make_response("Access Granted")
r.set_cookie("token", jwtdata) r.set_cookie("token", jwtdata)
return r return r, 200
@admin.route("me", methods=["GET"]) @admin.route("me", methods=["GET"])
@role_required([]) @role_required([])
def user_me(): def user_me():
db = current_app.shared_resource.engine with db.getsession() as session:
Session = sessionmaker(bind=db)
with Session() as session:
opuser, err = getopuser(session, request.cookies.get("token")) opuser, err = getopuser(session, request.cookies.get("token"))
if err is not None: return opuser, err if err is not None: return opuser, err
return jsonify({"id":opuser.id, "user":opuser.user, "permission":opuser.permission}) return jsonify({"id":opuser.id, "user":opuser.user, "permission":opuser.permission}), 200
#################### ####################
# User Area # # User Area #
#################### ####################
# list / get / add / delete # list / get / add / delete
@admin.route("user/list", methods={"GET"}) @admin.route("/user/list", methods={"GET"})
@role_required([]) @role_required([])
def user_list(): def user_list():
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLuser table = pgclass.SQLuser
with Session() as session: users = session.query(table).all() with db.getsession() as session: users = session.query(table).all()
res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ] res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ]
return jsonify(res) return jsonify(res), 200
@admin.route("user/get/<int:id>", methods=["GET"]) @admin.route("/user/<int:id>", methods=["GET"])
@role_required([]) @role_required([])
def user_get(id:int): def user_get(id:int):
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLuser table = pgclass.SQLuser
with Session() as session: users = session.query(table).filter(table.id==int(id)).all() with db.getsession() as session: u = session.query(table).filter(table.id==int(id)).first()
res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ] if u is None: return error("User not found"), 404
return jsonify({"id":u.id, "user":u.user, "permission":u.permission}), 200
return jsonify(res) @admin.route("/user/<int:id>", methods=["DELETE"])
@admin.route("user/add", methods=["POST"])
@role_required(["usermgr"]) @role_required(["usermgr"])
def user_add(): def user_del(id:int):
# db # db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLuser table = pgclass.SQLuser
with Session() as session:
with db.getsession() as session:
# user who requested
opuser, err = getopuser(session, request.cookies.get("token"))
if err is not None: return opuser, err
# check root
tguser = session.query(table).filter(table.id==int(id)).first()
if tguser is None: return error("User is not exist"), 400
if tguser.user == "root": return error("You cannot delete user:root"), 400
# delete
session.delete(tguser)
session.commit()
logger.logger("user.delete", "User:%s deleted an user:%s"%(opuser.user, tguser.user)) # logger
return jsonify({"result":"OK"}), 200
@admin.route("/user", methods=["POST"])
@role_required(["usermgr"])
def user_add():
# db
table = pgclass.SQLuser
with db.getsession() as session:
# user who requested # user who requested
opuser, err = getopuser(session, request.cookies.get("token")) opuser, err = getopuser(session, request.cookies.get("token"))
if err is not None: return opuser, err if err is not None: return opuser, err
if opuser is None: return "You don't have permission to view this page!", 402
# payload # payload
if "username" not in request.json or "password" not in request.json or \ if "username" not in request.json or "password" not in request.json or \
"permission" not in request.json or not(isinstance(request.json["permission"], list)): return "Arguments error", 400 "permission" not in request.json or not(isinstance(request.json["permission"], list)):
return error("Arguments error"), 400
username = str(request.json["username"]) username = str(request.json["username"])
password = str(request.json["password"]) password = str(request.json["password"])
permission = list(set([ str(p) for p in list(request.json["permission"]) ])) permission = list(set([ str(p) for p in list(request.json["permission"]) ]))
# check username and password # check username and password
if username == None or len(username) == 0 or password is None or len(password) == 0: return "Invalid Username or Password!", 400 if username == None or len(username) == 0 or password is None or len(password) == 0:
return error("Invalid Username or Password!"), 400
# check permission list # check permission list
for p in permission: for p in permission:
if p not in pList: return "Invalid Permission", 400 # 如果添加的權限名稱不合法 if p not in PLIST: return error("Invalid Permission"), 400 # 如果添加的權限名稱不合法
if p not in opuser.permission: return "You don't have the permission: %s"%p, 402 # 如果用戶本身不具有相同權限 if p not in opuser.permission: return error("You don't have the permission: %s"%p), 402 # 如果用戶本身不具有相同權限
# add # add
users = session.query(table).filter(table.user==username).first() users = session.query(table).filter(table.user==username).first()
@ -159,172 +175,92 @@ def user_add():
pwhash = hashpw(password.encode("utf-8"), gensalt()).decode("utf-8") pwhash = hashpw(password.encode("utf-8"), gensalt()).decode("utf-8")
session.add(table(user=username, password=pwhash, permission=permission)) session.add(table(user=username, password=pwhash, permission=permission))
session.commit() session.commit()
logger.logger(db, "user.create", "User:%s created a new user:%s"%(opuser.user, username)) # logger logger.logger("user.create", "User:%s created a new user:%s"%(opuser.user, username)) # logger
return jsonify({"user":username, "permission":permission}) return jsonify({"user":username, "permission":permission}), 200
else: else:
return "User already exist!" return error("User already exist!"), 400
@admin.route("user/delete/<int:id>", methods=["DELETE"])
@role_required(["usermgr"])
def user_del(id:int):
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLuser
with Session() as session:
# user who requested
opuser, err = getopuser(session, request.cookies.get("token"))
if err is not None: return opuser, err
if opuser is None: return "You don't have permission to view this page!", 402
# check root
tguser = session.query(table).filter(table.id==int(id)).first()
if tguser is None: return "User is not exist", 400
if tguser.user == "root": return "You cannot delete user:root", 400
# delete
session.delete(tguser)
session.commit()
logger.logger(db, "user.delete", "User:%s deleted an user:%s"%(opuser.user, tguser.user)) # logger
return "OK", 200
#################### ####################
# Article Area # # Article Area #
#################### ####################
# list / get / pend / delete / fileget # list / get / pend / delete / fileget
@admin.route('article/list', methods = ["GET"]) @admin.route("/article/file/<int:id>", methods = ["GET"])
@role_required(["article.read"])
def article_fileget(id:int):
resp, code = solo_file_fetcher("admin", id)
return resp, code
@admin.route('/article/list', methods = ["GET"])
@role_required(["article.read"]) @role_required(["article.read"])
def article_list(): def article_list():
# variables res, code = multi_article_fetcher("admin", request.args.get("start"), request.args.get("count"))
if request.args.get("start") is None or request.args.get("count") is None or \ return res, code
request.args.get("start").isdigit()==False or request.args.get("count").isdigit()==False: return "Arguments error", 400
rst = int(request.args.get("start"))
count = int(request.args.get("count"))
# db @admin.route("/article/<int:id>", methods=["GET"])
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
# get ctx
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
with Session() as session: res = session.query(table).order_by(desc(table.id)).filter(table.reference == None).offset(rst).limit(count).all()
# mapping
res = [ {"id":r.id, "ctx":r.ctx, "igid":r.igid, "created_at":r.created_at, "mark":r.mark, "ip":r.ip,
"files": [ f[0] for f in session.query(ftab.id).filter(ftab.reference == r.hash).all() ] } for r in res ]
return jsonify(res), 200
@admin.route("article/get/<int:id>", methods=["GET"])
@role_required(["article.read"]) @role_required(["article.read"])
def article_read(id:int): def article_read(id:int):
db = current_app.shared_resource.engine res, code = solo_article_fetcher("admin", id)
Session = sessionmaker(bind=db) return jsonify(res), code
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
# get ctx
with Session() as session: res = session.query(table).filter(table.id == int(id)).first()
# mapping
resfn = {
"id":res.id, "ctx":res.ctx, "igid":res.igid, "created_at":res.created_at, "mark":res.mark, "reference":res.reference, "ip":res.ip,
"files": [ f[0] for f in session.query(ftab.id).filter(ftab.reference==res.hash).all() ],
"comment": [ c[0] for c in session.query(table.id).filter(table.reference==res.id).all() ]
}
return jsonify([resfn])
@admin.route("article/delete/<int:id>", methods=["DELETE"]) @admin.route("/article/<int:id>", methods=["DELETE"])
@role_required(["article.del"]) @role_required(["article.del"])
def article_del(id:int): def article_del(id:int):
# db with db.getsession() as session:
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
with Session() as session:
opuser, err = getopuser(session, request.cookies.get("token")) opuser, err = getopuser(session, request.cookies.get("token"))
if err is not None: return opuser, err if err is not None: return opuser, err
res = session.query(table).filter(table.id == id).first() # 本體 result, code = solo_article_remover("admin", id=id)
session.query(ftab).filter(ftab.reference == res.hash).delete() # 檔案 if "error" in result: return jsonify(result), code
rcl = [] logger.logger("article.delete", "User:%s deleted post (id=%d with comments %s): last_status=%s"%(opuser.user, result["id"], result["rcl"], result["mark"]))
# 留言 return jsonify({"result":"OK"}), 200
for c in session.query(table).filter(table.reference == res.id).all():
rcl.append(c.id)
# 刪留言的檔案
session.query(ftab).filter(ftab.reference == c.hash).delete()
# 刪留言
session.delete(c)
# 刪本體
session.delete(res)
# commit
session.commit()
# logger
logger.logger(db, "article.delete", "User:%s deleted post (id=%d with comments %s): last_status=%s"%(opuser.user, res.id, str(rcl), res.mark))
return "OK", 200
@admin.route("article/pend/<int:id>", methods=["PATCH"]) @admin.route("/article/<int:id>", methods=["PUT"])
@role_required(["article.pend"]) @role_required(["article.pend"])
def article_pend(id:int): def article_pend(id:int):
# db # db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLarticle table = pgclass.SQLarticle
with Session() as session: with db.getsession() as session:
# 確保文章存在 # 確保文章存在
res = session.query(table).filter(table.id==int(id)).first() res = session.query(table).filter(table.id==int(id)).first()
if res is None: return "Post not found", 400 if res is None: return error("Post not found"), 404
# 如果文章已經公開 # 如果文章已經公開
if res.mark == "visible": if res.mark == "visible":
return "Post is already visible.", 400 return error("Post is already visible."), 400
elif res.mark == "pending": elif res.mark == "pending":
res.mark = "visible" res.mark = "visible"
session.commit() session.commit()
# run IG Post # run IG Post
return "OK", 200 return jsonify({"result":"OK"}), 200
else: else:
return "Post mark error", 500 return error("Post mark error"), 500
@admin.route("article/file/<int:id>", methods = ["GET"])
@role_required(["article.read"])
def article_fileget(id:int):
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
with Session() as session:
fres = session.query(ftab).filter(ftab.id == id).first()
if fres is None: return "File not found", 400 # 檢查檔案是否存在
article = session.query(table).filter(table.hash == fres.reference).first()
if article is None: return "File not found", 400 # 檢查文章本體是否存在
resp = make_response(fres.binary)
resp.headers.set("Content-Type", fres.type)
resp.headers.set("Content-Disposition", f"attachment; filename=file{fres.id}")
return resp
#################### ####################
# Setting Area # # Setting Area #
#################### ####################
# get / set # get / set
@admin.route("setting/get", methods=["GET"]) @admin.route("/setting", methods=["GET"])
@role_required(["setting.edit"]) @role_required(["setting.edit"])
def setting_get(): def setting_get():
return jsonify(setting_loader.loadset()), 200 return jsonify(setting_loader.loadset()), 200
@admin.route("setting/set", methods=["POST"]) @admin.route("/setting", methods=["POST"])
@role_required(["setting.edit"]) @role_required(["setting.edit"])
def setting_edit(): def setting_edit():
with db.getsession() as session:
opuser, err = getopuser(session, request.cookies.get("token"))
if err is not None: return opuser, err
opuser = opuser.user
req = request.json req = request.json
d = None d = None
for r in req: for r in req:
d = setting_loader.writeset(r, req.get(r)) d = setting_loader.writeset(r, req.get(r))
if d == 0: return "Failed", 401 if d == 0: return error("Failed"), 401
logger.logger("setting.modify", "User:%s modified settings: %s"%(opuser, json.dumps(request.json)))
return jsonify(d), 200 return jsonify(d), 200

View File

@ -1,13 +1,15 @@
from flask import Blueprint, current_app, request, jsonify, make_response
import hashlib
import time import time
import magic # apt install libmagic1 libmagic-dev -y import hashlib
from utils import logger, pgclass, setting_loader
from sqlalchemy.orm import sessionmaker import magic
from sqlalchemy import desc from flask import Blueprint, current_app, request, jsonify
from protobuf_files import niming_pb2
from google.protobuf.message import DecodeError from google.protobuf.message import DecodeError
from utils import logger, pgclass, setting_loader
from utils.dbhelper import db, solo_article_fetcher, multi_article_fetcher, solo_file_fetcher, solo_article_remover
from utils.misc import error
from protobuf_files import niming_pb2
""" """
TODO: TODO:
- IG post ( Po文刪文只PO本體文章 ) - IG post ( Po文刪文只PO本體文章 )
@ -23,85 +25,76 @@ article = Blueprint('article', __name__)
# 匿名文列表 # 匿名文列表
@article.route('/list', methods = ["GET"]) @article.route('/list', methods = ["GET"])
def listing(): def listing():
# variables res, code = multi_article_fetcher("general", request.args.get("start"), request.args.get("count"))
if request.args.get("start") is None or request.args.get("count") is None or \ return res, code
request.args.get("start").isdigit()==False or request.args.get("count").isdigit()==False: return "Arguments error", 400
rst = int(request.args.get("start"))
count = int(request.args.get("count"))
# db # 獲取匿名文附檔
db = current_app.shared_resource.engine @article.route("/file/<int:id>", methods=["GET"])
Session = sessionmaker(bind=db) def getfile(id:int):
with Session() as session: resp, code = solo_file_fetcher("general", id)
# get ctx return resp, code
# 只有發文者可以看到的獲取指定文章
# 只有發文者可以做到的刪除文章
@article.route("/own/<sha256>", methods = ["GET", "DELETE"])
def owner_getarticle(sha256:str):
table = pgclass.SQLarticle table = pgclass.SQLarticle
ftab = pgclass.SQLfile ftab = pgclass.SQLfile
res = session.query(table.id, table.ctx, table.igid, table.created_at, table.mark, table.hash).order_by(desc(table.id)).filter(table.mark == 'visible', table.reference == None).offset(rst).limit(count).all()
# mapping # 獲取指定文章
res = [ {"id":r[0], "ctx":r[1], "igid":r[2], "created_at":r[3], "mark":r[4], if request.method == "GET":
"files": [ f[0] for f in session.query(ftab.id).filter(ftab.reference == r[5]).all() ] } for r in res ] resfn, code = solo_article_fetcher("owner", key=sha256)
return jsonify(resfn), code
# 刪除指定文章跟他們的留言、檔案
elif request.method == "DELETE":
result, code = solo_article_remover("general", hash=sha256)
if "error" in result: return jsonify(result), code
return jsonify(res), 200 logger.logger("delpost", "Delete post (id=%d with comments %s): last_status=%s"
%(result["id"], str(result["rcl"]), str(result["mark"])))
return jsonify({"result":"OK"}), code
# 獲取指定文章 # 獲取指定文章
@article.route("/get/<int:id>", methods = ["GET"]) @article.route("/<int:id>", methods = ["GET"])
def getarticle(id:int): def getarticle(id:int):
db = current_app.shared_resource.engine resfn, code = solo_article_fetcher("general", key=id)
Session = sessionmaker(bind=db) return jsonify(resfn), code
with Session() as session:
# get ctx
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
res = session.query(table.id, table.ctx, table.igid, table.created_at, table.mark, table.reference, table.hash).filter(table.id == id).filter(table.mark == 'visible').all()
# mapping
resfn = [
{"id":r[0], "ctx":r[1], "igid":r[2], "created_at":r[3], "mark":r[4], "reference":r[5], # basic
"comment": [ c[0] for c in session.query(table.id).filter(table.reference == int(r[0]), table.mark == "visible").all() ], # comment
"files": [ f[0] for f in session.query(ftab.id).filter(ftab.reference == r[6]).all() ]
}
for r in res
]
return jsonify(resfn), 200
# 上傳文章 / 留言 # 上傳文章 / 留言
@article.route("/post", methods = ["POST"]) @article.route("/", methods = ["POST"])
def posting(): def posting():
# flow: # flow:
# ctx -> hash -> reference -> file -> IP -> IG -> mark -> post | -> log # ctx -> hash -> reference -> file -> IP -> IG -> mark -> post | -> log
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLarticle
# loadset # loadset
opt = setting_loader.loadset() opt = setting_loader.loadset()
chk_before_post = opt["Check_Before_Post"] chk_before_post = opt["Check_Before_Post"]
maxword = opt["Niming_Max_Word"] maxword = opt["Niming_Max_Word"]
# data parse
# protobuf parse
recv = niming_pb2.DataMessage() recv = niming_pb2.DataMessage()
try: recv.ParseFromString(request.data) try: recv.ParseFromString(request.data)
except DecodeError: return "Protobuf decode error", 400 except DecodeError: return error("Protobuf decode error"), 400
# content # content
ctx = str(recv.ctx) # request.json["ctx"] ctx = str(recv.ctx)
# length check if len(ctx) == 0 or len(ctx) > maxword: # length check
if len(ctx) == 0 or len(ctx) > maxword: return "no content or too many words", 400 return error("no content or too many words"), 400
# hash # hash
seed = ctx + str(time.time()) seed = ctx + str(time.time())
hash = hashlib.sha256(seed.encode()).hexdigest() hash = hashlib.sha256(seed.encode()).hexdigest()
with Session() as session: # SQL start
table = pgclass.SQLarticle
with db.getsession() as session:
# reference # reference
ref = int(recv.ref) # request.json["ref"] ref = int(recv.ref)
if not (ref == 0): # 如果ref不是0 if not (ref == 0): # 如果ref不是0
# 檢查是不是指向存在的文章 # 檢查是不是指向存在的文章
chk = session.query(table).filter(table.id == ref, table.mark == "visible").first() chk = session.query(table).filter(table.id == ref, table.mark == "visible").first()
if chk is None: return "Invalid Reference", 400 if chk is None: return error("Invalid Reference"), 400
# 檢查指向的文章是否也是留言 # 檢查指向的文章是否也是留言
if not(chk.reference is None): return "Invalid Reference", 400 if not(chk.reference is None): return error("Invalid Reference"), 400
else: else:
ref = None ref = None
@ -110,15 +103,15 @@ def posting():
# check - size # check - size
atts = opt["Attachment_Count"] atts = opt["Attachment_Count"]
sizelimit = opt["Attachment_Size"] sizelimit = opt["Attachment_Size"]
if len(files) > atts: return "Too many files", 400 if len(files) > atts: return error("Too many files"), 400
for f in files: for f in files:
if len(f) <= 0 or len(f) > sizelimit: return "File size error", 400 if len(f) <= 0 or len(f) > sizelimit: return error("File size error"), 400
# check - mimetype # check - mimetype
allowed_mime = opt["Allowed_MIME"] allowed_mime = opt["Allowed_MIME"]
for f in files: for f in files:
mime = magic.Magic(mime=True) mime = magic.Magic(mime=True)
type = mime.from_buffer(f) type = mime.from_buffer(f)
if not(type in allowed_mime): return "File format error", 400 if not(type in allowed_mime): return error("File format error"), 400
# run processor # run processor
ftab = pgclass.SQLfile ftab = pgclass.SQLfile
for f in files: for f in files:
@ -145,78 +138,8 @@ def posting():
data = table(hash = hash, ctx = ctx, igid = igid, mark = mark, reference = ref, ip = ip) data = table(hash = hash, ctx = ctx, igid = igid, mark = mark, reference = ref, ip = ip)
session.add(data) session.add(data)
session.commit() session.commit()
# pg getdata
res = session.query(table.id, table.ctx, table.igid, table.created_at, table.mark, table.hash, table.reference).filter(table.hash == hash).all()
fres = session.query(ftab.id).filter(ftab.reference == hash).all()
res = [ {"id":r[0], "ctx":r[1], "igid":r[2], "created_at":r[3], "mark":r[4], "hash":r[5], "reference":r[6],
"files": [f[0] for f in fres]
} for r in res ]
result, code = solo_article_fetcher(role="owner", key=hash)
# logger # logger
logger.logger(db, "newpost", "New post (id=%d point to %s): %s"%(res[0]["id"], ref, mark)) logger.logger("newpost", "New post (id=%d point to %s): %s"%(result["id"], ref, mark))
return result, code
return jsonify(res), 201
# 只有發文者可以看到的獲取指定文章
# 只有發文者可以做到的刪除文章
@article.route("/own/<sha256>", methods = ["GET", "DELETE"])
def owner_getarticle(sha256:str):
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
# 獲取指定文章
if request.method == "GET":
with Session() as session:
res = session.query(table.id, table.ctx, table.igid, table.created_at, table.mark, table.hash, table.reference).filter(table.hash == sha256).all()
resfn = [
{"id":r[0], "ctx":r[1], "igid":r[2], "created_at":r[3], "mark":r[4], "hash":r[5], "reference":r[6],
"comment":[ c[0] for c in session.query(table.id).filter(table.reference == int(r[0])).all() ], # comments
"files":[ f[0] for f in session.query(ftab.id).filter(ftab.reference == r[5]).all() ]} # files
for r in res
]
return jsonify(resfn), 200
# 刪除指定文章跟他們的留言、檔案
elif request.method == "DELETE":
with Session() as session:
rcl = []
res = session.query(table).filter(table.hash == sha256).first() # 本體
if res is None: return "Post not found", 400 # 檢查本體是否存在
# 刪除本體檔案
session.query(ftab).filter(ftab.reference == res.hash).delete()
# 刪留言
resc = session.query(table).filter(table.reference == res.id).all() # 留言
for c in resc:
rcl.append(c.id)
# 刪留言的檔案
session.query(ftab).filter(ftab.reference == c.hash).delete()
# 刪留言
session.delete(c)
# 刪本體
session.delete(res)
# commit
session.commit()
# logger
logger.logger(db, "delpost", "Delete post (id=%d with comments %s): last_status=%s"%(res.id, str(rcl), res.mark))
return "OK", 200
session.close()
# 獲取匿名文附檔
@article.route("/file/<int:id>", methods=["GET"])
def getfile(id:int):
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
with Session() as session:
fres = session.query(ftab).filter(ftab.id == id).first()
if fres is None: return "File not found", 400 # 檢查檔案是否存在
article = session.query(table).filter(table.hash == fres.reference, table.mark == 'visible').first()
if article is None: return "File not found", 400 # 檢查文章本體是否存在/可以閱覽
resp = make_response(fres.binary)
resp.headers.set("Content-Type", fres.type)
resp.headers.set("Content-Disposition", f"attachment; filename=file{fres.id}")
return resp

View File

@ -1,7 +1,9 @@
from flask import current_app, Blueprint, request, jsonify from flask import Blueprint, request, jsonify
from sqlalchemy.orm import sessionmaker
from sqlalchemy import desc from sqlalchemy import desc
from utils import pgclass from utils import pgclass
from utils.dbhelper import db
from utils.misc import error
log = Blueprint('log', __name__) log = Blueprint('log', __name__)
@ -10,35 +12,32 @@ log = Blueprint('log', __name__)
def listlog(): def listlog():
# variables # variables
if request.args.get("start") is None or request.args.get("count") is None or \ if request.args.get("start") is None or request.args.get("count") is None or \
request.args.get("start").isdigit()==False or request.args.get("count").isdigit()==False: return "Arguments error", 400 request.args.get("start").isdigit()==False or request.args.get("count").isdigit()==False:
return error("Arguments error"), 400
rst = int(request.args.get("start")) rst = int(request.args.get("start"))
count = int(request.args.get("count")) count = int(request.args.get("count"))
# db # getctx
db = current_app.shared_resource.engine with db.getsession() as session:
Session = sessionmaker(bind=db)
# get ctx
with Session() as session:
table = pgclass.SQLlog table = pgclass.SQLlog
res = session.query(table).order_by(desc(table.id)).offset(rst).limit(count).all() res = session.query(table).order_by(desc(table.id)).offset(rst).limit(count).all()
# mapping # mapping
res = [ {"id":r.id, "created_at":r.created_at, "source":r.source, "message":r.message} for r in res ] res = [ {"id":r.id, "created_at":r.created_at, "source":r.source, "message":r.message} for r in res ]
return jsonify(res) return jsonify(res), 200
# 指定顯示特定一條log # 指定顯示特定一條log
@log.route("/get/<int:id>", methods = ["GET"]) @log.route("/<int:id>", methods = ["GET"])
def getlog(id:int): def getlog(id:int):
# db # db
db = current_app.shared_resource.engine with db.getsession() as session:
Session = sessionmaker(bind=db)
# get ctx
with Session() as session:
table = pgclass.SQLlog table = pgclass.SQLlog
res = session.query(table).filter(table.id == id).all() res = session.query(table).filter(table.id == id).first()
if res is None:
return error("Log not Found"), 404
# mapping # mapping
res = [ {"id":r.id, "created_at":r.created_at, "source":r.source, "message":r.message} for r in res ] res = {"id":res.id, "created_at":res.created_at, "source":res.source, "message":res.message}
return jsonify(res) return jsonify(res), 200

View File

@ -1 +1 @@
{"Check_Before_Post": false, "JWT_Valid_Time": 604800, "Niming_Max_Word": 500, "Attachment_Count": 5, "Attachment_Size": 209715200, "Allowed_MIME": ["image/jpeg", "image/pjpeg", "image/png", "image/heic", "image/heif", "video/mp4", "video/quicktime", "video/hevc", "image/gif", "image/webp"]} {"Check_Before_Post": true, "JWT_Valid_Time": 604800, "Niming_Max_Word": 500, "Attachment_Count": 5, "Attachment_Size": 209715200, "Allowed_MIME": ["image/jpeg", "image/pjpeg", "image/png", "image/heic", "image/heif", "video/mp4", "video/quicktime", "video/hevc", "image/gif", "image/webp"]}

143
utils/dbhelper.py Normal file
View File

@ -0,0 +1,143 @@
from typing import Tuple, Dict, List
from flask import make_response, Response, jsonify
from sqlalchemy.orm import sessionmaker
from sqlalchemy import desc
from utils import pgclass
from utils.misc import error
class db:
_engine = None
@classmethod
def __init__(self, engine):
self._engine = engine
@classmethod
def getsession(self):
Session = sessionmaker(bind=self._engine)
return Session()
# role (general) (owner) (admin)
# 獲取單一文章
def solo_article_fetcher(role:str, key) -> Tuple[Dict,int]: # admin, owner, general
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
resfn = {}
with db.getsession() as session:
# query
if role == "owner":
res = session.query(table).filter(table.hash == key).first()
elif role == "admin":
res = session.query(table).filter(table.id == key).first()
elif role == "general":
res = session.query(table).filter(table.id == key, table.mark == "visible").first()
if res is None: return {"error":"Post not found"}, 404
# mapping
resfn.update({"id": res.id, "ctx": res.ctx, "igid": res.igid, "mark": res.mark, "reference": res.reference})
if role == "admin": resfn["ip"] = res.ip
elif role == "owner": resfn["hash"] = res.hash
# comment
if role == "owner" or role == "admin":
resfn["comment"] = [ c[0] for c in session.query(table.id).filter(table.reference == int(res.id)).all() ]
elif role == "general":
resfn["comment"] = [ c[0] for c in session.query(table.id).filter(table.reference == int(res.id), table.mark == "visible").all() ]
# file
resfn["files"] = [ f[0] for f in session.query(ftab.id).filter(ftab.reference == res.hash).all() ]
return resfn, 200
# 獲取文章列表
def multi_article_fetcher(role:str, start:str, count:str) -> Tuple[Response, int]: # general, admin
# checker
if start is None or count is None or \
start.isdigit()==False or count.isdigit()==False:
return error("Arguments error"), 400
start = int(start)
count = int(count)
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
resfn = []
with db.getsession() as session:
# query
if role == "general":
res = session.query(table).filter(table.mark == "visible", table.reference == None)
elif role == "admin":
res = session.query(table).filter(table.reference == None)
res = res.order_by(desc(table.id)).offset(start).limit(count).all()
# mapping
for r in res:
rup = {"id":r.id, "ctx":r.ctx, "igid":r.igid, "created_at":r.created_at, "mark":r.mark}
if role == "admin": rup["ip"] = r.ip # 如果是管理員 多給ip
rup["files"] = [ f[0] for f in session.query(ftab.id).filter(ftab.reference == r.hash).all() ] # 檔案
resfn.append(rup)
return jsonify(resfn), 200
# 刪除文章
def solo_article_remover(role:str, hash:str=None, id:int=None) -> Tuple[Dict, int]: # admin, general
key = None
if role == "admin": key = id
elif role == "general": key = hash
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
rcl = []
with db.getsession() as session:
# 獲取本體
if role == "admin":
res = session.query(table).filter(table.id == key).first()
elif role == "general":
res = session.query(table).filter(table.hash == key).first()
if res is None: # 檢查本體是否存在
return {"error":"Post not found"}, 404
# 刪除本體檔案
session.query(ftab).filter(ftab.reference == res.hash).delete()
# 刪留言
resc = session.query(table).filter(table.reference == res.id).all() # 留言
for c in resc:
rcl.append(c.id)
# 刪留言的檔案
session.query(ftab).filter(ftab.reference == c.hash).delete()
# 刪留言
session.delete(c)
# 刪本體
session.delete(res)
session.commit()
return {"id":res.id, "mark":res.mark, "rcl":rcl}, 200
# 獲取檔案
def solo_file_fetcher(role:str, id:int) -> Tuple[Response, int]: # general, admin
table = pgclass.SQLarticle
ftab = pgclass.SQLfile
with db.getsession() as session:
fres = session.query(ftab).filter(ftab.id == id).first()
if fres is None: # 檢查檔案是否存在
return error("File not found"), 404
if role == "general":
article = session.query(table).filter(table.hash == fres.reference, table.mark == 'visible').first()
elif role == "admin":
article = session.query(table).filter(table.hash == fres.reference).first()
if article is None: # 檢查文章本體是否存在/可以閱覽
return error("File not found"), 404
resp = make_response(fres.binary)
resp.headers.set("Content-Type", fres.type)
resp.headers.set("Content-Disposition", f"attachment; filename=file{fres.id}")
return resp, 200

View File

@ -1,29 +1,26 @@
from utils import pgclass from utils import pgclass
from sqlalchemy.orm import sessionmaker from utils.dbhelper import db
from utils.platform_consts import EVENT_TYPE_GENERAL, EVENT_TYPE_ADMIN, EVENT_TYPE_SERVER
def logger(engine, type, message): def logger(type, message):
Session = sessionmaker(bind=engine)
session = Session()
table = pgclass.SQLlog table = pgclass.SQLlog
flag = False flag = False
# new post & del post # new post & del post
if type == "newpost" or type == "delpost": if type in EVENT_TYPE_GENERAL:
flag = True flag = True
log = table(source = "general", message = message) log = table(source = "general", message = message)
elif type in ["login", "user.create", "user.delete", "article.delete", "article.pend"]: elif type in EVENT_TYPE_ADMIN:
flag = True flag = True
log = table(source = "admin", message = message) log = table(source = "admin", message = message)
elif type in ["server.start"]: elif type in EVENT_TYPE_SERVER:
flag = True flag = True
log = table(source = "server", message = message) log = table(source = "server", message = message)
# session.add # session.add
if flag: if flag:
with db.getsession() as session:
session.add(log) session.add(log)
session.commit() session.commit()
# session.close
session.close()

4
utils/misc.py Normal file
View File

@ -0,0 +1,4 @@
from flask import jsonify, Response
def error(message:str) -> Response:
return jsonify({"error":message})

View File

@ -1,9 +1,14 @@
# Permission List # Permission List
pList = ["article.read", "article.pend", "article.del", "setting.edit"] # no permission:usermgr except root PLIST = ["article.read", "article.pend", "article.del", "setting.edit"] # no permission:usermgr except root
pList_root = pList + ["usermgr"] PLIST_ROOT = PLIST + ["usermgr"]
# event type
EVENT_TYPE_GENERAL = ["newpost", "delpost"]
EVENT_TYPE_ADMIN = ["login", "user.create", "user.delete", "article.delete", "article.pend", "setting.modify"]
EVENT_TYPE_SERVER = ["server.start"]
# Platform Setting Model # Platform Setting Model
platform_setting_model = { PLATFORM_SETTING_MODEL = {
"Check_Before_Post": [bool], "Check_Before_Post": [bool],
"JWT_Valid_Time": [int], "JWT_Valid_Time": [int],
"Niming_Max_Word": [int], "Niming_Max_Word": [int],

View File

@ -1,5 +1,5 @@
import json import json
from utils.platform_consts import platform_setting_model from utils.platform_consts import PLATFORM_SETTING_MODEL
def loadset(): def loadset():
with open("./settings.json", "r", encoding = "utf-8") as f: with open("./settings.json", "r", encoding = "utf-8") as f:
@ -7,25 +7,30 @@ def loadset():
return d return d
def typechecker(name:str, value): def typechecker(name:str, value):
if not(name in platform_setting_model.keys()) or not(isinstance(value, platform_setting_model.get(name)[0])): # 型別驗證
if not(name in PLATFORM_SETTING_MODEL.keys()) or not(isinstance(value, PLATFORM_SETTING_MODEL.get(name)[0])):
return 0 return 0
# 確定是否是可迭代物件
iterable = False iterable = False
try: try:
iter(platform_setting_model.get(name)[0]) iter(PLATFORM_SETTING_MODEL.get(name)[0])
iterable = True iterable = True
except: except:
iterable = False iterable = False
# 如果可迭代,就把裡面每個元素都檢查型別
if iterable: if iterable:
for v in value: for v in value:
if not(isinstance(v, platform_setting_model.get(name)[1])): return 0 if not(isinstance(v, PLATFORM_SETTING_MODEL.get(name)[1])): return 0
return 1 return 1
def writeset(name:str, value): def writeset(name:str, value):
# 驗證寫入資料型別
if not typechecker(name, value): return 0 if not typechecker(name, value): return 0
# 寫入
d:dict = loadset() d:dict = loadset()
d[name] = value d[name] = value