185 lines
5 KiB
Python
185 lines
5 KiB
Python
from flask import Flask, make_response, redirect, render_template, request, g, send_file
|
|
import subprocess
|
|
from uuid import uuid4
|
|
from base64 import b32encode, b64encode
|
|
from pathlib import Path
|
|
import sqlite3
|
|
|
|
app = Flask(__name__)
|
|
CERT_LOCATION = Path("./certs/")
|
|
|
|
CA_PEM = Path("./certs/rootCA.pem")
|
|
CA_KEY = Path("./certs/rootCA-key.pem")
|
|
|
|
|
|
def save_csr(text: str):
|
|
req_path = CERT_LOCATION / "req"
|
|
if not req_path.exists() or not req_path.is_dir():
|
|
req_path.mkdir()
|
|
fname_id = b32encode(uuid4().bytes_le).decode()
|
|
fname = f"{fname_id}.csr"
|
|
with open(req_path / fname, "w") as f:
|
|
f.write(text)
|
|
return fname_id
|
|
|
|
|
|
def get_db():
|
|
db = getattr(g, "_database", None)
|
|
if db is None:
|
|
db = g._database = sqlite3.connect("cert.db")
|
|
return db
|
|
|
|
|
|
@app.teardown_appcontext
|
|
def close_connection(exception):
|
|
db = getattr(g, "_database", None)
|
|
if db is not None:
|
|
db.close()
|
|
|
|
|
|
@app.post("/reqcert")
|
|
def reqcert():
|
|
csr = request.form["csr"]
|
|
name = request.form["name"]
|
|
openssl_check = subprocess.Popen(
|
|
["openssl", "req", "-verify"],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
openssl_check.communicate(csr.encode())
|
|
if openssl_check.returncode:
|
|
return make_response(
|
|
"<script>alert('Not Work');window.location.href='/'</script>"
|
|
)
|
|
|
|
fname = save_csr(csr)
|
|
cur = get_db().cursor()
|
|
id_tk = request.cookies.get("id_tk")
|
|
cur.execute(
|
|
"INSERT INTO request (id, name, csr) VALUES (?,?,?)", [id_tk, name, fname]
|
|
)
|
|
get_db().commit()
|
|
cur.close()
|
|
|
|
return redirect("/")
|
|
|
|
|
|
@app.get("/download_pem/<filename>")
|
|
def download_der(filename: str):
|
|
uid = request.cookies.get("id_tk")
|
|
if not uid:
|
|
return "<script>alert('Not Supporting user token in cookies');window.location.href='/'</script>"
|
|
cur = get_db().cursor()
|
|
record = cur.execute(
|
|
"SELECT name FROM issued WHERE id=? AND filename=?", [uid, filename]
|
|
).fetchall()
|
|
if len(record) != 1:
|
|
return "<script>alert('DB has no record of this file');window.location.href='/'</script>"
|
|
|
|
cert_dir_path = Path("./certs/issue")
|
|
if not (cert_dir_path / (filename + ".pem")).exists():
|
|
return "<script>alert('Failed to get the certificate');window.location.href='/'</script>"
|
|
|
|
return send_file(
|
|
cert_dir_path / (filename + ".pem"), download_name=f"{record[0][0]}.pem"
|
|
)
|
|
|
|
|
|
@app.get("/")
|
|
def root():
|
|
id_tk = request.cookies.get("id_tk")
|
|
if id_tk:
|
|
cursor = get_db().cursor()
|
|
issued = cursor.execute(
|
|
"SELECT name, filename FROM issued WHERE id=?",
|
|
[id_tk],
|
|
).fetchall()
|
|
pending = cursor.execute(
|
|
"SELECT name FROM request WHERE id=?", [id_tk]
|
|
).fetchall()
|
|
cursor.close()
|
|
|
|
return render_template(
|
|
"index.html",
|
|
issued=issued,
|
|
pending=pending,
|
|
)
|
|
resp = make_response(render_template("index.html", issued=[], pending=[]))
|
|
resp.set_cookie("id_tk", b64encode(uuid4().bytes).decode())
|
|
return resp
|
|
|
|
|
|
def init_db():
|
|
with app.app_context():
|
|
db = get_db()
|
|
try:
|
|
with app.open_resource("schema.sql", mode="r") as f:
|
|
db.cursor().executescript(f.read()).close()
|
|
db.commit()
|
|
except sqlite3.OperationalError as e:
|
|
print(e)
|
|
|
|
|
|
@app.get("/admin")
|
|
def admin_index():
|
|
cursor = get_db().cursor()
|
|
certreqs = cursor.execute("SELECT name,csr FROM request").fetchall()
|
|
cursor.close()
|
|
print(certreqs)
|
|
return render_template("admin.html", reqs=certreqs)
|
|
|
|
|
|
@app.get("/admin/issue")
|
|
def issue():
|
|
id = request.args.get("id")
|
|
csr_dir_path = Path("./certs/req/")
|
|
cert_dir_path = Path("./certs/issue")
|
|
if not cert_dir_path.exists():
|
|
cert_dir_path.mkdir()
|
|
if not id or not (csr_dir_path / (id + ".csr")).is_file():
|
|
return "Failed"
|
|
|
|
openssl_call = subprocess.run(
|
|
[
|
|
"openssl",
|
|
"x509",
|
|
"-in",
|
|
(csr_dir_path / (id + ".csr")).as_posix(),
|
|
"-req",
|
|
"-extfile",
|
|
"./certs/issue.cfg",
|
|
"-extensions",
|
|
"usr_cert",
|
|
"-copy_extensions",
|
|
"copy",
|
|
"-CA",
|
|
CA_PEM.as_posix(),
|
|
"-CAkey",
|
|
CA_KEY.as_posix(),
|
|
"-outform",
|
|
"pem",
|
|
"-out",
|
|
(cert_dir_path / (id + ".pem")),
|
|
],
|
|
)
|
|
if openssl_call.returncode == 0:
|
|
cursor = get_db().cursor()
|
|
rec = cursor.execute(
|
|
"DELETE FROM request WHERE csr=? RETURNING id,name,csr", [id]
|
|
).fetchone()
|
|
cursor.execute("INSERT INTO issued (id,name,filename) VALUES (?,?,?)", rec)
|
|
get_db().commit()
|
|
cursor.close()
|
|
|
|
return redirect("/admin")
|
|
return "<script>alert('Failed to Issue');window.location.href='/admin'</script>"
|
|
|
|
|
|
def create_app():
|
|
init_db()
|
|
return app
|
|
|
|
|
|
if __name__ == "__main__":
|
|
create_app().run()
|