From 0cb5337e32ca593a9c4c09fbe42584943ae2dc73 Mon Sep 17 00:00:00 2001 From: Benjamin Collet <benjamin@collet.eu> Date: Sat, 10 May 2025 20:49:17 +0200 Subject: [PATCH] Add support for filtering parameters --- step-ca-inspector/main.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/step-ca-inspector/main.py b/step-ca-inspector/main.py index c33ddc4..6320f0e 100644 --- a/step-ca-inspector/main.py +++ b/step-ca-inspector/main.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Query from fastapi_utils.tasks import repeat_every from prometheus_client import make_asgi_app, Gauge from models import x509_cert, ssh_cert @@ -6,6 +6,7 @@ from config import config from pydantic import BaseModel from typing import List, Union from datetime import datetime +from enum import Enum import mariadb import sys @@ -187,7 +188,14 @@ def get_x509_cert(serial: str) -> Union[x509Cert, None]: @app.get("/ssh/certs", tags=["ssh"]) def list_ssh_certs( - sort_key: str = "not_after", revoked: bool = False, expired: bool = False + sort_key: str = Query(enum=["not_after", "not_before"], default="not_after"), + revoked: bool = False, + expired: bool = False, + cert_type: list[Enum("Types", [("Host", "Host"), ("User", "User")])] = Query( + ["Host", "User"] + ), + key: str = None, + principal: str = None, ) -> list[sshCert]: certs = ssh_cert.list(db, sort_key=sort_key) cert_list = [] @@ -197,6 +205,12 @@ def list_ssh_certs( continue if cert.status.value == ssh_cert.status.REVOKED and not revoked: continue + if cert.type not in [item.value for item in cert_type]: + continue + if key is not None and key not in str(cert.key_id): + continue + if principal is not None and principal not in str(cert.principals): + continue cert.status = str(cert.status) cert_list.append(cert)