Files
training-software/api/exceptions.py
Paperclip CTO 686acf259a feat(TRA-246): implement audit logging and compliance controls (M5)
Delivers the append-only audit trail system for all compliance-critical
actions per the TRA-246 acceptance criteria.

- tracking/models.py: AuditEvent model with ORM-level immutability guard
  (save raises on update, delete raises on direct call)
- tracking/audit.py: single record() call point; never raises in production
- tracking/admin.py: read-only Django admin for AuditEvent inspection
- tracking/migrations/0001_initial.py: DB schema with composite indexes
- tracking/serializers.py: PII metadata gating (oidc_sub stripped for
  non-admin callers)
- tracking/views.py: read-only AuditEventViewSet (IsPrivileged + 60/min
  throttle)
- tracking/urls.py: registers audit/events/ router
- tracking/management/commands/prune_audit_log.py: retention enforcement
  command with --dry-run and --class filter; writes access.admin_action
  event on real prune runs
- config/settings/base.py: AUDIT_RETENTION_DAYS per event class + audit
  throttle rate
- api/exceptions.py: wires access.permission_denied audit event on every
  PermissionDenied exception (M1 integration point)
- tests/test_audit.py: 26-event taxonomy coverage, immutability, retention,
  API permission, PII gating, and service helper unit tests

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-05-07 09:11:23 +02:00

63 lines
2.0 KiB
Python

import logging
import uuid
from rest_framework.exceptions import PermissionDenied
from rest_framework.views import exception_handler
from rest_framework.response import Response
logger = logging.getLogger(__name__)
def _client_ip(request) -> str | None:
if request is None:
return None
forwarded = request.META.get("HTTP_X_FORWARDED_FOR")
if forwarded:
return forwarded.split(",")[0].strip()
return request.META.get("REMOTE_ADDR")
def custom_exception_handler(exc, context):
response = exception_handler(exc, context)
if response is None:
return None
request = context.get("request")
request_id = (
getattr(request, "META", {}).get("HTTP_X_REQUEST_ID") or str(uuid.uuid4())
)
code = getattr(exc, "default_code", "error")
message = str(exc.detail) if hasattr(exc, "detail") else str(exc)
details = exc.detail if isinstance(getattr(exc, "detail", None), dict) else {}
# Record access.permission_denied audit event for all 403s (M1 integration).
if isinstance(exc, PermissionDenied) and request is not None:
try:
from tracking.audit import record as audit_record
view = context.get("view")
resource_type = type(view).__name__ if view else None
audit_record(
"access.permission_denied",
actor=getattr(request, "user", None),
actor_ip=_client_ip(request),
object_type=resource_type,
metadata={
"path": request.path,
"method": request.method,
"request_id": request_id,
},
)
except Exception:
logger.exception("Failed to write access.permission_denied audit event")
response.data = {
"error": {
"code": code,
"message": message,
"details": details,
"request_id": request_id,
}
}
return response