Delivers the append-only audit trail system for all compliance-critical actions per the TRA-246 acceptance criteria.

- tracking/models.py: AuditEvent model with ORM-level immutability guard (save raises on update, delete raises on direct call)
- tracking/audit.py: single record() call point; never raises in production
- tracking/admin.py: read-only Django admin for AuditEvent inspection
- tracking/migrations/0001_initial.py: DB schema with composite indexes
- tracking/serializers.py: PII metadata gating (oidc_sub stripped for non-admin callers)
- tracking/views.py: read-only AuditEventViewSet (IsPrivileged + 60/min throttle)
- tracking/urls.py: registers audit/events/ router
- tracking/management/commands/prune_audit_log.py: retention enforcement command with --dry-run and --class filter; writes access.admin_action event on real prune runs
- config/settings/base.py: AUDIT_RETENTION_DAYS per event class + audit throttle rate
- api/exceptions.py: wires access.permission_denied audit event on every PermissionDenied exception (M1 integration point)
- tests/test_audit.py: 26-event taxonomy coverage, immutability, retention, API permission, PII gating, and service helper unit tests

Co-Authored-By: Paperclip <noreply@paperclip.ing>
63 lines
2.0 KiB
Python
63 lines
2.0 KiB
Python
import logging
|
|
import uuid
|
|
|
|
from rest_framework.exceptions import PermissionDenied
|
|
from rest_framework.views import exception_handler
|
|
from rest_framework.response import Response
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _client_ip(request) -> str | None:
|
|
if request is None:
|
|
return None
|
|
forwarded = request.META.get("HTTP_X_FORWARDED_FOR")
|
|
if forwarded:
|
|
return forwarded.split(",")[0].strip()
|
|
return request.META.get("REMOTE_ADDR")
|
|
|
|
|
|
def custom_exception_handler(exc, context):
    """Wrap DRF's default error response in a uniform error envelope.

    Delegates to ``exception_handler`` first. When that returns ``None``
    the exception is not handled here and ``None`` is returned unchanged.
    Otherwise ``response.data`` is replaced with::

        {"error": {"code", "message", "details", "request_id"}}

    Every 403 response additionally triggers an
    ``access.permission_denied`` audit event. The check uses the response
    status as well as the exception type, because DRF's default handler
    also turns Django's own ``PermissionDenied`` into a 403 response while
    ``exc`` stays the original (non-DRF) exception object.

    Args:
        exc: The exception raised while handling the request.
        context: Mapping supplied by the caller; the ``"request"`` and
            ``"view"`` keys are read when present.

    Returns:
        The response from ``exception_handler`` with its ``data``
        replaced by the envelope, or ``None`` if there was no response.
    """
    response = exception_handler(exc, context)
    if response is None:
        return None

    request = context.get("request")
    # Reuse the caller-provided request id when present so the response
    # and the audit event can be correlated; otherwise mint a fresh one.
    request_id = (
        getattr(request, "META", {}).get("HTTP_X_REQUEST_ID") or str(uuid.uuid4())
    )

    code = getattr(exc, "default_code", "error")
    message = str(exc.detail) if hasattr(exc, "detail") else str(exc)
    # Only dict-shaped details are forwarded; anything else becomes {}.
    details = exc.detail if isinstance(getattr(exc, "detail", None), dict) else {}

    # Record access.permission_denied audit event for all 403s (M1 integration).
    # The isinstance test is kept so everything audited before still is;
    # the status test adds 403s produced from non-DRF exceptions.
    forbidden_status = 403
    is_forbidden = (
        isinstance(exc, PermissionDenied)
        or response.status_code == forbidden_status
    )
    if is_forbidden and request is not None:
        try:
            # Imported lazily inside the guarded block so an import failure
            # is logged rather than breaking the error response.
            from tracking.audit import record as audit_record

            view = context.get("view")
            resource_type = type(view).__name__ if view else None
            audit_record(
                "access.permission_denied",
                actor=getattr(request, "user", None),
                actor_ip=_client_ip(request),
                object_type=resource_type,
                metadata={
                    "path": request.path,
                    "method": request.method,
                    "request_id": request_id,
                },
            )
        except Exception:
            # Best-effort: an audit failure must not mask the 403 itself.
            logger.exception("Failed to write access.permission_denied audit event")

    response.data = {
        "error": {
            "code": code,
            "message": message,
            "details": details,
            "request_id": request_id,
        }
    }
    return response
|