Files
training-software/api/middleware.py
Paperclip CTO 3d541d818a feat(TRA-249): M5 observability, SLOs, backup, and release readiness
- Add prometheus-client to base requirements; sentry-sdk to prod
- api/metrics.py: define HTTP latency histogram, request/error counters, in-flight gauge
- api/middleware.py: extend SecurityAuditMiddleware to observe all four Prometheus collectors per request; low-cardinality path_template label via URL resolver
- api/views.py: /metrics/ endpoint (gated by METRICS_ENABLED setting)
- api/urls.py: wire /metrics/ route
- config/settings/prod.py: METRICS_ENABLED flag; optional Sentry SDK init via SENTRY_DSN env var
- ops/prometheus/alerts.yml: Prometheus alert rules for p95 latency SLO (≤500 ms), error rate SLO (<1%), availability, and saturation
- ops/prometheus/prometheus.yml: scrape config for app + blackbox healthcheck probe
- ops/scripts/backup.sh: pg_dump → S3 STANDARD_IA with retention metadata
- ops/scripts/restore.sh: pg_restore from S3 or local file with interactive confirmation guard
- ops/scripts/synthetic-check.sh: post-deploy smoke test (healthz, metrics gate, schema, 404 shape)
- docs/TRA-249-observability-slos.md: SLO table, PromQL reference queries, alert routing
- docs/TRA-249-backup-restore.md: RPO/RTO targets, drill procedure, restore validation steps
- docs/TRA-249-release-checklist.md: pre/post-deploy checklist
- docs/TRA-249-rollback-runbook.md: decision matrix, app rollback, migration revert, DB restore path

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-05-07 09:14:18 +02:00

113 lines
3.6 KiB
Python

import logging
import time
import uuid
from .metrics import (
HTTP_REQUEST_DURATION_SECONDS,
HTTP_REQUESTS_TOTAL,
HTTP_SERVER_ERRORS_TOTAL,
HTTP_REQUESTS_IN_FLIGHT,
)
# Status codes that are always reported as security events, whatever the path.
_SECURITY_STATUSES = {401, 403, 429}

# URL prefixes under which any response with status >= 400 is reported as a
# security event.
_SENSITIVE_PATH_PREFIXES = (
    "/api/v1/auth/",
    "/api/v1/token/",
    "/api/v1/upload/",
    "/api/v1/password/",
)

# Dedicated channel for security events; access logs go to this module's logger.
security_logger = logging.getLogger("security")
access_logger = logging.getLogger(__name__)
class SecurityAuditMiddleware:
    """Attach a request ID, record metrics, emit access logs, flag security events.

    For every request the middleware:

    * sets ``request.request_id`` (the ``X-Request-ID`` header if present,
      otherwise a fresh UUID4);
    * tracks the in-flight gauge and, once a response exists, observes the
      latency histogram and the request / server-error counters;
    * writes one access-log record with structured ``extra`` fields;
    * writes a warning to the ``security`` logger when the response
      qualifies as a security event (see ``_is_security_event``).
    """

    # Single label value shared by every request that matches no URL pattern.
    # Unmatched paths are client-controlled, so using the raw path as a label
    # would let scanners / 404 traffic create an unbounded number of series.
    _UNMATCHED_PATH_LABEL = "<unmatched>"

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Process one request and return the downstream response unchanged."""
        # NOTE(review): X-Request-ID is client-supplied and is copied into log
        # records as-is; confirm the edge proxy sanitises or overwrites it.
        request.request_id = (
            request.META.get("HTTP_X_REQUEST_ID") or str(uuid.uuid4())
        )

        start = time.monotonic()
        HTTP_REQUESTS_IN_FLIGHT.inc()
        try:
            response = self.get_response(request)
        finally:
            # Always release the gauge, even if the downstream handler raises.
            HTTP_REQUESTS_IN_FLIGHT.dec()
        duration_s = time.monotonic() - start
        duration_ms = round(duration_s * 1000)

        status = response.status_code
        labels = {
            "method": request.method,
            "path_template": self._path_template(request),
            "status_code": str(status),
        }
        HTTP_REQUEST_DURATION_SECONDS.labels(**labels).observe(duration_s)
        HTTP_REQUESTS_TOTAL.labels(**labels).inc()
        if status >= 500:
            HTTP_SERVER_ERRORS_TOTAL.labels(**labels).inc()

        user_pk = getattr(getattr(request, "user", None), "pk", None)
        extra = {
            "request_id": request.request_id,
            "method": request.method,
            "path": request.path,
            "status": status,
            "user": user_pk,
            "ip": self._client_ip(request),
            "duration_ms": duration_ms,
        }
        access_logger.info("request", extra=extra)

        if self._is_security_event(request, response):
            security_logger.warning(
                "security_event path=%s method=%s status=%s user=%s ip=%s duration_ms=%s",
                request.path,
                request.method,
                status,
                # Compare against None so a falsy primary key (e.g. 0) is not
                # misreported as an anonymous user.
                "anonymous" if user_pk is None else user_pk,
                extra["ip"],
                duration_ms,
            )
        return response

    @staticmethod
    def _path_template(request) -> str:
        """Return a low-cardinality path label for metrics.

        Prefers the ``resolver_match`` already attached to the request, which
        avoids resolving the URL a second time. If none is attached, the path
        is resolved here, honouring a per-request ``urlconf`` attribute when
        one is set. Requests that match no pattern all share
        ``_UNMATCHED_PATH_LABEL``.

        Returns:
            The matched route pattern with a single leading slash, ``"/"``
            when the matched route is empty, or ``_UNMATCHED_PATH_LABEL``.
        """
        match = getattr(request, "resolver_match", None)
        if match is None:
            try:
                from django.urls import resolve

                match = resolve(
                    request.path_info,
                    urlconf=getattr(request, "urlconf", None),
                )
            except Exception:
                # Deliberately broad: computing a metrics label is best-effort
                # and must never turn a served response into an error.
                return SecurityAuditMiddleware._UNMATCHED_PATH_LABEL
        # Route patterns such as "api/v1/courses/<int:pk>/" are already
        # low-cardinality; never fall back to the raw request path.
        return "/" + (match.route or "").lstrip("/")

    @staticmethod
    def _is_security_event(request, response) -> bool:
        """Tell whether the response should be written to the security log.

        True when the status is one of ``_SECURITY_STATUSES``, or when the
        status is >= 400 and the path starts with a sensitive prefix.
        """
        status = response.status_code
        if status in _SECURITY_STATUSES:
            return True
        return status >= 400 and request.path.startswith(_SENSITIVE_PATH_PREFIXES)

    @staticmethod
    def _client_ip(request) -> str:
        """Return the client address used in log records.

        Uses the first entry of ``X-Forwarded-For`` when it is non-blank,
        otherwise ``REMOTE_ADDR``, otherwise the string ``"unknown"``.
        """
        # NOTE(review): X-Forwarded-For can be set by the client; this value
        # is only trustworthy if a trusted proxy rewrites the header - confirm.
        forwarded = request.META.get("HTTP_X_FORWARDED_FOR") or ""
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop
        # Treat an empty REMOTE_ADDR the same as a missing one.
        return request.META.get("REMOTE_ADDR") or "unknown"