Ultimate Permission - Shirakami Fubuki

This commit is contained in:
p23 2024-11-17 18:47:25 +00:00
parent f74c7dd783
commit ada10f3851
9 changed files with 314 additions and 21 deletions

View File

@ -13,6 +13,4 @@ Run:
python3 app.py python3 app.py
``` ```
P23, a Sukonbu
Shirakami Fubuki is the cutest fox!!! Shirakami Fubuki is the cutest fox!!!

48
app.py
View File

@ -1,25 +1,57 @@
from flask import Flask from flask import Flask
# from dotenv import load_dotenv # from dotenv import load_dotenv
# load_dotenv() # load_dotenv()
import os import os, hashlib
from sqlalchemy import create_engine from sqlalchemy import create_engine
from utils.pgclass import Base from sqlalchemy.orm import sessionmaker
from utils.pgclass import Base, SQLuser
from utils.platform_consts import pList_root
from utils.setting_loader import loadset, typechecker
# blueprints # blueprints
from blueprints.article import article from blueprints.article import article
from blueprints.log import log from blueprints.log import log
from blueprints.admin import admin from blueprints.admin import admin
# Global Variables # Global Variables
PG_HOST = os.getenv("PG_HOST") PG_HOST = os.getenv("PG_HOST", None)
PG_PORT = os.getenv("PG_PORT") PG_PORT = os.getenv("PG_PORT", None)
PG_NAME = os.getenv("PG_NAME") PG_NAME = os.getenv("PG_NAME", None)
PG_USER = os.getenv("PG_USER") PG_USER = os.getenv("PG_USER", None)
PG_PASS = os.getenv("PG_PASS") PG_PASS = os.getenv("PG_PASS", None)
JWT_KEY = os.getenv("JWT_KEY") JWT_KEY = os.getenv("JWT_KEY", None)
PLATFORM_ROOT_PASSWORD = os.getenv("PLATFORM_ROOT_PASSWORD", None)
# env checker
errmsg = []
if JWT_KEY is None or len(JWT_KEY) == 0: errmsg.append("Invalid JWT_KEY")
if PLATFORM_ROOT_PASSWORD is None or len(PLATFORM_ROOT_PASSWORD) == 0: errmsg.append("Invalid PLATFORM_ROOT_PASSWORD")
if len(errmsg):
print(f"Env check failed: {errmsg}")
exit(0)
# settings checker
settings = loadset()
for s in settings:
if not typechecker(s, settings.get(s)):
print("Settings.json data type check failed: %s"%s)
exit(0)
# Postgresql # Postgresql
engine = create_engine('postgresql+psycopg2://%s:%s@%s:%s/%s'%(PG_USER, PG_PASS, PG_HOST, PG_PORT, PG_NAME)) engine = create_engine('postgresql+psycopg2://%s:%s@%s:%s/%s'%(PG_USER, PG_PASS, PG_HOST, PG_PORT, PG_NAME))
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
# root checker
pwhash = hashlib.sha512(PLATFORM_ROOT_PASSWORD.encode()).hexdigest()
rootperm = pList_root
Session = sessionmaker(bind=engine)
session = Session()
root = session.query(SQLuser).filter(SQLuser.user=="root").first()
if (root is None):
session.add(SQLuser(user="root",password=pwhash, permission=rootperm))
elif (root.password != pwhash or root.permission != rootperm):
session.delete(root)
session.add(SQLuser(user="root",password=pwhash, permission=rootperm))
session.commit()
session.close()
# shared class # shared class
class shared(): class shared():

View File

@ -1,3 +1,213 @@
from flask import Blueprint from flask import Blueprint, request, current_app, abort, jsonify, make_response, abort
import jwt, os, time, hashlib, math
from utils import pgclass, setting_loader, logger
from utils.platform_consts import pList, platform_setting_model
from functools import wraps
from sqlalchemy.orm import sessionmaker
admin = Blueprint("admin", __name__) admin = Blueprint("admin", __name__)
# jwt = {"id":user.id, "user":user.user, "exp":time.time}
# permission list
# - usermgr (user management, add, remove, edit)
# - article.read
# - article.pend
# - article.del
# - setting.edit
# auth decorator
def role_required(permreq: list):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# get data
key = os.getenv("JWT_KEY", None)
jwtsession = str(request.cookies.get("token", None))
if jwtsession == None: return "You do not have permission to view this page.", 401
try: jwtdata = jwt.decode(jwtsession, key = key, algorithms = ["HS256"])
except jwt.exceptions.ExpiredSignatureError: return "Token expired!", 401
except: return abort(500)
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
res = session.query(table).filter(table.user == jwtdata["user"], table.id == jwtdata["id"]).first()
session.close()
if res is None: return "You do not have permission to view this page.", 401
# permission check
permissionList = res.permission
for p in permreq:
if p not in permissionList: return "You do not have permission to view this page.", 402
# return
return f(*args, **kwargs)
return decorated_function
return decorator
# login
@admin.route("/login", methods=["POST"])
def login():
# args
username = str(request.json["username"])
password = hashlib.sha512(str(request.json["password"]).encode()).hexdigest()
# variables
settings = setting_loader.loadset()
exptime = int(settings["JWT_Valid_Time"])
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
# auth
u = session.query(table).filter(table.user==username, table.password==password).first()
session.close()
if u is None: return "Login Failed", 400
# jwt
key = os.getenv("JWT_KEY", None)
if key is None: return abort(500)
jwtdata = {"id": u.id, "user":username, "exp":str(math.floor(time.time() + exptime))}
jwtdata = jwt.encode(payload = jwtdata, key = str(key), algorithm = "HS256")
# logger
logger.logger(db, "admin", "User:%s logined"%username)
# cookie
r = make_response("Access Granted")
r.set_cookie("token", jwtdata) # , httponly=True)
return r
####################
# User Area #
####################
# list / get / (admin/)me / add / delete
@admin.route("user/list", methods={"GET"})
@role_required([])
def user_list():
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
users = session.query(table).all()
res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ]
return jsonify(res)
@admin.route("user/get/<int:id>", methods=["GET"])
@role_required([])
def user_get(id:int):
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
users = session.query(table).filter(table.id==int(id)).all()
res = [ {"id":u.id, "user":u.user, "permission":u.permission} for u in users ]
return jsonify(res)
@admin.route("me", methods=["GET"])
@role_required([])
def user_me():
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
opuser = jwt.decode(jwt=request.cookies.get("token"), key=os.getenv("JWT_KEY"), algorithms="HS256")
opuser = session.query(table).filter(table.user==opuser["user"],table.id==opuser["id"]).first()
return jsonify({"id":opuser.id, "user":opuser.user, "permission":opuser.permission})
@admin.route("user/add", methods=["POST"])
@role_required(["usermgr"])
def user_add():
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
# user who requested
opuser = jwt.decode(jwt=request.cookies.get("token"), key=os.getenv("JWT_KEY"), algorithms="HS256")
opuser = session.query(table).filter(table.user==opuser["user"],table.id==opuser["id"]).first()
if opuser is None: return "You don't have permission to view this page!", 401
# payload
username = str(request.json["username"])
password = str(request.json["password"])
permission = list(request.json["permission"])
# check username and password
if username == None or len(username) == 0 or password is None or len(password) == 0: return "Invalid Username or Password!", 401
# check permission list
for p in permission:
if p not in pList: return "Invalid Permission", 401
if p not in opuser.permission: return "You don't have the permission: %s"%p, 401
# add
users = session.query(table).filter(table.user==username).first()
if users is None: # check whether the user is exist
pwhash = hashlib.sha512(password.encode()).hexdigest()
session.add(table(user=username, password=pwhash, permission=permission))
session.commit()
logger.logger(db, "admin", "User:%s created a new user:%s"%(opuser.user, username)) # logger
session.close()
return jsonify({"user":username, "permission":permission})
else:
session.close()
return "User is exist!"
@admin.route("user/delete/<int:id>", methods=["DELETE"])
@role_required(["usermgr"])
def user_del(id:int):
# db
db = current_app.shared_resource.engine
Session = sessionmaker(bind=db)
session = Session()
table = pgclass.SQLuser
# user who requested
opuser = jwt.decode(jwt=request.cookies.get("token"), key=os.getenv("JWT_KEY"), algorithms="HS256")
opuser = session.query(table).filter(table.user==opuser["user"],table.id==opuser["id"]).first()
if opuser is None: return "You don't have permission to view this page!", 401
# check root
tguser = session.query(table).filter(table.id==id).first()
if tguser is None: return "User is not exist", 400
if tguser.user == "root": return "You cannot delete user:root", 400
# delete
session.delete(tguser)
session.commit()
logger.logger(db, "admin", "User:%s deleted an user:%s"%(opuser.user, tguser.user)) # logger
session.close()
return "OK", 200
####################
# Article Area #
####################
####################
# Setting Area #
####################
@admin.route("setting/get", methods=["GET"])
@role_required(["setting.edit"])
def setting_get():
return jsonify(setting_loader.loadset()), 200
@admin.route("setting/edit", methods=["POST"])
@role_required(["setting.edit"])
def setting_edit():
req = request.json
d = None
for r in req:
d = setting_loader.writeset(r, req.get(r))
if d == 0: return "Failed", 401
return jsonify(d), 200

View File

@ -9,10 +9,10 @@ from protobuf_files import niming_pb2
""" """
TODO: TODO:
- admin (看文審核文章刪文[新增用戶刪除用戶](用戶管理)管理平台設定) - admin (看文審核文章刪文[新增用戶刪除用戶](用戶管理)[V]管理平台設定)
- IG post - IG post
- log 的方式之後要重新設計 - log 的方式之後要重新設計 > 正規化
- IP Record (deploy之前配合rev proxy) - IP Record (deploy之前配合rev proxy)
- 檔案完成但是再看看要不要讓發文者持sha256存取自己發的文的檔案 - 檔案完成但是再看看要不要讓發文者持sha256存取自己發的文的檔案
""" """
@ -75,8 +75,8 @@ def getarticle(id:int):
# 上傳文章 / 留言 # 上傳文章 / 留言
@article.route("/post", methods = ["POST"]) @article.route("/post", methods = ["POST"])
def posting(): def posting():
# flow:
# ctx -> hash -> reference -> file -> IP -> IG -> mark -> post | -> log # ctx -> hash -> reference -> file -> IP -> IG -> mark -> post | -> log
# db # db
db = current_app.shared_resource.engine db = current_app.shared_resource.engine
Session = sessionmaker(bind=db) Session = sessionmaker(bind=db)
@ -164,7 +164,7 @@ def posting():
# logger # logger
logger.logger(db, "newpost", "New post (id=%d point to %s): %s"%(res[0]["id"], ref, mark)) logger.logger(db, "newpost", "New post (id=%d point to %s): %s"%(res[0]["id"], ref, mark))
return jsonify(res), 200 return jsonify(res), 201
# 只有發文者可以看到的獲取指定文章 # 只有發文者可以看到的獲取指定文章
# 只有發文者可以做到的刪除文章 # 只有發文者可以做到的刪除文章

View File

@ -29,7 +29,7 @@ def listlog():
# 指定顯示特定一條log # 指定顯示特定一條log
@log.route("/get/<int:id>", methods = ["GET"]) @log.route("/get/<int:id>", methods = ["GET"])
def getlog(id): def getlog(id:int):
# db # db
db = current_app.shared_resource.engine db = current_app.shared_resource.engine
Session = sessionmaker(bind=db) Session = sessionmaker(bind=db)

View File

@ -1 +1 @@
{"Check_Before_Post":false, "Niming_Max_Word":500, "Attachment_Count":5, "Attachment_Size":209715200, "Allowed_MIME":["image/jpeg", "image/pjpeg", "image/png", "image/heic", "image/heif", "video/mp4", "video/quicktime", "video/hevc", "image/gif", "image/webp"]} {"Check_Before_Post": false, "JWT_Valid_Time": 604800, "Niming_Max_Word": 500, "Attachment_Count": 5, "Attachment_Size": 209715200, "Allowed_MIME": ["image/jpeg", "image/pjpeg", "image/png", "image/heic", "image/heif", "video/mp4", "video/quicktime", "video/hevc", "image/gif", "image/webp"]}

View File

@ -1,4 +1,4 @@
from sqlalchemy import Column, Integer, String, TIMESTAMP, func, BIGINT, LargeBinary from sqlalchemy import Column, Integer, String, TIMESTAMP, func, BIGINT, LargeBinary, ARRAY
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base() Base = declarative_base()
@ -40,3 +40,14 @@ class SQLfile(Base):
def __repr__(self): def __repr__(self):
return f"<file(id={self.id}, created_at={self.created_at}, type={self.type}, binary={self.binary}, reference={self.reference})>" return f"<file(id={self.id}, created_at={self.created_at}, type={self.type}, binary={self.binary}, reference={self.reference})>"
class SQLuser(Base):
__tablename__ = 'users'
id = Column(BIGINT, primary_key=True)
user = Column(String)
password = Column(String) # hash , sha512
permission = Column(ARRAY(String))
def __repr__(self):
return f"<user(id={self.id}, user={self.user}, password={self.password}, permission={self.permission})>"

13
utils/platform_consts.py Normal file
View File

@ -0,0 +1,13 @@
# Permission List
pList = ["article.read", "article.pend", "article.del", "setting.edit"] # no permission:usermgr except root
pList_root = pList + ["usermgr"]
# Platform Setting Model
platform_setting_model = {
"Check_Before_Post": [bool],
"JWT_Valid_Time": [int],
"Niming_Max_Word": [int],
"Attachment_Count": [int],
"Attachment_Size": [int],
"Allowed_MIME": [list, str],
}

View File

@ -1,6 +1,35 @@
import json import json
from utils.platform_consts import platform_setting_model
def loadset(): def loadset():
with open("./settings.json", "r", encoding = "utf-8") as f: with open("./settings.json", "r", encoding = "utf-8") as f:
d = json.load(f) d = json.load(f)
return d return d
def typechecker(name:str, value):
if not(name in platform_setting_model.keys()) or not(isinstance(value, platform_setting_model.get(name)[0])):
return 0
iterable = False
try:
iter(platform_setting_model.get(name)[0])
iterable = True
except:
iterable = False
if iterable:
for v in value:
if not(isinstance(v, platform_setting_model.get(name)[1])): return 0
return 1
def writeset(name:str, value):
if not typechecker(name, value): return 0
d:dict = loadset()
d[name] = value
with open("./settings.json", "w", encoding = "utf-8") as f:
json.dump(d, f, ensure_ascii=False)
return d