⚠️ Error Handling & Monitoring Best Practices¶
Last Updated: 2026-04-15 | Version: 2.0 | Status: ✅ Final | Maintainer: Documentation Team
📖 Overview¶
Robust error handling and monitoring are essential for production-grade Microsoft Fabric deployments. This guide covers pipeline error architecture, structured error logging, classification taxonomy, monitoring with Data Activator, PySpark error handling patterns, and KQL-based error analysis. It draws on patterns proven across casino gaming compliance, federal agency data pipelines, and real-time streaming workloads.
🧭 Table of Contents¶
- 🏗️ Pipeline Error Architecture
- 💾 Production Error Table Design
- 🔧 Stored Procedures
- 🏷️ Error Classification Taxonomy
- 🔁 Pipeline Error Patterns
- 🐍 PySpark Error Handling
- 🔔 Monitoring with Data Activator
- 📢 Alert Escalation Matrix
- 🔍 KQL Queries for Error Analysis
- 📊 Operational Dashboards
- 📓 Runbook Integration
🏗️ Pipeline Error Architecture¶
Design Principles¶
Every pipeline in a production Fabric deployment should follow these error handling principles:
- Single Error Activity Pattern -- One On Failure activity per pipeline that captures all errors
- Structured Error Logging -- Write every failure to a centralized error table with full context
- Error Propagation -- Nested pipelines propagate errors upward while preserving original context
- Retry with Backoff -- Transient errors are retried with exponential backoff before escalation
- Fail-Fast for Permanent Errors -- Data quality and permission errors skip retries entirely
Single Error Activity Pattern¶
The recommended approach is to attach a single On Failure path from each critical activity to a shared error-logging activity. This avoids duplicated error handling logic and guarantees consistent capture.
flowchart LR
subgraph Pipeline["Pipeline: ingest_usda_crop_data"]
A[Lookup: Get Config] -->|Success| B[Copy: Extract Data]
B -->|Success| C[Notebook: Transform]
C -->|Success| D[Stored Proc: Update Watermark]
A -->|On Failure| E[Stored Proc: Log Error]
B -->|On Failure| E
C -->|On Failure| E
D -->|On Failure| E
E --> F[Web: Send Alert]
F --> G[Set Variable: Pipeline Failed]
end
Error Propagation in Nested Pipelines¶
When using Execute Pipeline activities, errors must propagate correctly from child to parent. The child pipeline should log its own error and then re-throw (by not suppressing the failure) so the parent can also react.
flowchart TB
subgraph Parent["Parent: orchestrate_daily_load"]
P1[ForEach: Source Systems] --> P2[Execute Pipeline: Child]
P2 -->|On Failure| P3[Log Error with Child Context]
P3 --> P4[Continue / Abort Decision]
end
subgraph Child["Child: ingest_source_X"]
C1[Copy Activity] -->|On Failure| C2[Log Error]
C2 --> C3[Fail Activity]
end
P2 --> Child
Key configuration:
| Setting | Value | Reason |
|---|---|---|
| Wait on completion | true | Parent waits for child outcome |
| Child termination | Use Fail activity after logging | Ensures parent sees the failure |
| fireAndForget | false | Never use for error-sensitive pipelines |
Retry Patterns with Exponential Backoff¶
Configure retry policies on activities that interact with external systems (Copy, Web, Lookup against remote sources).
{
"retryPolicy": {
"count": 3,
"intervalInSeconds": 30,
"backoffMultiplier": 2,
"maxIntervalInSeconds": 300
}
}
Retry schedule for this configuration:
| Attempt | Wait Before Retry | Cumulative Time |
|---|---|---|
| 1st retry | 30 seconds | 30s |
| 2nd retry | 60 seconds | 1m 30s |
| 3rd retry | 120 seconds | 3m 30s |
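The schedule above can be reproduced with a few lines of Python (a sketch; the parameter names mirror the retryPolicy JSON fields, not an official API):

```python
def backoff_schedule(count=3, interval_s=30, multiplier=2, max_interval_s=300):
    """Compute the wait before each retry and the cumulative elapsed time,
    mirroring the retryPolicy configuration shown above."""
    waits, cumulative, elapsed = [], [], 0
    for attempt in range(count):
        # Each retry doubles the wait, capped at max_interval_s
        wait = min(interval_s * multiplier ** attempt, max_interval_s)
        elapsed += wait
        waits.append(wait)
        cumulative.append(elapsed)
    return waits, cumulative

# backoff_schedule() -> ([30, 60, 120], [30, 90, 210])  i.e. 3m 30s total
```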
Activities that should have retries:
- Copy Activity (source connectivity, throttling)
- Web Activity (API calls to external endpoints)
- Lookup Activity (remote source queries)
- Stored Procedure Activity (transient deadlocks)
Activities that should NOT retry:
- Notebook Activity (handles its own retry logic internally)
- Fail Activity (intentionally terminates the pipeline)
- Set Variable Activity (local, no transient failures)
💾 Production Error Table Design¶
Error Table DDL¶
Create this table in your Fabric Warehouse for centralized error tracking across all pipelines.
CREATE TABLE dbo.pipeline_errors (
error_id BIGINT IDENTITY(1,1) PRIMARY KEY,
error_timestamp DATETIME2 DEFAULT GETUTCDATE(),
pipeline_name NVARCHAR(256),
pipeline_run_id NVARCHAR(256),
activity_name NVARCHAR(256),
activity_type NVARCHAR(128),
error_code NVARCHAR(128),
error_message NVARCHAR(MAX),
error_classification NVARCHAR(64), -- TRANSIENT, PERMANENT, DATA_QUALITY, PERMISSION, TIMEOUT, RESOURCE
severity NVARCHAR(16), -- CRITICAL, HIGH, MEDIUM, LOW
source_system NVARCHAR(256),
affected_table NVARCHAR(256),
affected_record_count INT,
retry_count INT DEFAULT 0,
max_retries INT DEFAULT 3,
is_resolved BIT DEFAULT 0,
resolution_notes NVARCHAR(MAX),
resolved_by NVARCHAR(256),
resolved_timestamp DATETIME2,
alert_sent BIT DEFAULT 0,
alert_channel NVARCHAR(128),
stack_trace NVARCHAR(MAX),
input_parameters NVARCHAR(MAX),
environment NVARCHAR(64), -- DEV, STAGING, PROD
correlation_id NVARCHAR(256),
parent_pipeline_run_id NVARCHAR(256)
);
Supporting Indexes¶
-- Fast lookup by pipeline and time
CREATE NONCLUSTERED INDEX IX_pipeline_errors_timestamp
ON dbo.pipeline_errors (error_timestamp DESC)
INCLUDE (pipeline_name, severity, is_resolved);
-- Unresolved errors dashboard
CREATE NONCLUSTERED INDEX IX_pipeline_errors_unresolved
ON dbo.pipeline_errors (is_resolved, severity)
WHERE is_resolved = 0;
-- Correlation tracking across nested pipelines
CREATE NONCLUSTERED INDEX IX_pipeline_errors_correlation
ON dbo.pipeline_errors (correlation_id)
INCLUDE (pipeline_name, activity_name, error_classification);
-- Error classification analysis
CREATE NONCLUSTERED INDEX IX_pipeline_errors_classification
ON dbo.pipeline_errors (error_classification, error_timestamp DESC);
Error Summary View¶
CREATE VIEW dbo.vw_error_summary AS
SELECT
CAST(error_timestamp AS DATE) AS error_date,
pipeline_name,
error_classification,
severity,
COUNT(*) AS error_count,
SUM(CASE WHEN is_resolved = 1 THEN 1 ELSE 0 END) AS resolved_count,
SUM(CASE WHEN is_resolved = 0 THEN 1 ELSE 0 END) AS unresolved_count,
AVG(DATEDIFF(MINUTE, error_timestamp, ISNULL(resolved_timestamp, GETUTCDATE()))) AS avg_resolution_minutes,
MAX(error_timestamp) AS last_occurrence
FROM dbo.pipeline_errors
GROUP BY
CAST(error_timestamp AS DATE),
pipeline_name,
error_classification,
severity;
🔧 Stored Procedures¶
usp_log_pipeline_error¶
This stored procedure is called from the pipeline On Failure path via a Stored Procedure activity.
CREATE PROCEDURE dbo.usp_log_pipeline_error
@pipeline_name NVARCHAR(256),
@pipeline_run_id NVARCHAR(256),
@activity_name NVARCHAR(256),
@activity_type NVARCHAR(128),
@error_code NVARCHAR(128),
@error_message NVARCHAR(MAX),
@source_system NVARCHAR(256) = NULL,
@affected_table NVARCHAR(256) = NULL,
@affected_record_count INT = NULL,
@retry_count INT = 0,
@stack_trace NVARCHAR(MAX) = NULL,
@input_parameters NVARCHAR(MAX) = NULL,
@environment NVARCHAR(64) = 'PROD',
@correlation_id NVARCHAR(256) = NULL,
@parent_pipeline_run_id NVARCHAR(256) = NULL
AS
BEGIN
SET NOCOUNT ON;
DECLARE @error_classification NVARCHAR(64);
DECLARE @severity NVARCHAR(16);
DECLARE @max_retries INT = 3;
-- Classify the error based on error code and message patterns
SET @error_classification = CASE
-- Transient errors (retriable)
WHEN @error_code IN ('ConnectionTimeout', 'ServiceUnavailable', '429', '503', '504')
THEN 'TRANSIENT'
WHEN @error_message LIKE '%timeout%' OR @error_message LIKE '%throttl%'
THEN 'TRANSIENT'
WHEN @error_message LIKE '%deadlock%' OR @error_message LIKE '%lock timeout%'
THEN 'TRANSIENT'
-- Permission errors
WHEN @error_code IN ('403', 'Forbidden', 'Unauthorized', '401')
THEN 'PERMISSION'
WHEN @error_message LIKE '%access denied%' OR @error_message LIKE '%not authorized%'
THEN 'PERMISSION'
-- Data quality errors
WHEN @error_message LIKE '%schema mismatch%' OR @error_message LIKE '%type mismatch%'
THEN 'DATA_QUALITY'
WHEN @error_message LIKE '%null value%' OR @error_message LIKE '%constraint violation%'
THEN 'DATA_QUALITY'
WHEN @error_message LIKE '%duplicate%' OR @error_message LIKE '%unique%'
THEN 'DATA_QUALITY'
-- Timeout errors
WHEN @error_code LIKE '%Timeout%' OR @error_message LIKE '%exceeded the timeout%'
THEN 'TIMEOUT'
-- Resource errors
WHEN @error_message LIKE '%out of memory%' OR @error_message LIKE '%disk space%'
THEN 'RESOURCE'
WHEN @error_message LIKE '%capacity%' OR @error_message LIKE '%throttl%'
THEN 'RESOURCE'
-- Default: permanent
ELSE 'PERMANENT'
END;
-- Determine severity based on classification and context
SET @severity = CASE
-- Critical: compliance pipelines or permanent failures in production
WHEN @environment = 'PROD' AND @error_classification = 'PERMANENT'
AND (@pipeline_name LIKE '%compliance%' OR @pipeline_name LIKE '%ctr%'
OR @pipeline_name LIKE '%sar%' OR @pipeline_name LIKE '%hipaa%')
THEN 'CRITICAL'
-- Critical: resource exhaustion in production
WHEN @environment = 'PROD' AND @error_classification = 'RESOURCE'
THEN 'CRITICAL'
-- High: any permanent error in production
WHEN @environment = 'PROD' AND @error_classification IN ('PERMANENT', 'PERMISSION')
THEN 'HIGH'
-- High: data quality errors affecting gold layer
WHEN @error_classification = 'DATA_QUALITY' AND @affected_table LIKE 'gold_%'
THEN 'HIGH'
-- Medium: transient errors that have exhausted retries
WHEN @error_classification = 'TRANSIENT' AND @retry_count >= @max_retries
THEN 'MEDIUM'
-- Medium: data quality errors in silver layer
WHEN @error_classification = 'DATA_QUALITY' AND @affected_table LIKE 'silver_%'
THEN 'MEDIUM'
-- Low: transient errors still retrying, dev/staging errors
WHEN @environment IN ('DEV', 'STAGING') THEN 'LOW'
WHEN @error_classification = 'TRANSIENT' AND @retry_count < @max_retries THEN 'LOW'
ELSE 'MEDIUM'
END;
-- Determine if alerts should be sent
DECLARE @alert_sent BIT = 0;
DECLARE @alert_channel NVARCHAR(128) = NULL;
IF @severity IN ('CRITICAL', 'HIGH') AND @environment = 'PROD'
BEGIN
SET @alert_sent = 1;
SET @alert_channel = CASE
WHEN @severity = 'CRITICAL' THEN 'Teams+Email+PagerDuty'
WHEN @severity = 'HIGH' THEN 'Teams+Email'
ELSE 'Teams'
END;
END;
-- Insert the error record
INSERT INTO dbo.pipeline_errors (
pipeline_name, pipeline_run_id, activity_name, activity_type,
error_code, error_message, error_classification, severity,
source_system, affected_table, affected_record_count,
retry_count, max_retries, stack_trace, input_parameters,
environment, correlation_id, parent_pipeline_run_id,
alert_sent, alert_channel
)
VALUES (
@pipeline_name, @pipeline_run_id, @activity_name, @activity_type,
@error_code, @error_message, @error_classification, @severity,
@source_system, @affected_table, @affected_record_count,
@retry_count, @max_retries, @stack_trace, @input_parameters,
@environment, @correlation_id, @parent_pipeline_run_id,
@alert_sent, @alert_channel
);
-- Return the error_id and classification for downstream use
SELECT
SCOPE_IDENTITY() AS error_id,
@error_classification AS error_classification,
@severity AS severity,
@alert_sent AS alert_sent;
END;
usp_get_error_summary¶
Dashboard-friendly procedure for error summary reporting.
CREATE PROCEDURE dbo.usp_get_error_summary
@start_date DATETIME2 = NULL,
@end_date DATETIME2 = NULL,
@environment NVARCHAR(64) = NULL,
@severity_filter NVARCHAR(16) = NULL,
@unresolved_only BIT = 0
AS
BEGIN
SET NOCOUNT ON;
-- Default to last 7 days if no dates provided
IF @start_date IS NULL SET @start_date = DATEADD(DAY, -7, GETUTCDATE());
IF @end_date IS NULL SET @end_date = GETUTCDATE();
-- Summary by pipeline and classification
SELECT
pipeline_name,
error_classification,
severity,
COUNT(*) AS total_errors,
SUM(CASE WHEN is_resolved = 1 THEN 1 ELSE 0 END) AS resolved,
SUM(CASE WHEN is_resolved = 0 THEN 1 ELSE 0 END) AS unresolved,
MIN(error_timestamp) AS first_occurrence,
MAX(error_timestamp) AS last_occurrence,
AVG(retry_count) AS avg_retries,
AVG(CASE WHEN is_resolved = 1
THEN DATEDIFF(MINUTE, error_timestamp, resolved_timestamp)
ELSE NULL END) AS avg_resolution_minutes
FROM dbo.pipeline_errors
WHERE error_timestamp BETWEEN @start_date AND @end_date
AND (@environment IS NULL OR environment = @environment)
AND (@severity_filter IS NULL OR severity = @severity_filter)
AND (@unresolved_only = 0 OR is_resolved = 0)
GROUP BY pipeline_name, error_classification, severity
ORDER BY
CASE severity
WHEN 'CRITICAL' THEN 1
WHEN 'HIGH' THEN 2
WHEN 'MEDIUM' THEN 3
WHEN 'LOW' THEN 4
END,
COUNT(*) DESC;
-- Hourly error trend
SELECT
DATEADD(HOUR, DATEDIFF(HOUR, 0, error_timestamp), 0) AS error_hour,
error_classification,
COUNT(*) AS error_count
FROM dbo.pipeline_errors
WHERE error_timestamp BETWEEN @start_date AND @end_date
AND (@environment IS NULL OR environment = @environment)
GROUP BY
DATEADD(HOUR, DATEDIFF(HOUR, 0, error_timestamp), 0),
error_classification
ORDER BY error_hour DESC;
-- Top recurring errors
SELECT TOP 10
error_code,
LEFT(error_message, 200) AS error_summary,
pipeline_name,
COUNT(*) AS occurrence_count,
MAX(error_timestamp) AS last_seen
FROM dbo.pipeline_errors
WHERE error_timestamp BETWEEN @start_date AND @end_date
AND (@environment IS NULL OR environment = @environment)
GROUP BY error_code, LEFT(error_message, 200), pipeline_name
ORDER BY COUNT(*) DESC;
END;
usp_resolve_error¶
Mark errors as resolved with notes.
CREATE PROCEDURE dbo.usp_resolve_error
@error_id BIGINT,
@resolved_by NVARCHAR(256),
@resolution_notes NVARCHAR(MAX)
AS
BEGIN
SET NOCOUNT ON;
UPDATE dbo.pipeline_errors
SET
is_resolved = 1,
resolved_by = @resolved_by,
resolved_timestamp = GETUTCDATE(),
resolution_notes = @resolution_notes
WHERE error_id = @error_id;
SELECT @@ROWCOUNT AS rows_updated;
END;
usp_bulk_resolve_transient¶
Auto-resolve transient errors that succeeded on retry.
CREATE PROCEDURE dbo.usp_bulk_resolve_transient
@pipeline_run_id NVARCHAR(256)
AS
BEGIN
SET NOCOUNT ON;
-- If the pipeline run ultimately succeeded, resolve all transient errors from that run
UPDATE dbo.pipeline_errors
SET
is_resolved = 1,
resolved_by = 'SYSTEM_AUTO_RESOLVE',
resolved_timestamp = GETUTCDATE(),
resolution_notes = 'Auto-resolved: pipeline run completed successfully after retry'
WHERE pipeline_run_id = @pipeline_run_id
AND error_classification = 'TRANSIENT'
AND is_resolved = 0;
SELECT @@ROWCOUNT AS auto_resolved_count;
END;
🏷️ Error Classification Taxonomy¶
Classification Definitions¶
| Classification | Description | Retry? | Example |
|---|---|---|---|
| TRANSIENT | Temporary issue expected to resolve on retry | Yes (up to max_retries) | Network timeout, 429 throttling, service unavailable |
| PERMANENT | Structural issue requiring code or config change | No | Invalid column mapping, missing table, bad credentials |
| DATA_QUALITY | Data does not meet schema or business rules | No | Null required field, type mismatch, duplicate key |
| PERMISSION | Insufficient access rights | No | 403 Forbidden, workspace access denied |
| TIMEOUT | Operation exceeded time limit | Conditional | Long-running query, large copy timeout |
| RESOURCE | Capacity or resource exhaustion | Conditional | Out of memory, disk full, capacity throttled |
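The Retry? column can be encoded as a small lookup for use in notebooks (a sketch; the two "conditional" classes are treated like TRANSIENT here, retrying only while under the retry budget — adjust to your own policy):

```python
# Retry eligibility per classification; None marks the conditional classes
RETRYABLE = {
    "TRANSIENT": True,
    "TIMEOUT": None,
    "RESOURCE": None,
    "PERMANENT": False,
    "DATA_QUALITY": False,
    "PERMISSION": False,
}

def should_retry(classification: str, retry_count: int, max_retries: int = 3) -> bool:
    """Fail fast for permanent classes; retry transient and conditional
    classes only while the retry budget is not exhausted."""
    policy = RETRYABLE.get(classification, False)
    if policy is False:
        return False
    return retry_count < max_retries
```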
Classification Decision Tree¶
flowchart TD
A[Error Occurred] --> B{Error Code Known?}
B -->|Yes| C{Is it 4xx?}
B -->|No| D{Inspect Message}
C -->|401/403| E[PERMISSION]
C -->|408/429| F[TRANSIENT]
C -->|400/404/409| G{Data Related?}
G -->|Yes| H[DATA_QUALITY]
G -->|No| I[PERMANENT]
D -->|timeout pattern| J[TIMEOUT]
D -->|memory/disk/capacity| K[RESOURCE]
D -->|schema/type/null/duplicate| L[DATA_QUALITY]
D -->|throttle/retry/unavailable| M[TRANSIENT]
D -->|Other| N[PERMANENT]
F --> O{Retry Count < Max?}
J --> O
K --> O
O -->|Yes| P[Retry with Backoff]
O -->|No| Q[Escalate]
Domain-Specific Error Classifications¶
Casino Gaming Compliance¶
| Error Pattern | Classification | Severity | Action |
|---|---|---|---|
| CTR filing deadline missed | PERMANENT | CRITICAL | Immediate escalation to compliance team |
| SAR pattern detection failure | DATA_QUALITY | CRITICAL | Manual review required |
| W-2G threshold calculation error | DATA_QUALITY | HIGH | Reprocess affected records |
| Player exclusion list sync timeout | TRANSIENT | HIGH | Retry, then manual verification |
Federal Agency Pipelines¶
| Error Pattern | Classification | Severity | Action |
|---|---|---|---|
| USDA NASS API rate limit | TRANSIENT | MEDIUM | Backoff and retry (respect 429 headers) |
| NOAA weather data format change | PERMANENT | HIGH | Update schema mapping |
| EPA AQI missing station data | DATA_QUALITY | MEDIUM | Flag and continue with available stations |
| DOI resource boundary file corrupt | DATA_QUALITY | HIGH | Re-download from source |
| SBA loan dataset schema drift | PERMANENT | HIGH | Update parser configuration |
Healthcare (HIPAA/42 CFR Part 2)¶
| Error Pattern | Classification | Severity | Action |
|---|---|---|---|
| PHI exposed in error log | PERMANENT | CRITICAL | Purge log, notify privacy officer |
| Consent record mismatch | DATA_QUALITY | CRITICAL | Halt processing, audit trail |
| De-identification failure | DATA_QUALITY | CRITICAL | Quarantine affected records |
| Encryption key rotation failure | PERMISSION | CRITICAL | Escalate to security team |
🔁 Pipeline Error Patterns¶
Pattern 1: Simple Pipeline with Error Logging¶
Configure every activity with an On Failure dependency to the error logging procedure.
Pipeline expression for the Stored Procedure parameters:
{
"pipeline_name": "@pipeline().Pipeline",
"pipeline_run_id": "@pipeline().RunId",
"activity_name": "Copy_USDA_Data",
"activity_type": "CopyActivity",
"error_code": "@activity('Copy_USDA_Data').Error?.errorCode",
"error_message": "@activity('Copy_USDA_Data').Error?.message",
"source_system": "USDA_NASS_API",
"affected_table": "bronze_usda_crop_production",
"environment": "@pipeline().parameters.environment",
"correlation_id": "@pipeline().GroupId"
}
Pattern 2: ForEach with Partial Failure Handling¶
When processing multiple sources in a ForEach loop, capture errors per iteration without stopping the entire loop.
{
"type": "ForEach",
"typeProperties": {
"isSequential": false,
"batchCount": 10,
"items": "@pipeline().parameters.source_list",
"activities": [
{
"name": "Copy_Source",
"type": "Copy",
"dependsOn": [],
"onFailure": [
{
"name": "Log_Error",
"type": "SqlServerStoredProcedure"
}
]
},
{
"name": "Log_Error",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "Copy_Source",
"dependencyConditions": ["Failed"]
}
],
"typeProperties": {
"storedProcedureName": "dbo.usp_log_pipeline_error"
}
}
]
}
}
Pattern 3: Nested Pipeline Error Propagation¶
Parent pipeline:
Execute Pipeline: child_ingest_source
|-- On Success --> Continue
|-- On Failure --> Log Error (with parent context)
--> If Expression: Should Abort?
|-- True --> Fail Activity
|-- False --> Continue (partial failure)
Child pipeline must end with:
Activity Failed
|-- On Failure --> Log Error (with child detail)
--> Fail Activity (propagate to parent)
Pattern 4: Conditional Retry Based on Classification¶
Use a Web Activity to call the error logging procedure, capture the classification, then branch.
flowchart LR
A[Activity Failed] --> B[Log Error & Get Classification]
B --> C{Classification?}
C -->|TRANSIENT| D{Retry Count < Max?}
C -->|PERMANENT| E[Send Alert & Fail]
C -->|DATA_QUALITY| F[Quarantine & Continue]
D -->|Yes| G[Wait with Backoff]
D -->|No| E
G --> H[Retry Activity]
🐍 PySpark Error Handling¶
Structured Error Handling in Notebooks¶
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException, ParseException
from datetime import datetime, timezone
import json
import traceback
class PipelineErrorHandler:
"""Centralized error handler for Fabric notebooks."""
def __init__(self, spark: SparkSession, pipeline_name: str, environment: str = "PROD"):
self.spark = spark
self.pipeline_name = pipeline_name
self.environment = environment
self.error_table = "lh_operations.pipeline_errors"
self.errors_collected = []
def log_error(
self,
activity_name: str,
error: Exception,
source_system: str = None,
affected_table: str = None,
affected_record_count: int = None,
input_parameters: dict = None,
correlation_id: str = None
):
"""Log an error to the centralized error table."""
error_record = {
"error_timestamp": datetime.now(timezone.utc).isoformat(),
"pipeline_name": self.pipeline_name,
"pipeline_run_id": self.spark.conf.get("spark.fabric.runId", "unknown"),
"activity_name": activity_name,
"activity_type": "SparkNotebook",
"error_code": type(error).__name__,
"error_message": str(error)[:4000],
"error_classification": self._classify_error(error),
"severity": self._determine_severity(error, affected_table),
"source_system": source_system,
"affected_table": affected_table,
"affected_record_count": affected_record_count,
"retry_count": 0,
"max_retries": 3,
"is_resolved": False,
"stack_trace": traceback.format_exc()[:4000],
"input_parameters": json.dumps(input_parameters) if input_parameters else None,
"environment": self.environment,
"correlation_id": correlation_id
}
self.errors_collected.append(error_record)
self._write_error(error_record)
def _classify_error(self, error: Exception) -> str:
"""Classify the error based on its type and message."""
error_msg = str(error).lower()
error_type = type(error).__name__
if error_type in ("AnalysisException",) and "table or view not found" in error_msg:
return "PERMANENT"
if "timeout" in error_msg or "timed out" in error_msg:
return "TIMEOUT"
if "permission" in error_msg or "access denied" in error_msg or "unauthorized" in error_msg:
return "PERMISSION"
if any(kw in error_msg for kw in ("schema", "type mismatch", "null", "duplicate", "constraint")):
return "DATA_QUALITY"
if any(kw in error_msg for kw in ("out of memory", "disk", "capacity", "java.lang.outofmemoryerror")):
return "RESOURCE"
if any(kw in error_msg for kw in ("throttl", "retry", "unavailable", "connection reset")):
return "TRANSIENT"
return "PERMANENT"
def _determine_severity(self, error: Exception, affected_table: str = None) -> str:
"""Determine severity based on error type and affected table."""
classification = self._classify_error(error)
if classification == "RESOURCE":
return "CRITICAL"
if affected_table and any(kw in (affected_table or "").lower() for kw in ("compliance", "ctr", "sar", "hipaa")):
return "CRITICAL"
if affected_table and "gold_" in (affected_table or ""):
return "HIGH"
if classification in ("PERMANENT", "PERMISSION"):
return "HIGH"
if classification == "DATA_QUALITY":
return "MEDIUM"
return "LOW"
def _write_error(self, error_record: dict):
"""Write error record to Delta table."""
try:
df = self.spark.createDataFrame([error_record])
df.write.mode("append").format("delta").saveAsTable(self.error_table)
except Exception as write_err:
# Fallback: print to notebook output so it is captured in logs
print(f"ERROR_LOG_FAILURE: Could not write to {self.error_table}: {write_err}")
print(f"ORIGINAL_ERROR: {json.dumps(error_record, default=str)}")
def get_summary(self) -> list:
"""Return all errors collected in this session."""
return self.errors_collected
Using the Error Handler in Notebooks¶
# Cell 1: Initialize
error_handler = PipelineErrorHandler(
spark=spark,
pipeline_name="nb_bronze_usda_crop_production",
environment="PROD"
)
# Cell 2: Protected data processing
try:
# Read source data
raw_df = spark.read.format("json").load("abfss://bronze@onelake.dfs.fabric.microsoft.com/usda/crop_production/")
# Validate schema before processing
expected_columns = {"state_code", "commodity", "year", "production_value", "unit"}
actual_columns = set(raw_df.columns)
missing = expected_columns - actual_columns
if missing:
raise ValueError(f"Schema validation failed. Missing columns: {missing}")
# Process data
processed_df = raw_df.filter(raw_df.production_value.isNotNull())
processed_df.write.mode("overwrite").format("delta").saveAsTable("lh_silver.usda_crop_production")
record_count = processed_df.count()
print(f"Successfully processed {record_count} records")
except AnalysisException as e:
error_handler.log_error(
activity_name="read_usda_source",
error=e,
source_system="USDA_NASS",
affected_table="silver_usda_crop_production"
)
raise # Re-raise to fail the notebook activity
except ValueError as e:
error_handler.log_error(
activity_name="validate_schema",
error=e,
source_system="USDA_NASS",
affected_table="silver_usda_crop_production"
)
raise
except Exception as e:
error_handler.log_error(
activity_name="process_usda_data",
error=e,
source_system="USDA_NASS",
affected_table="silver_usda_crop_production"
)
raise
Retry Decorator for Transient Failures¶
import time
import functools
from typing import Callable, Tuple, Type
def retry_on_transient(
max_retries: int = 3,
base_delay: float = 5.0,
backoff_multiplier: float = 2.0,
max_delay: float = 120.0,
transient_exceptions: Tuple[Type[Exception], ...] = (ConnectionError, TimeoutError)
):
"""Decorator that retries a function on transient exceptions with exponential backoff."""
def decorator(func: Callable):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except transient_exceptions as e:
last_exception = e
if attempt < max_retries:
delay = min(base_delay * (backoff_multiplier ** attempt), max_delay)
print(f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. "
f"Retrying in {delay:.1f}s...")
time.sleep(delay)
else:
print(f"All {max_retries + 1} attempts failed.")
raise
raise last_exception
return wrapper
return decorator
# Usage:
@retry_on_transient(max_retries=3, base_delay=10.0)
def fetch_api_data(endpoint: str) -> dict:
"""Fetch data from an external API with automatic retry."""
import requests
response = requests.get(endpoint, timeout=30)
response.raise_for_status()
return response.json()
Data Quality Error Collection (Non-Fatal)¶
For data quality issues that should be logged but should not halt the pipeline:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
class DataQualityCollector:
"""Collect and log data quality issues without failing the pipeline."""
def __init__(self, spark: SparkSession, source_table: str, target_table: str):
self.spark = spark
self.source_table = source_table
self.target_table = target_table
self.issues = []
def check_nulls(self, df: DataFrame, required_columns: list) -> DataFrame:
"""Flag rows with nulls in required columns, route bad rows to quarantine."""
null_condition = F.lit(False)
for col_name in required_columns:
null_condition = null_condition | F.col(col_name).isNull()
bad_rows = df.filter(null_condition)
good_rows = df.filter(~null_condition)
bad_count = bad_rows.count()
if bad_count > 0:
self.issues.append({
"check": "null_check",
"columns": required_columns,
"bad_record_count": bad_count,
"action": "quarantined"
})
# Write bad rows to quarantine
bad_rows.withColumn("_quarantine_reason", F.lit("null_in_required_column")) \
.withColumn("_quarantine_timestamp", F.current_timestamp()) \
.write.mode("append").format("delta") \
.saveAsTable(f"lh_bronze.quarantine_{self.source_table}")
return good_rows
def check_duplicates(self, df: DataFrame, key_columns: list) -> DataFrame:
"""Remove duplicates and log the count."""
before_count = df.count()
deduped = df.dropDuplicates(key_columns)
after_count = deduped.count()
dup_count = before_count - after_count
if dup_count > 0:
self.issues.append({
"check": "duplicate_check",
"columns": key_columns,
"bad_record_count": dup_count,
"action": "removed"
})
return deduped
def check_range(self, df: DataFrame, column: str, min_val, max_val) -> DataFrame:
"""Flag values outside expected range."""
out_of_range = df.filter(
(F.col(column) < min_val) | (F.col(column) > max_val)
)
bad_count = out_of_range.count()
if bad_count > 0:
self.issues.append({
"check": "range_check",
"column": column,
"expected_range": f"[{min_val}, {max_val}]",
"bad_record_count": bad_count,
"action": "quarantined"
})
out_of_range.withColumn("_quarantine_reason",
F.lit(f"out_of_range_{column}_{min_val}_{max_val}")) \
.withColumn("_quarantine_timestamp", F.current_timestamp()) \
.write.mode("append").format("delta") \
.saveAsTable(f"lh_bronze.quarantine_{self.source_table}")
return df.filter(
(F.col(column) >= min_val) & (F.col(column) <= max_val)
)
def get_summary(self) -> dict:
"""Return a summary of all quality issues found."""
return {
"source_table": self.source_table,
"target_table": self.target_table,
"total_issues": len(self.issues),
"total_bad_records": sum(i["bad_record_count"] for i in self.issues),
"checks": self.issues
}
🔔 Monitoring with Data Activator¶
Setting Up Monitoring Reflexes¶
Data Activator (Reflex) items in Fabric provide real-time monitoring without custom code. Create Reflex items for the following scenarios:
Pipeline Health Reflex¶
Trigger: pipeline_errors table
Condition: COUNT(*) > 0 where is_resolved = 0 AND severity IN ('CRITICAL', 'HIGH')
Frequency: Every 5 minutes
Action: Teams notification to #fabric-ops channel
Data Freshness Reflex¶
Trigger: Query on ingestion metadata table
Condition: MAX(last_load_timestamp) < DATEADD(HOUR, -4, GETUTCDATE())
Frequency: Every 30 minutes
Action: Email to data engineering team
Error Spike Detection¶
Trigger: pipeline_errors table
Condition: COUNT(*) in last 1 hour > 3x rolling 7-day hourly average
Frequency: Every 15 minutes
Action: Teams alert + PagerDuty integration
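The spike condition can be prototyped outside Data Activator against hourly counts pulled from the error table (a sketch; the 3x multiplier matches the condition above):

```python
from statistics import mean

def is_error_spike(current_hour_count: int, hourly_history: list, multiplier: float = 3.0) -> bool:
    """True when the current hour's error count exceeds `multiplier` times
    the rolling average of historical hourly counts (7 days = 168 values)."""
    if not hourly_history:
        return False  # no baseline yet; do not alert
    return current_hour_count > multiplier * mean(hourly_history)
```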
Integration with Power Automate¶
For complex alerting workflows, use Power Automate flows triggered by Data Activator:
flowchart LR
A[Data Activator Trigger] --> B[Power Automate Flow]
B --> C{Severity?}
C -->|CRITICAL| D[Teams + Email + PagerDuty]
C -->|HIGH| E[Teams + Email]
C -->|MEDIUM| F[Teams Only]
C -->|LOW| G[Log Only]
D --> H[Create Incident Ticket]
E --> H
H --> I[Update Error Record: alert_sent = 1]
📢 Alert Escalation Matrix¶
Escalation Tiers¶
| Severity | Response Time | First Responder | Escalation | Communication |
|---|---|---|---|---|
| CRITICAL | 15 minutes | On-call engineer | Engineering manager at 30 min | Teams + Email + PagerDuty |
| HIGH | 1 hour | Assigned data engineer | Team lead at 2 hours | Teams + Email |
| MEDIUM | 4 hours | Any team member | Sprint backlog if unresolved | Teams channel |
| LOW | Next business day | Self-service | None | Dashboard only |
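The Communication column maps directly to a routing table, consistent with the channels used by usp_log_pipeline_error (a sketch; channel names are illustrative):

```python
# Escalation matrix: severity -> notification channels
ALERT_CHANNELS = {
    "CRITICAL": ["Teams", "Email", "PagerDuty"],
    "HIGH": ["Teams", "Email"],
    "MEDIUM": ["Teams"],
    "LOW": [],  # dashboard only
}

def channels_for(severity: str, environment: str = "PROD") -> list:
    """Route alerts per the escalation matrix; nothing is paged outside
    production, matching the stored procedure's PROD-only alerting."""
    if environment != "PROD":
        return []
    return ALERT_CHANNELS.get(severity, ["Teams"])
```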
Escalation by Domain¶
| Domain | Critical Threshold | Escalation Contact |
|---|---|---|
| Casino Compliance (CTR/SAR) | Any failure in filing window | Compliance Officer + CTO |
| HIPAA / Tribal Healthcare | Any PHI exposure risk | Privacy Officer + Legal |
| Federal Data (USDA/NOAA/EPA) | SLA breach > 2 hours | Agency Liaison + PM |
| Real-Time Streaming | Lag > 15 minutes | Platform Team + Ops |
| Gold Layer / BI Reports | Stale data > 8 hours | Analytics Lead |
On-Call Rotation Template¶
Week 1: Engineer A (primary), Engineer B (secondary)
Week 2: Engineer B (primary), Engineer C (secondary)
Week 3: Engineer C (primary), Engineer A (secondary)
Rotation rules:
- Primary responds to CRITICAL and HIGH within SLA
- Secondary covers if primary is unavailable within 15 min
- All team members handle MEDIUM during business hours
- LOW items go to sprint backlog
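The rotation template generalizes to any team size with a simple round-robin (a sketch; engineer names are placeholders):

```python
def build_rotation(engineers: list, weeks: int) -> list:
    """Round-robin on-call schedule: each week the next engineer is primary
    and the one after is secondary, wrapping around the list."""
    n = len(engineers)
    return [
        {"week": w + 1,
         "primary": engineers[w % n],
         "secondary": engineers[(w + 1) % n]}
        for w in range(weeks)
    ]

# For ["A", "B", "C"] this reproduces the three-week template above
```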
🔍 KQL Queries for Error Analysis¶
Error Trend Analysis (Eventhouse)¶
When pipeline errors are also ingested into an Eventhouse for real-time analysis:
// Error count by hour over the last 7 days
pipeline_errors
| where error_timestamp > ago(7d)
| summarize error_count = count() by bin(error_timestamp, 1h), error_classification
| render timechart
Top Failing Pipelines¶
// Top 10 pipelines by error count in the last 24 hours
pipeline_errors
| where error_timestamp > ago(24h)
| summarize
total_errors = count(),
critical_errors = countif(severity == "CRITICAL"),
high_errors = countif(severity == "HIGH"),
unresolved = countif(is_resolved == false)
by pipeline_name
| order by total_errors desc
| take 10
Error Duration Analysis¶
```kql
// Average time to resolution by classification
pipeline_errors
| where is_resolved == true
| extend resolution_minutes = datetime_diff('minute', resolved_timestamp, error_timestamp)
| summarize
    avg_resolution_min = avg(resolution_minutes),
    p50_resolution_min = percentile(resolution_minutes, 50),
    p95_resolution_min = percentile(resolution_minutes, 95),
    total_resolved = count()
    by error_classification
| order by avg_resolution_min desc
```
Transient Error Retry Effectiveness¶
```kql
// How often do retries succeed?
pipeline_errors
| where error_classification == "TRANSIENT"
| summarize
    total = count(),
    resolved_by_retry = countif(is_resolved == true and resolved_by == "SYSTEM_AUTO_RESOLVE"),
    escalated = countif(retry_count >= max_retries and is_resolved == false)
    by pipeline_name
| extend retry_success_rate = round(100.0 * resolved_by_retry / total, 1)
| order by total desc
```
Correlation Analysis for Nested Pipelines¶
```kql
// Trace errors across parent-child pipeline runs
pipeline_errors
| where correlation_id == "<specific_correlation_id>"
| order by error_timestamp asc
| project error_timestamp, pipeline_name, activity_name, error_classification, severity, error_message
```
Data Quality Error Patterns¶
```kql
// Data quality issues by source system and affected table
pipeline_errors
| where error_classification == "DATA_QUALITY"
| where error_timestamp > ago(30d)
| summarize
    issue_count = count(),
    affected_records = sum(affected_record_count),
    last_occurrence = max(error_timestamp)
    by source_system, affected_table
| order by issue_count desc
```
Real-Time Error Monitoring Dashboard Query¶
```kql
// Live error feed for operations dashboard (last 4 hours)
pipeline_errors
| where error_timestamp > ago(4h)
| where is_resolved == false
| extend severity_rank = case(
    severity == "CRITICAL", 1,
    severity == "HIGH", 2,
    severity == "MEDIUM", 3,
    severity == "LOW", 4,
    5)
| order by severity_rank asc, error_timestamp desc
| project
    error_timestamp,
    pipeline_name,
    activity_name,
    severity,
    error_classification,
    error_message = substring(error_message, 0, 150),
    retry_count,
    source_system
```
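The same ordering (CRITICAL first, newest first within each severity) is easy to reproduce wherever the feed is post-processed, such as a notebook feeding a Teams digest. A sketch using a stable two-pass sort; the record shape is assumed:

```python
# Rank unknown severities last (5), matching the KQL case() above.
SEVERITY_RANK = {"CRITICAL": 1, "HIGH": 2, "MEDIUM": 3, "LOW": 4}

def order_for_dashboard(errors: list[dict]) -> list[dict]:
    """Sort by severity rank ascending, then timestamp descending (stable)."""
    by_time = sorted(errors, key=lambda e: e["error_timestamp"], reverse=True)
    return sorted(by_time, key=lambda e: SEVERITY_RANK.get(e["severity"], 5))
```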
📊 Operational Dashboards¶
Power BI Error Dashboard Design¶
Build a Power BI report connected to the pipeline_errors table via Direct Lake for near-real-time visibility.
Recommended pages:
| Page | Content | Refresh |
|---|---|---|
| Overview | Error count cards (CRITICAL/HIGH/MEDIUM/LOW), trend line, resolution rate | Direct Lake (near real-time) |
| Pipeline Detail | Drilldown by pipeline name, activity, and time range | Direct Lake |
| Classification Analysis | Error distribution by classification, retry effectiveness | Direct Lake |
| SLA Tracking | Time to resolution vs. SLA targets, compliance pipeline health | Direct Lake |
| Domain Health | Per-domain (Casino, Federal, Healthcare) error status | Direct Lake |
Key Measures (DAX)¶
```dax
// Unresolved Critical Errors
Unresolved Critical =
CALCULATE(
    COUNTROWS(pipeline_errors),
    pipeline_errors[is_resolved] = FALSE(),
    pipeline_errors[severity] = "CRITICAL"
)

// Mean Time to Resolution (hours)
MTTR Hours =
CALCULATE(
    AVERAGEX(
        FILTER(pipeline_errors, pipeline_errors[is_resolved] = TRUE()),
        DATEDIFF(pipeline_errors[error_timestamp], pipeline_errors[resolved_timestamp], HOUR)
    )
)

// Error Rate (errors per 100 pipeline runs)
Error Rate =
DIVIDE(
    COUNTROWS(pipeline_errors),
    [Total Pipeline Runs],
    0
) * 100

// Resolution Rate
Resolution Rate =
DIVIDE(
    CALCULATE(COUNTROWS(pipeline_errors), pipeline_errors[is_resolved] = TRUE()),
    COUNTROWS(pipeline_errors),
    0
)
```
📓 Runbook Integration¶
Runbook Template¶
For each common error scenario, maintain a runbook entry:
```markdown
## Runbook: [Error Scenario Name]

**Error Pattern:** [What the error looks like]
**Classification:** [TRANSIENT | PERMANENT | DATA_QUALITY | PERMISSION | TIMEOUT | RESOURCE]
**Severity:** [CRITICAL | HIGH | MEDIUM | LOW]
**Affected Pipelines:** [List of pipelines]

### Symptoms
- [Observable symptom 1]
- [Observable symptom 2]

### Root Cause
[Known root cause or investigation steps]

### Resolution Steps
1. [Step 1]
2. [Step 2]
3. [Step 3]

### Verification
- [ ] [How to verify the fix]
- [ ] [How to confirm data integrity]

### Prevention
- [How to prevent recurrence]

### History
| Date | Resolved By | Notes |
|------|-------------|-------|
| YYYY-MM-DD | [Name] | [Brief notes] |
```
Common Runbook Entries¶
| Scenario | First Action | Typical Resolution |
|---|---|---|
| USDA API 429 throttling | Check API quota usage | Reduce batch size, increase delay between calls |
| Eventhouse ingestion lag | Check capacity utilization | Scale up capacity or reduce concurrent loads |
| Delta table version conflict | Check concurrent writers | Enable optimistic concurrency, stagger schedules |
| Fabric capacity throttled | Check admin portal metrics | Pause non-critical workloads, request capacity increase |
| Gateway connection pool exhaustion | Check gateway monitor | Increase max connections, add gateway node |
| Schema drift in source system | Compare expected vs actual schema | Update schema mapping, notify source team |
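A lightweight way to surface these first actions directly in alerts is a pattern-to-action lookup. A hypothetical sketch using naive substring matching; the patterns and actions paraphrase the table above, and a production version would key on `error_classification` rather than free text:

```python
# Hypothetical runbook router: maps error-message fragments to the
# first-action column of the table above. Matching is naive and illustrative.
FIRST_ACTIONS = {
    "429": "Check API quota usage",
    "ingestion lag": "Check capacity utilization",
    "version conflict": "Check concurrent writers",
    "capacity throttled": "Check admin portal metrics",
    "connection pool": "Check gateway monitor",
    "schema drift": "Compare expected vs actual schema",
}

def suggest_first_action(error_message: str) -> str:
    """Return the runbook first action for the first matching pattern."""
    msg = error_message.lower()
    for pattern, action in FIRST_ACTIONS.items():
        if pattern in msg:
            return action
    return "No runbook match -- follow generic triage"
```

Embedding the suggested action in the Data Activator alert body shortens the gap between notification and the first diagnostic step.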
⭐ Summary¶
Effective error handling in Microsoft Fabric requires:
- Consistent capture through the Single Error Activity pattern
- Structured storage in a centralized error table with classification and severity
- Automated classification that drives retry decisions and alert routing
- Domain-aware severity that prioritizes compliance and healthcare errors
- Multi-channel alerting through Data Activator, Teams, and PagerDuty
- Operational visibility via Power BI dashboards and KQL analysis
- Runbook discipline that accelerates resolution and prevents recurrence
📚 Related Documents¶
| Document | Description |
|---|---|
| Pipelines & Data Movement | Pipeline configuration patterns |
| Alerting & Data Activator | Detailed alerting setup |
| Disaster Recovery | Recovery procedures |
| Security Guide | Compliance and access controls |