from flask import Flask, make_response, redirect, render_template, request, g, send_file
import subprocess
from uuid import uuid4
from base64 import b32encode, b64encode
from pathlib import Path
import sqlite3
# The Flask application object that every route below is registered on.
app = Flask(__name__)

# Certificate storage root; the CA certificate and private key live directly
# inside it.
CERT_LOCATION = Path("./certs/")
CA_PEM = CERT_LOCATION / "rootCA.pem"
CA_KEY = CERT_LOCATION / "rootCA-key.pem"
def save_csr(text: str):
    """Store a CSR under ``CERT_LOCATION / "req"`` with a fresh random name.

    The request directory, including any missing parent directories, is
    created on demand.

    Args:
        text: The CSR contents to write.

    Returns:
        The base32-encoded random identifier used as the file stem; the file
        itself is named ``<identifier>.csr``.
    """
    req_path = CERT_LOCATION / "req"
    # parents/exist_ok: works when the certificate root does not exist yet and
    # removes the check-then-create race between concurrent requests.
    req_path.mkdir(parents=True, exist_ok=True)
    fname_id = b32encode(uuid4().bytes_le).decode()
    with open(req_path / f"{fname_id}.csr", "w", encoding="utf-8") as f:
        f.write(text)
    return fname_id
def get_db():
    """Return the SQLite connection cached on ``flask.g``.

    The connection to ``cert.db`` is opened the first time this is called
    within an application context and reused afterwards.
    """
    conn = getattr(g, "_database", None)
    if conn is not None:
        return conn
    conn = sqlite3.connect("cert.db")
    g._database = conn
    return conn
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection stored on ``flask.g`` by ``get_db``, if any.

    Args:
        exception: Supplied by the teardown hook; not used here.
    """
    conn = getattr(g, "_database", None)
    if conn is None:
        # get_db() was never called in this context; nothing to release.
        return
    conn.close()
@app.post("/reqcert")
def reqcert():
    """Accept a certificate signing request and queue it for approval.

    Reads the ``csr`` and ``name`` form fields, checks the CSR with
    ``openssl req -verify``, writes it to disk through ``save_csr`` and
    records it in the ``request`` table under the caller's ``id_tk`` cookie.

    Returns:
        A redirect to ``/`` on success, or an empty 400 response when the
        ``id_tk`` cookie is missing or openssl rejects the CSR.
    """
    csr = request.form["csr"]
    name = request.form["name"]

    id_tk = request.cookies.get("id_tk")
    if not id_tk:
        # A row stored without an owner token could never be matched by the
        # id-based lookups in root() and download_der(), so reject it early.
        return make_response("", 400)

    # The CSR is fed on stdin; output is captured only to keep it off the
    # server's own stdout/stderr.
    openssl_check = subprocess.run(
        ["openssl", "req", "-verify"],
        input=csr.encode(),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if openssl_check.returncode:
        # Report the rejected CSR as a client error instead of an empty 200.
        return make_response("", 400)

    fname = save_csr(csr)
    db = get_db()
    cur = db.cursor()
    try:
        cur.execute(
            "INSERT INTO request (id, name, csr) VALUES (?,?,?)",
            [id_tk, name, fname],
        )
        db.commit()
    finally:
        # Release the cursor even if the insert or commit raises.
        cur.close()
    return redirect("/")
@app.get("/download_pem/")
@app.get("/download_pem/<filename>")
def download_der(filename: str = ""):
    """Send an issued certificate (PEM) to the user it was issued to.

    The certificate is identified by ``filename`` (without the ``.pem``
    suffix), taken from the URL path or, for the bare ``/download_pem/`` URL,
    from the ``filename`` query parameter. It is only served when the
    ``issued`` table holds exactly one row matching both the caller's
    ``id_tk`` cookie and that filename.

    Args:
        filename: Stem of the certificate file inside ``./certs/issue``.

    Returns:
        The file as a download named ``<name>.pem``; an empty 403 response
        when the ``id_tk`` cookie is missing; an empty 404 response when no
        filename is given, no single matching record exists, or the file is
        missing on disk.
    """
    uid = request.cookies.get("id_tk")
    if not uid:
        return "", 403

    # The original route declared no URL variable, so the view could never be
    # called with its required argument; accept the path segment and fall back
    # to the query string for the bare URL.
    filename = filename or request.args.get("filename", "")
    if not filename:
        return "", 404

    cur = get_db().cursor()
    try:
        record = cur.execute(
            "SELECT name FROM issued WHERE id=? AND filename=?", [uid, filename]
        ).fetchall()
    finally:
        cur.close()
    if len(record) != 1:
        return "", 404

    cert_path = Path("./certs/issue") / (filename + ".pem")
    if not cert_path.exists():
        return "", 404

    # Hand over an absolute path so the file that is sent is the same one that
    # was just checked, however send_file resolves relative paths.
    return send_file(cert_path.resolve(), download_name=f"{record[0][0]}.pem")
@app.get("/")
def root():
    """Render the landing page for the current visitor.

    A visitor without an ``id_tk`` cookie gets an empty page plus a freshly
    generated ``id_tk`` cookie. A visitor with the cookie gets the issued
    certificates and pending requests recorded under that token.
    """
    id_tk = request.cookies.get("id_tk")

    if not id_tk:
        # First visit: nothing to list yet, hand out a new random token.
        resp = make_response(render_template("index.html", issued=[], pending=[]))
        resp.set_cookie("id_tk", b64encode(uuid4().bytes).decode())
        return resp

    cursor = get_db().cursor()
    issued_rows = cursor.execute(
        "SELECT name, filename FROM issued WHERE id=?",
        [id_tk],
    ).fetchall()
    pending_rows = cursor.execute(
        "SELECT name FROM request WHERE id=?", [id_tk]
    ).fetchall()
    cursor.close()
    return render_template("index.html", issued=issued_rows, pending=pending_rows)
def init_db():
    """Run ``schema.sql`` against the database inside an application context.

    An ``sqlite3.OperationalError`` raised while running or committing the
    script is printed rather than propagated.
    """
    with app.app_context():
        db = get_db()
        try:
            with app.open_resource("schema.sql", mode="r") as schema_file:
                schema_sql = schema_file.read()
                script_cursor = db.cursor().executescript(schema_sql)
                script_cursor.close()
            db.commit()
        except sqlite3.OperationalError as err:
            print(err)
@app.get("/admin")
def admin_index():
    """Render the admin page listing every row of the ``request`` table.

    NOTE(review): no access check is visible in this handler; confirm that
    the admin routes are protected elsewhere.
    """
    cursor = get_db().cursor()
    query = cursor.execute("SELECT name,csr FROM request")
    certreqs = query.fetchall()
    cursor.close()
    # The fetched rows are also echoed to stdout.
    print(certreqs)
    return render_template("admin.html", reqs=certreqs)
@app.get("/admin/issue")
def issue():
    """Sign a pending CSR with the root CA and record it as issued.

    The ``id`` query parameter names the CSR file stem inside
    ``./certs/req``. The certificate is written by ``openssl x509 -req`` to
    ``./certs/issue/<id>.pem``; on success the matching row is moved from the
    ``request`` table to the ``issued`` table.

    NOTE(review): this state-changing action is reachable via GET and no
    access check is visible in this handler; confirm protection elsewhere.

    Returns:
        A redirect to ``/admin`` on success; ``"Failed"`` with status 400
        when the id is missing, is not a bare file name, or has no CSR file;
        ``"Failed"`` with status 404 when no pending request row matches;
        an empty 500 response when openssl exits with a non-zero status.
    """
    req_id = request.args.get("id")
    csr_dir_path = Path("./certs/req/")
    cert_dir_path = Path("./certs/issue")
    # parents/exist_ok avoids both a missing ./certs and a creation race.
    cert_dir_path.mkdir(parents=True, exist_ok=True)

    # The id becomes part of the paths handed to openssl for reading and
    # writing; accept only a bare file name so it cannot escape the
    # certificate directories via separators or an absolute path.
    if not req_id or Path(req_id).name != req_id:
        return "Failed", 400

    csr_path = csr_dir_path / (req_id + ".csr")
    if not csr_path.is_file():
        return "Failed", 400
    cert_path = cert_dir_path / (req_id + ".pem")

    openssl_call = subprocess.run(
        [
            "openssl",
            "x509",
            "-in",
            csr_path.as_posix(),
            "-req",
            "-extfile",
            "./certs/issue.cfg",
            "-extensions",
            "usr_cert",
            "-copy_extensions",
            "copy",
            "-CA",
            CA_PEM.as_posix(),
            "-CAkey",
            CA_KEY.as_posix(),
            "-outform",
            "pem",
            "-out",
            cert_path.as_posix(),
        ],
    )
    if openssl_call.returncode != 0:
        return "", 500

    db = get_db()
    cursor = db.cursor()
    try:
        rec = cursor.execute(
            "DELETE FROM request WHERE csr=? RETURNING id,name,csr", [req_id]
        ).fetchone()
        if rec is None:
            # A CSR file exists but no pending row does (for example the
            # request was already issued); there is nothing to move.
            db.rollback()
            return "Failed", 404
        cursor.execute("INSERT INTO issued (id,name,filename) VALUES (?,?,?)", rec)
        db.commit()
    finally:
        cursor.close()
    return redirect("/admin")
def create_app():
    """Initialise the database via ``init_db`` and return the module's app."""
    init_db()
    return app
if __name__ == "__main__":
    # Executed as a script: build the app, then start Flask's built-in server.
    flask_app = create_app()
    flask_app.run()