Add support for filtering parameters

This commit is contained in:
Benjamin Collet 2025-05-10 20:49:17 +02:00
parent fdb4926260
commit 0cb5337e32
Signed by: bcollet
SSH key fingerprint: SHA256:8UJspOIcCOS+MtSOcnuq2HjKFube4ox1s/+A62ixov4

View file

@ -1,4 +1,4 @@
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException, Query
from fastapi_utils.tasks import repeat_every from fastapi_utils.tasks import repeat_every
from prometheus_client import make_asgi_app, Gauge from prometheus_client import make_asgi_app, Gauge
from models import x509_cert, ssh_cert from models import x509_cert, ssh_cert
@ -6,6 +6,7 @@ from config import config
from pydantic import BaseModel from pydantic import BaseModel
from typing import List, Union from typing import List, Union
from datetime import datetime from datetime import datetime
from enum import Enum
import mariadb import mariadb
import sys import sys
@ -187,7 +188,14 @@ def get_x509_cert(serial: str) -> Union[x509Cert, None]:
@app.get("/ssh/certs", tags=["ssh"]) @app.get("/ssh/certs", tags=["ssh"])
def list_ssh_certs( def list_ssh_certs(
sort_key: str = "not_after", revoked: bool = False, expired: bool = False sort_key: str = Query(enum=["not_after", "not_before"], default="not_after"),
revoked: bool = False,
expired: bool = False,
cert_type: list[Enum("Types", [("Host", "Host"), ("User", "User")])] = Query(
["Host", "User"]
),
key: str = None,
principal: str = None,
) -> list[sshCert]: ) -> list[sshCert]:
certs = ssh_cert.list(db, sort_key=sort_key) certs = ssh_cert.list(db, sort_key=sort_key)
cert_list = [] cert_list = []
@ -197,6 +205,12 @@ def list_ssh_certs(
continue continue
if cert.status.value == ssh_cert.status.REVOKED and not revoked: if cert.status.value == ssh_cert.status.REVOKED and not revoked:
continue continue
if cert.type not in [item.value for item in cert_type]:
continue
if key is not None and key not in str(cert.key_id):
continue
if principal is not None and principal not in str(cert.principals):
continue
cert.status = str(cert.status) cert.status = str(cert.status)
cert_list.append(cert) cert_list.append(cert)