# Reliability Patterns
Reliability patterns and practices for Azure analytics platforms.
## Overview
Building a reliable data platform requires designing for failure, implementing redundancy at every layer, and establishing tested recovery procedures.
## Reliability Principles

### Design Principles
| Principle | Implementation |
|---|---|
| Assume failure | Build retry logic, circuit breakers |
| Design for scale | Use auto-scaling, partitioning |
| Eliminate single points | Redundancy at every layer |
| Test failure modes | Chaos engineering, DR drills |
| Automate recovery | Self-healing, automated failover |
## Retry Patterns

### Exponential Backoff
```python
import random
import time
from functools import wraps

import requests


def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
):
    """Decorator for retry with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries > max_retries:
                        raise
                    delay = min(base_delay * (exponential_base ** retries), max_delay)
                    if jitter:
                        # Randomize the delay to avoid synchronized retries (thundering herd)
                        delay = delay * (0.5 + random.random())
                    print(f"Retry {retries}/{max_retries} after {delay:.2f}s: {e}")
                    time.sleep(delay)
        return wrapper
    return decorator


@retry_with_backoff(max_retries=5)
def call_external_api(url: str) -> dict:
    """Call external API with retry."""
    response = requests.get(url)
    response.raise_for_status()
    return response.json()
```
### Circuit Breaker
```python
from datetime import datetime, timedelta
from enum import Enum
from threading import Lock


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Circuit breaker pattern implementation."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None
        self.lock = Lock()

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection."""
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitOpenError("Circuit is open")

        # Execute outside the lock so a slow call does not block other threads
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful call."""
        with self.lock:
            self.failures = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle failed call."""
        with self.lock:
            self.failures += 1
            self.last_failure_time = datetime.utcnow()
            if self.failures >= self.failure_threshold:
                self.state = CircuitState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset."""
        if self.last_failure_time is None:
            return True
        return datetime.utcnow() > self.last_failure_time + timedelta(seconds=self.recovery_timeout)
```
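A minimal usage sketch for the breaker above; the downstream call and its URL are hypothetical stand-ins:

```python
import requests

# Open the circuit after three consecutive request failures
breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=60,
    expected_exception=requests.RequestException,
)


def fetch_exchange_rates() -> dict:
    # Hypothetical dependency call protected by the circuit breaker
    response = requests.get("https://api.example.com/rates", timeout=10)
    response.raise_for_status()
    return response.json()


try:
    rates = breaker.call(fetch_exchange_rates)
except CircuitOpenError:
    # Circuit is open: skip the call and fall back to cached data
    rates = {}
```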
## Redundancy Patterns

### Active-Passive
```bicep
// Storage account resources (primaryStorageAccount, secondaryStorageAccount)
// are assumed to be declared elsewhere in the template.

// Primary Synapse workspace
resource primaryWorkspace 'Microsoft.Synapse/workspaces@2021-06-01' = {
  name: 'syn-primary-eastus'
  location: 'eastus'
  properties: {
    defaultDataLakeStorage: {
      resourceId: primaryStorageAccount.id
      accountUrl: 'https://${primaryStorageAccount.name}.dfs.core.windows.net'
      filesystem: 'synapse'
    }
  }
}

// Secondary Synapse workspace (standby)
resource secondaryWorkspace 'Microsoft.Synapse/workspaces@2021-06-01' = {
  name: 'syn-secondary-westus'
  location: 'westus'
  properties: {
    defaultDataLakeStorage: {
      resourceId: secondaryStorageAccount.id
      accountUrl: 'https://${secondaryStorageAccount.name}.dfs.core.windows.net'
      filesystem: 'synapse'
    }
  }
}
```
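In an active-passive setup, a client or orchestration layer must decide which workspace endpoint to target. A minimal, probe-based selection sketch; the endpoint URLs and the `/health` probe path are illustrative assumptions, not Synapse built-ins:

```python
import requests

# Hypothetical endpoints for the primary and standby workspaces
ENDPOINTS = [
    "https://syn-primary-eastus.dev.azuresynapse.net",
    "https://syn-secondary-westus.dev.azuresynapse.net",
]


def select_active_endpoint(endpoints: list[str], probe_path: str = "/health") -> str:
    """Return the first endpoint whose health probe responds successfully."""
    for endpoint in endpoints:
        try:
            response = requests.get(endpoint + probe_path, timeout=5)
            if response.status_code == 200:
                return endpoint
        except requests.RequestException:
            continue  # Probe failed; try the next endpoint
    raise RuntimeError("No healthy endpoint available")
```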
### Data Replication
```python
import asyncio

from azure.core.exceptions import ResourceExistsError
from azure.storage.blob.aio import BlobServiceClient


class DataReplicator:
    """Replicate data between storage accounts."""

    def __init__(self, source_conn: str, target_conn: str):
        self.source_client = BlobServiceClient.from_connection_string(source_conn)
        self.target_client = BlobServiceClient.from_connection_string(target_conn)

    async def replicate_container(self, container_name: str, prefix: str = ""):
        """Replicate all blobs in a container."""
        source_container = self.source_client.get_container_client(container_name)
        target_container = self.target_client.get_container_client(container_name)

        # Ensure target container exists
        try:
            await target_container.create_container()
        except ResourceExistsError:
            pass

        # Start a server-side copy for each blob
        # (private source accounts require a SAS token appended to the URL)
        async for blob in source_container.list_blobs(name_starts_with=prefix):
            source_blob = source_container.get_blob_client(blob.name)
            target_blob = target_container.get_blob_client(blob.name)
            await target_blob.start_copy_from_url(source_blob.url)
```
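A usage sketch; the environment variable names holding the connection strings are placeholders:

```python
import asyncio
import os


async def main():
    replicator = DataReplicator(
        source_conn=os.environ["PRIMARY_STORAGE_CONNECTION_STRING"],    # placeholder
        target_conn=os.environ["SECONDARY_STORAGE_CONNECTION_STRING"],  # placeholder
    )
    await replicator.replicate_container("synapse", prefix="curated/")


asyncio.run(main())
```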
## Health Checks

### Endpoint Health
```python
import json
from datetime import datetime

from fastapi import FastAPI, Response

app = FastAPI()

# blob_client (BlobServiceClient) and spark (SparkSession) are assumed
# to be initialized elsewhere in the application.


class HealthChecker:
    """Check health of dependencies."""

    async def check_storage(self) -> dict:
        """Check storage account connectivity."""
        try:
            # Attempt to fetch the first page of containers
            next(iter(blob_client.list_containers()), None)
            return {"status": "healthy"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    async def check_database(self) -> dict:
        """Check database connectivity."""
        try:
            spark.sql("SELECT 1").collect()
            return {"status": "healthy"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}


health_checker = HealthChecker()


@app.get("/health")
async def health_check():
    """Comprehensive health check endpoint."""
    checks = {
        "storage": await health_checker.check_storage(),
        "database": await health_checker.check_database(),
        "timestamp": datetime.utcnow().isoformat()
    }
    overall_healthy = all(
        c["status"] == "healthy" for c in checks.values() if isinstance(c, dict)
    )
    status_code = 200 if overall_healthy else 503
    return Response(
        content=json.dumps(checks),
        status_code=status_code,
        media_type="application/json"
    )
```
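To verify the endpoint locally, FastAPI's test client (which requires `httpx`) can exercise it in-process; a quick illustrative check:

```python
from fastapi.testclient import TestClient

client = TestClient(app)

response = client.get("/health")
print(response.status_code)  # 200 when all checks pass, 503 otherwise
print(response.json())
```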
### Pipeline Health
```python
def check_pipeline_health(pipeline_name: str, lookback_hours: int = 24) -> dict:
    """Check pipeline execution health."""
    # get_recent_pipeline_runs is assumed to be defined elsewhere (see the sketch below)
    runs = get_recent_pipeline_runs(pipeline_name, lookback_hours)

    if not runs:
        return {"status": "unknown", "message": "No recent runs"}

    success_count = sum(1 for r in runs if r.status == "Succeeded")
    failure_count = sum(1 for r in runs if r.status == "Failed")
    success_rate = success_count / len(runs)

    if success_rate >= 0.95:
        status = "healthy"
    elif success_rate >= 0.80:
        status = "degraded"
    else:
        status = "unhealthy"

    return {
        "status": status,
        "success_rate": round(success_rate * 100, 2),
        "total_runs": len(runs),
        "failures": failure_count
    }
```
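One way to implement `get_recent_pipeline_runs` is to query run history through the Data Factory management SDK (`azure-mgmt-datafactory`); this is a sketch under that assumption, and the subscription, resource group, and factory names are placeholders:

```python
from datetime import datetime, timedelta

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import RunFilterParameters

SUBSCRIPTION_ID = "<subscription-id>"  # placeholder
RESOURCE_GROUP = "<resource-group>"    # placeholder
FACTORY_NAME = "<data-factory-name>"   # placeholder

adf_client = DataFactoryManagementClient(DefaultAzureCredential(), SUBSCRIPTION_ID)


def get_recent_pipeline_runs(pipeline_name: str, lookback_hours: int) -> list:
    """Return pipeline runs for the given pipeline within the lookback window."""
    now = datetime.utcnow()
    filter_params = RunFilterParameters(
        last_updated_after=now - timedelta(hours=lookback_hours),
        last_updated_before=now,
    )
    response = adf_client.pipeline_runs.query_by_factory(
        RESOURCE_GROUP, FACTORY_NAME, filter_params
    )
    # Filter client-side to the pipeline of interest
    return [run for run in response.value if run.pipeline_name == pipeline_name]
```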
## Self-Healing

### Auto-Recovery Pipeline
```python
import asyncio


class SelfHealingPipeline:
    """Pipeline with automatic recovery capabilities."""

    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        self.parameters: dict = {}
        self.max_auto_retries = 3
        self.retry_count = 0

    async def run_with_recovery(self, parameters: dict):
        """Run pipeline with automatic recovery on failure."""
        self.parameters = parameters
        while self.retry_count < self.max_auto_retries:
            try:
                # _execute_pipeline is assumed to submit the run and wait for completion
                result = await self._execute_pipeline(self.parameters)
                if result.status == "Succeeded":
                    self.retry_count = 0
                    return result

                # Analyze failure and attempt recovery before the next attempt
                self.retry_count += 1
                recovery_action = self._analyze_failure(result)
                await self._apply_recovery(recovery_action)
            except Exception as e:
                self.retry_count += 1
                if self.retry_count >= self.max_auto_retries:
                    # _escalate_failure is assumed to notify the on-call team
                    await self._escalate_failure(e)
                    raise
                await asyncio.sleep(60 * self.retry_count)  # Increasing delay

        raise RuntimeError(
            f"{self.pipeline_name} did not succeed after {self.max_auto_retries} attempts"
        )

    def _analyze_failure(self, result) -> str:
        """Analyze failure and determine recovery action."""
        error_message = result.error_message or ""

        if "OutOfMemory" in error_message:
            return "scale_up"
        elif "Timeout" in error_message:
            return "increase_timeout"
        elif "ConnectionRefused" in error_message:
            return "retry_later"
        else:
            return "manual_review"

    async def _apply_recovery(self, action: str):
        """Apply recovery action."""
        if action == "scale_up":
            # _scale_compute_resources is assumed to request a larger compute tier
            await self._scale_compute_resources()
        elif action == "increase_timeout":
            self.parameters["timeout"] = self.parameters.get("timeout", 3600) * 2
        elif action == "retry_later":
            await asyncio.sleep(300)
```
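A brief usage sketch; the pipeline name and parameters are illustrative:

```python
import asyncio


async def main():
    pipeline = SelfHealingPipeline("daily_sales_load")  # illustrative pipeline name
    result = await pipeline.run_with_recovery({"load_date": "2025-01-15", "timeout": 3600})
    print(result.status)


asyncio.run(main())
```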
## SLA Management

### SLA Definitions
| Tier | Availability | RTO | RPO |
|---|---|---|---|
| Platinum | 99.99% | 15 min | 0 |
| Gold | 99.9% | 1 hour | 15 min |
| Silver | 99.5% | 4 hours | 1 hour |
| Bronze | 99% | 24 hours | 4 hours |
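These availability targets translate directly into a monthly error budget; a quick sketch of the arithmetic:

```python
SLA_TIERS = {"Platinum": 99.99, "Gold": 99.9, "Silver": 99.5, "Bronze": 99.0}

MINUTES_PER_30_DAYS = 30 * 24 * 60  # 43,200 minutes

for tier, availability in SLA_TIERS.items():
    allowed_downtime = MINUTES_PER_30_DAYS * (1 - availability / 100)
    print(f"{tier}: {availability}% -> {allowed_downtime:.1f} minutes of downtime per 30 days")

# Platinum: 99.99% -> 4.3 minutes of downtime per 30 days
# Gold:     99.9%  -> 43.2 minutes
# Silver:   99.5%  -> 216.0 minutes
# Bronze:   99.0%  -> 432.0 minutes
```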
### SLA Tracking
```kusto
// Calculate SLA compliance (assumes one health-check record per minute)
let sla_target = 99.9;
HealthCheckLogs
| where TimeGenerated > ago(30d)
| summarize
    HealthyMinutes = countif(Status == "healthy"),
    TotalMinutes = count()
    by bin(TimeGenerated, 1d)
| extend
    UptimePercent = round(100.0 * HealthyMinutes / TotalMinutes, 4),
    SLAMet = iff(100.0 * HealthyMinutes / TotalMinutes >= sla_target, true, false)
| summarize
    AvgUptime = avg(UptimePercent),
    DaysMeetingSLA = countif(SLAMet),
    TotalDays = count()
```
## Related Documentation
Last Updated: January 2025