from flask import Flask, make_response, redirect, render_template, request, g, send_file import subprocess from uuid import uuid4 from base64 import b32encode, b64encode from pathlib import Path import sqlite3 app = Flask(__name__) CERT_LOCATION = Path("./certs/") CA_PEM = Path("./certs/rootCA.pem") CA_KEY = Path("./certs/rootCA-key.pem") def save_csr(text: str): req_path = CERT_LOCATION / "req" if not req_path.exists() or not req_path.is_dir(): req_path.mkdir() fname_id = b32encode(uuid4().bytes_le).decode() fname = f"{fname_id}.csr" with open(req_path / fname, "w") as f: f.write(text) return fname_id def get_db(): db = getattr(g, "_database", None) if db is None: db = g._database = sqlite3.connect("cert.db") return db @app.teardown_appcontext def close_connection(exception): db = getattr(g, "_database", None) if db is not None: db.close() @app.post("/reqcert") def reqcert(): csr = request.form["csr"] name = request.form["name"] openssl_check = subprocess.Popen( ["openssl", "req", "-verify"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) openssl_check.communicate(csr.encode()) if openssl_check.returncode: return make_response( "" ) fname = save_csr(csr) cur = get_db().cursor() id_tk = request.cookies.get("id_tk") cur.execute( "INSERT INTO request (id, name, csr) VALUES (?,?,?)", [id_tk, name, fname] ) get_db().commit() cur.close() return redirect("/") @app.get("/download_pem/") def download_der(filename: str): uid = request.cookies.get("id_tk") if not uid: return "" cur = get_db().cursor() record = cur.execute( "SELECT name FROM issued WHERE id=? AND filename=?", [uid, filename] ).fetchall() if len(record) != 1: return "" cert_dir_path = Path("./certs/issue") if not (cert_dir_path / (filename + ".pem")).exists(): return "" return send_file( cert_dir_path / (filename + ".pem"), download_name=f"{record[0][0]}.pem" ) @app.get("/") def root(): id_tk = request.cookies.get("id_tk") if id_tk: cursor = get_db().cursor() issued = cursor.execute( "SELECT name, filename FROM issued WHERE id=?", [id_tk], ).fetchall() pending = cursor.execute( "SELECT name FROM request WHERE id=?", [id_tk] ).fetchall() cursor.close() return render_template( "index.html", issued=issued, pending=pending, ) resp = make_response(render_template("index.html", issued=[], pending=[])) resp.set_cookie("id_tk", b64encode(uuid4().bytes).decode()) return resp def init_db(): with app.app_context(): db = get_db() try: with app.open_resource("schema.sql", mode="r") as f: db.cursor().executescript(f.read()).close() db.commit() except sqlite3.OperationalError as e: print(e) @app.get("/admin") def admin_index(): cursor = get_db().cursor() certreqs = cursor.execute("SELECT name,csr FROM request").fetchall() cursor.close() print(certreqs) return render_template("admin.html", reqs=certreqs) @app.get("/admin/issue") def issue(): id = request.args.get("id") csr_dir_path = Path("./certs/req/") cert_dir_path = Path("./certs/issue") if not cert_dir_path.exists(): cert_dir_path.mkdir() if not id or not (csr_dir_path / (id + ".csr")).is_file(): return "Failed" openssl_call = subprocess.run( [ "openssl", "x509", "-in", (csr_dir_path / (id + ".csr")).as_posix(), "-req", "-extfile", "./certs/issue.cfg", "-extensions", "usr_cert", "-copy_extensions", "copy", "-CA", CA_PEM.as_posix(), "-CAkey", CA_KEY.as_posix(), "-outform", "pem", "-out", (cert_dir_path / (id + ".pem")), ], ) if openssl_call.returncode == 0: cursor = get_db().cursor() rec = cursor.execute( "DELETE FROM request WHERE csr=? RETURNING id,name,csr", [id] ).fetchone() cursor.execute("INSERT INTO issued (id,name,filename) VALUES (?,?,?)", rec) get_db().commit() cursor.close() return redirect("/admin") return "" def create_app(): init_db() return app if __name__ == "__main__": create_app().run()