step-ca-inspector/models/x509_cert.py

145 lines
4.4 KiB
Python

import binascii
import dateutil
import json
import mariadb
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from datetime import datetime, timedelta, timezone
from models.config import config
config()
conn = mariadb.connect(
host=config.database_host,
user=config.database_user,
password=config.database_password,
database=config.database_name,
)
class list:
certs = []
def __new__(cls, sort_key=None):
cur = conn.cursor()
cur.execute(
"""SELECT x509_certs.nvalue AS cert,
x509_certs_data.nvalue AS data,
revoked_x509_certs.nvalue AS revoked
FROM x509_certs
INNER JOIN x509_certs_data USING(nkey)
LEFT JOIN revoked_x509_certs USING(nkey)"""
)
for result in cur:
cert_object = cert(result)
cls.certs.append(cert_object)
cur.close()
if sort_key is not None:
cls.certs.sort(key=lambda item: getattr(item, sort_key))
return cls.certs
class cert:
def __init__(self, cert):
(cert_der, cert_data_raw, cert_revoked_raw) = cert
cert_data = json.loads(cert_data_raw)
if cert_revoked_raw is not None:
cert_revoked = json.loads(cert_revoked_raw)
else:
cert_revoked = None
self.load(cert_der, cert_data, cert_revoked)
@classmethod
def from_serial(cls, serial):
return cls(cert=cls.get_cert(cls, serial))
def load(self, cert_der, cert_data, cert_revoked):
cert = x509.load_der_x509_certificate(cert_der)
self.pem = cert.public_bytes(serialization.Encoding.PEM)
self.serial = str(cert.serial_number)
self.sha256 = binascii.b2a_hex(cert.fingerprint(hashes.SHA256()))
self.sha1 = binascii.b2a_hex(cert.fingerprint(hashes.SHA1()))
self.md5 = binascii.b2a_hex(cert.fingerprint(hashes.MD5()))
self.pub_key = cert.public_key().public_bytes(
serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo
)
self.pub_alg = cert.public_key_algorithm_oid._name
self.sig_alg = cert.signature_algorithm_oid._name
self.issuer = cert.issuer.rfc4514_string()
self.subject = cert.subject.rfc4514_string({x509.NameOID.EMAIL_ADDRESS: "E"})
self.not_before = cert.not_valid_before_utc.replace(microsecond=0)
self.not_after = cert.not_valid_after_utc.replace(microsecond=0)
try:
san_data = cert.extensions.get_extension_for_class(
x509.SubjectAlternativeName
)
self.san_names = san_data.value.get_values_for_type(x509.GeneralName)
except x509.extensions.ExtensionNotFound:
self.san_names = []
self.provisioner = cert_data.get("provisioner", None)
if cert_revoked is not None:
self.revoked_at = dateutil.parser.isoparse(
cert_revoked.get("RevokedAt")
).replace(microsecond=0)
else:
self.revoked_at = None
now_with_tz = datetime.utcnow().replace(
tzinfo=timezone(offset=timedelta()), microsecond=0
)
if self.revoked_at is not None and self.revoked_at < now_with_tz:
self.status = status(status.REVOKED)
elif self.not_after < now_with_tz:
self.status = status(status.EXPIRED)
else:
self.status = status(status.VALID)
def get_cert(self, cert_serial):
cur = conn.cursor()
cur.execute(
"""SELECT x509_certs.nvalue AS cert,
x509_certs_data.nvalue AS data,
revoked_x509_certs.nvalue AS revoked
FROM x509_certs
INNER JOIN x509_certs_data USING(nkey)
LEFT JOIN revoked_x509_certs USING(nkey)
WHERE nkey=?""",
(cert_serial,),
)
if cur.rowcount > 0:
cert = cur.fetchone()
else:
cert = None
cur.close()
return cert
class status:
REVOKED = 1
EXPIRED = 2
VALID = 3
def __init__(self, status):
self.value = status
def __str__(self):
if self.value == self.EXPIRED:
return "Expired"
elif self.value == self.REVOKED:
return "Revoked"
elif self.value == self.VALID:
return "Valid"
else:
return "Undefined"