Log Schema
Note
Quick Summary: Structured JSON logging schema for all CSA-in-a-Box Python services via structlog — baseline fields (service, timestamp, level, event, trace_id, correlation_id), per-trigger binding conventions (HTTP, Blob, Event Hub, Timer, CLI), and KQL queries for Log Analytics parsing.
All Python services in CSA-in-a-Box emit structured JSON log lines via governance.common.logging (which wraps structlog). Each line is a single-line JSON object so that Azure Log Analytics can parse it with a single KQL expression regardless of which service emitted it.
📑 Table of Contents
- 📋 1. Baseline Fields
- ⚙️ 2. Services and Their Canonical Events
- 🔗 3. Trigger Bindings
- 📊 4. Log Analytics Parsing
- 💻 5. Local and Console Output
📋 1. Baseline Fields
Every log line contains these fields, always at the top level:
| Field | Type | Example | Notes |
|---|---|---|---|
| service | string | "csa-ai-enrichment" | Set at process start via configure_structlog(service=...). The source-of-truth mapping from service name to logical component is the table in Section 2; add a row there whenever a new service starts emitting logs. |
| timestamp | string (ISO-8601, UTC) | "2026-04-10T15:23:47.123456Z" | Always UTC. Uses structlog's TimeStamper(fmt="iso", utc=True). |
| level | string | "info" | Lowercase: debug, info, warning, error, critical. |
| event | string | "request.received" | The message id. Use dotted, namespaced event names (request.received, enrichment.text_failed, batch.completed). |
| trace_id | string (32-hex) | "0af7651916cd43dd8448eb211c80319c" | W3C trace id. Added automatically inside bind_trace_context(...). Same value across all log lines emitted for the same request or batch. |
| correlation_id | string (UUID4) | "8e1a3c74-…" | Correlation id bound for the current unit of work. Same value across all log lines for the run, but may differ from trace_id when the caller supplies a business-level correlation id. |
Any additional key/value pairs come from the caller (via logger.info("event", foo="bar")) or from fields bound at trigger entry (bind_trace_context(request_path=..., batch_size=...)).
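As a quick check of the table above, the following sketch validates a single log line against the baseline fields using only the standard library. The function name `validate_baseline` and the two regular expressions are illustrative assumptions derived from the example values in the table; they are not part of governance.common.logging.

```python
import json
import re
from typing import List

# Baseline fields every log line is expected to carry at the top level.
BASELINE_FIELDS = ("service", "timestamp", "level", "event", "trace_id", "correlation_id")
LEVELS = {"debug", "info", "warning", "error", "critical"}

# Assumed shapes, taken from the example values in the table above.
TRACE_ID_RE = re.compile(r"[0-9a-f]{32}")
UTC_TIMESTAMP_RE = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z")


def validate_baseline(line: str) -> List[str]:
    """Return a list of problems found in one JSON log line (empty if none)."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return ["not valid JSON"]
    if not isinstance(record, dict):
        return ["not a JSON object"]

    problems = [f"missing field: {name}" for name in BASELINE_FIELDS if name not in record]
    if "level" in record and str(record["level"]) not in LEVELS:
        problems.append("level must be one of the lowercase level names")
    if "trace_id" in record and not TRACE_ID_RE.fullmatch(str(record["trace_id"])):
        problems.append("trace_id must be 32 lowercase hex characters")
    if "timestamp" in record and not UTC_TIMESTAMP_RE.fullmatch(str(record["timestamp"])):
        problems.append("timestamp must be ISO-8601 UTC ending in Z")
    return problems
```

A check like this could run in a unit test against captured stdout, so that a service which drops a baseline field is caught before its logs reach Log Analytics.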
⚙️ 2. Services and Their Canonical Events
| Service | Emitting module | Canonical events |
|---|---|---|
| csa-data-quality | csa_platform/csa_platform/governance/dataquality/run_quality_checks.py | data_quality.run_started, data_quality.run_completed, dbt.test_failed, volume.check_result, freshness.result, report.emitted |
| csa-ai-enrichment | csa_platform/functions/aiEnrichment/functions/function_app.py | request.received, request.invalid_json, request.missing_field, request.payload_too_large, request.completed, blob.received, blob.unsupported_type, blob.completed, enrichment.text_failed, enrichment.document_failed, ai_client.import_failed |
| csa-event-processing | csa_platform/functions/eventProcessing/functions/function_app.py | batch.received, batch.completed, event.invalid_json, event.processing_failed, replay.request_received, replay.invalid_json, replay.empty_payload, replay.completed, heartbeat, timer.past_due |
Important
When a service adds a new event, add it here so operators have a single index of what can appear in the log stream.
🔗 3. Trigger Bindings
bind_trace_context(...) is the only sanctioned way to attach per-request fields. The conventions per trigger type:
HTTP triggers
trace_id = extract_trace_id_from_headers(dict(req.headers))
with bind_trace_context(
    trace_id=trace_id,  # None -> a new one is generated
    request_method="POST",
    request_route="/api/enrich",
):
    logger.info("request.received")
    ...
Logs will carry: trace_id, correlation_id, request_method, request_route.
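The source does not show how extract_trace_id_from_headers works. Since the trace id is described as a W3C trace id, one plausible reading is that it parses the W3C Trace Context `traceparent` header. The sketch below illustrates that idea under that assumption; `extract_trace_id` is a hypothetical stand-in, and the real helper may read other headers or behave differently.

```python
import re
from typing import Mapping, Optional

# W3C Trace Context layout: version-traceid-parentid-flags, all lowercase hex.
_TRACEPARENT_RE = re.compile(r"([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


def extract_trace_id(headers: Mapping[str, str]) -> Optional[str]:
    """Return the 32-hex trace id from a traceparent header, or None."""
    # HTTP header names are case-insensitive, so normalise before lookup.
    lowered = {key.lower(): value for key, value in headers.items()}
    match = _TRACEPARENT_RE.fullmatch(lowered.get("traceparent", "").strip())
    if match is None:
        return None
    version, trace_id, parent_id = match.group(1), match.group(2), match.group(3)
    # The spec treats version "ff" and all-zero trace or parent ids as invalid.
    if version == "ff" or trace_id == "0" * 32 or parent_id == "0" * 16:
        return None
    return trace_id
```

Returning None for a missing or malformed header fits the convention in the snippet above, where a None trace_id causes a new one to be generated.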
Blob triggers
with bind_trace_context(
    trigger="blob",
    blob_name=blob.name,
    blob_size=blob.length,
):
    logger.info("blob.received")
Event Hub (batch) triggers
with bind_trace_context(
    trigger="eventhub",
    batch_size=len(events),
    first_sequence_number=events[0].sequence_number if events else None,
):
    logger.info("batch.received")
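The HTTP, Blob, and Event Hub bindings share one pattern: fields are attached on entry to the with-block and removed on exit. The sketch below shows how such a context manager could behave, using only the standard library. It is not the implementation in governance.common.logging: the names `bind_trace_context_sketch` and `current_fields`, and the rule that a nested block inherits the outer ids, are assumptions made for illustration.

```python
import contextvars
import uuid
from contextlib import contextmanager
from typing import Any, Iterator, Optional

# Fields bound for the current unit of work; a renderer would merge these into each line.
_bound = contextvars.ContextVar("bound_fields", default=None)


def current_fields() -> dict:
    """Return a copy of the fields bound in the current context."""
    return dict(_bound.get() or {})


@contextmanager
def bind_trace_context_sketch(
    trace_id: Optional[str] = None,
    correlation_id: Optional[str] = None,
    **fields: Any,
) -> Iterator[dict]:
    """Bind ids and extra fields for the duration of the with-block."""
    merged = current_fields()
    # Assumption: an explicit id wins, then an id inherited from an outer block, then a fresh one.
    merged["trace_id"] = trace_id or merged.get("trace_id") or uuid.uuid4().hex
    merged["correlation_id"] = correlation_id or merged.get("correlation_id") or str(uuid.uuid4())
    merged.update(fields)
    token = _bound.set(merged)
    try:
        yield merged
    finally:
        # Restore whatever was bound before, so fields never leak into the next request.
        _bound.reset(token)
```

Resetting through the token rather than clearing the variable means nested bindings unwind correctly, and each asyncio task or thread sees only its own fields.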
Timer triggers
CLI entry points
Bind a run-scoped correlation_id at the top of main() so every log line from a single invocation shares the same id:
from structlog.contextvars import bind_contextvars
bind_contextvars(correlation_id=new_correlation_id(), suite=args.suite)
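To illustrate what run-scoped means here, the sketch below generates one correlation id at the top of `main()` and shows that every line produced by that invocation carries it. It uses a plain dictionary in place of structlog's context variables so that it runs without third-party packages. The `new_correlation_id` body is an assumption (a random UUID4, matching the type given in the baseline table), and returning the lines instead of printing them is only for demonstration.

```python
import argparse
import json
import uuid
from typing import List, Optional


def new_correlation_id() -> str:
    """Stand-in for the project's helper: a random UUID4 string."""
    return str(uuid.uuid4())


def main(argv: Optional[List[str]] = None) -> List[str]:
    parser = argparse.ArgumentParser()
    parser.add_argument("--suite", required=True)
    args = parser.parse_args(argv)

    # Bound once, before any work starts, so every line below shares the same values.
    run_fields = {"correlation_id": new_correlation_id(), "suite": args.suite}

    lines = []
    for event in ("data_quality.run_started", "data_quality.run_completed"):
        lines.append(json.dumps({"event": event, **run_fields}))
    return lines
```

Calling `main(["--suite", "bronze"])` twice yields two different correlation ids, one per invocation, which is what lets an operator separate overlapping runs in the log stream.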
📊 4. Log Analytics Parsing
Azure Functions ingests stdout into Application Insights traces, which Log Analytics exposes as the AppTraces table. Because we emit JSON, a single KQL expression unpacks the whole schema:
AppTraces
| where TimeGenerated > ago(24h)
| extend payload = parse_json(Message)
| extend
    service = tostring(payload.service),
    event_name = tostring(payload.event),
    trace_id = tostring(payload.trace_id),
    correlation_id = tostring(payload.correlation_id),
    level = tostring(payload.level),
    logger = tostring(payload.logger)
| project TimeGenerated, service, level, event_name, trace_id, correlation_id, payload
| order by TimeGenerated desc
Follow a single request end-to-end
AppTraces
| extend payload = parse_json(Message)
| where tostring(payload.trace_id) == "<trace-id-from-bad-request>"
| project TimeGenerated, service = tostring(payload.service), event_name = tostring(payload.event), payload
| order by TimeGenerated asc
Top error events per service in the last hour
AppTraces
| where TimeGenerated > ago(1h)
| extend payload = parse_json(Message)
| where tostring(payload.level) in ("error", "critical")
| summarize count() by service = tostring(payload.service), event_name = tostring(payload.event)
| order by count_ desc
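When working from captured stdout rather than Log Analytics, the same aggregation can be approximated locally. The sketch below is an offline analogue of the query above, not a replacement for it: `top_error_events` is a hypothetical helper, and it applies no time window because it only sees the lines it is given.

```python
import json
from collections import Counter
from typing import Iterable, List, Tuple


def top_error_events(lines: Iterable[str]) -> List[Tuple[Tuple[str, str], int]]:
    """Count error/critical lines by (service, event), most frequent first."""
    counts: Counter = Counter()
    for line in lines:
        try:
            payload = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON output such as console-rendered lines
        if not isinstance(payload, dict):
            continue
        if payload.get("level") in ("error", "critical"):
            key = (str(payload.get("service", "")), str(payload.get("event", "")))
            counts[key] += 1
    return counts.most_common()
```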
Batch throughput for the event processor
AppTraces
| where TimeGenerated > ago(24h)
| extend payload = parse_json(Message)
| where tostring(payload.service) == "csa-event-processing"
| where tostring(payload.event) == "batch.completed"
| extend processed = toint(payload.processed), errors = toint(payload.errors)
| summarize total_events = sum(processed), total_errors = sum(errors) by bin(TimeGenerated, 5m)
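The `bin(TimeGenerated, 5m)` step floors each timestamp to the start of its five-minute window before summing. The sketch below reproduces that flooring on captured log lines, assuming each batch.completed line carries integer `processed` and `errors` fields as the query implies. It bins on the payload's own `timestamp` field rather than on TimeGenerated, so results can differ slightly from the KQL output; `batch_throughput` is a hypothetical name.

```python
import json
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, Tuple


def batch_throughput(lines: Iterable[str], bin_seconds: int = 300) -> Dict[datetime, Tuple[int, int]]:
    """Sum processed and errors per time bin for batch.completed lines."""
    totals = defaultdict(lambda: [0, 0])
    for line in lines:
        payload = json.loads(line)  # assumes every input line is a JSON object
        if payload.get("service") != "csa-event-processing":
            continue
        if payload.get("event") != "batch.completed":
            continue
        # fromisoformat() only accepts a trailing "Z" from Python 3.11, so normalise it.
        stamp = datetime.fromisoformat(payload["timestamp"].replace("Z", "+00:00"))
        # Floor to the start of the bin, like KQL's bin(TimeGenerated, 5m).
        epoch = int(stamp.timestamp())
        bucket = datetime.fromtimestamp(epoch - epoch % bin_seconds, tz=timezone.utc)
        totals[bucket][0] += int(payload.get("processed", 0))
        totals[bucket][1] += int(payload.get("errors", 0))
    return {bucket: (done, failed) for bucket, (done, failed) in sorted(totals.items())}
```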
💻 5. Local and Console Output
For local development, set LOG_FORMAT=console to switch from JSON to the human-readable console renderer:
export LOG_FORMAT=console
python csa_platform/governance/dataquality/run_quality_checks.py --suite bronze
All other behaviour (trace context binding, service tags) is identical; only the serialisation changes.
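The switch can be pictured as choosing one of two renderers for the same record. The sketch below shows that idea with the standard library only. The function names, the exact console layout, the case-insensitive comparison, and the fallback to JSON for any other value are all assumptions; the project's actual renderers come from structlog and are not shown in this document.

```python
import json
import os
from typing import Callable, Mapping, Optional

Record = Mapping[str, object]


def render_json(record: Record) -> str:
    """One compact JSON object per line, the form shipped to Log Analytics."""
    return json.dumps(record, separators=(",", ":"), default=str)


def render_console(record: Record) -> str:
    """Human-readable line: timestamp, level and event first, then key=value pairs."""
    head = f"{record.get('timestamp', '')} [{record.get('level', '')}] {record.get('event', '')}"
    rest = " ".join(
        f"{key}={value!r}"
        for key, value in record.items()
        if key not in ("timestamp", "level", "event")
    )
    return f"{head} {rest}".rstrip()


def pick_renderer(env: Optional[Mapping[str, str]] = None) -> Callable[[Record], str]:
    """Choose the renderer from LOG_FORMAT; anything other than "console" means JSON."""
    env = os.environ if env is None else env
    if env.get("LOG_FORMAT", "json").lower() == "console":
        return render_console
    return render_json
```

Because the record passed to either renderer is identical, bound fields such as trace_id and service appear in both outputs, which matches the statement that only the serialisation changes.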