Lambda Architecture Pattern¶
Comprehensive guide to implementing Lambda Architecture on Azure for combining batch and real-time stream processing.
Table of Contents¶
- Overview
- Architecture Components
- Azure Implementation
- Design Patterns
- Performance Optimization
- Monitoring & Operations
- Best Practices
- Common Challenges
- Use Cases
Overview¶
What is Lambda Architecture?¶
Lambda Architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch and stream-processing methods. The architecture balances latency, throughput, and fault-tolerance by using batch processing for comprehensive and accurate views, while simultaneously using real-time stream processing for low-latency approximate results.
Key Principles¶
graph TB
subgraph "Lambda Architecture Principles"
P1[Human Fault-Tolerance]
P2[Data Immutability]
P3[Recomputation]
P4[Multi-View Serving]
P1 --> Impl[Implementation]
P2 --> Impl
P3 --> Impl
P4 --> Impl
end
- Human Fault-Tolerance: System should be resilient to human error through recomputation
- Data Immutability: Raw data is never modified, only appended
- Recomputation: Ability to recompute views from raw data
- Multi-View Serving: Combine batch and real-time views for complete picture
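The data-immutability and recomputation principles can be sketched in plain Python (a toy illustration, not the Azure implementation; `Event`, `compute_view`, and the unit-conversion bug are hypothetical):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Event:
    device_id: str
    temperature: float

# Master dataset: append-only, never mutated
master_log = [Event("d1", 70.0), Event("d1", 80.0), Event("d2", 60.0)]

def compute_view(log, transform):
    """Recompute a serving view from the raw log; fixing a bug in
    `transform` just means running this again over the same log."""
    view = {}
    for e in log:
        view.setdefault(e.device_id, []).append(transform(e))
    return {k: sum(v) / len(v) for k, v in view.items()}

# First (buggy) logic: forgot the unit conversion
v1 = compute_view(master_log, lambda e: e.temperature)
# Corrected logic: Fahrenheit to Celsius; the raw data is untouched
v2 = compute_view(master_log, lambda e: (e.temperature - 32) * 5 / 9)
```

Because the log is immutable, human error in view logic is recoverable: only the derived views change between `v1` and `v2`.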
Architecture Diagram¶
graph TB
subgraph "Data Sources"
DS1[IoT Devices]
DS2[Applications]
DS3[External APIs]
DS4[Databases CDC]
end
subgraph "Ingestion Layer"
DS1 --> EH[Event Hubs]
DS2 --> EH
DS3 --> EH
DS4 --> EH
end
subgraph "Batch Layer - Historical Processing"
EH --> Capture[Event Hubs Capture]
Capture --> DL[Data Lake Gen2<br/>Immutable Storage]
DL --> Spark[Synapse Spark Pools<br/>Batch Processing]
Spark --> BV[Batch Views<br/>Delta Tables]
end
subgraph "Speed Layer - Real-time Processing"
EH --> SA[Stream Analytics<br/>Real-time Processing]
SA --> SV[Speed Views<br/>Cosmos DB / Cache]
end
subgraph "Serving Layer"
BV --> Merge[View Merger]
SV --> Merge
Merge --> SQL[Synapse SQL Pools<br/>Unified Queries]
end
subgraph "Consumption"
SQL --> PBI[Power BI<br/>Dashboards]
SQL --> Apps[Applications<br/>APIs]
end
style DL fill:#e1f5fe
style BV fill:#e8f5e9
style SV fill:#fff3e0
style Merge fill:#f3e5f5
Architecture Components¶
1. Batch Layer¶
The batch layer stores the immutable, constantly growing master dataset and precomputes batch views.
Characteristics¶
| Aspect | Description |
|---|---|
| Latency | Hours to days |
| Accuracy | 100% accurate, complete processing |
| Complexity | Can handle complex computations |
| Fault Tolerance | High - can recompute from source |
| Data Volume | Handles unlimited data volumes |
Azure Services Mapping¶
graph LR
subgraph "Batch Layer Components"
Storage[Data Lake Storage Gen2<br/>Master Dataset]
Compute[Synapse Spark Pools<br/>Batch Processing]
Output[Delta Lake Tables<br/>Batch Views]
Storage --> Compute
Compute --> Output
end
style Storage fill:#e3f2fd
style Compute fill:#f3e5f5
style Output fill:#e8f5e9
| Component | Azure Service | Purpose |
|---|---|---|
| Master Dataset | Data Lake Storage Gen2 | Immutable storage of all raw data |
| Batch Compute | Synapse Spark Pools | Complex batch processing |
| Batch Views | Delta Lake Tables | Precomputed aggregations |
| Orchestration | Azure Data Factory | Batch job scheduling |
| Catalog | Synapse Metastore | Metadata management |
Implementation Example¶
# Batch Layer Processing with PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
count, avg, max, min, sum, stddev, countDistinct,
percentile_approx, current_date, current_timestamp
)
from delta.tables import DeltaTable
from datetime import datetime, timedelta
class BatchLayer:
"""Batch layer for Lambda architecture implementing comprehensive data processing"""
def __init__(self, spark: SparkSession):
self.spark = spark
self.master_dataset_path = "abfss://master@datalake.dfs.core.windows.net/raw"
self.batch_views_path = "abfss://serving@datalake.dfs.core.windows.net/batch"
def process_batch_view(self, view_name: str, start_date: str, end_date: str):
"""
Process batch view with complete accuracy over historical data
Args:
view_name: Name of the batch view to compute
start_date: Start date for processing (YYYY-MM-DD)
end_date: End date for processing (YYYY-MM-DD)
"""
print(f"Processing batch view: {view_name}")
print(f"Date range: {start_date} to {end_date}")
# Read master dataset
master_df = self.spark.read.format("delta") \
.load(self.master_dataset_path) \
.filter(f"event_date >= '{start_date}' AND event_date <= '{end_date}'")
# Example: Device telemetry aggregations
if view_name == "device_daily_metrics":
batch_view = self._compute_device_metrics(master_df)
# Example: User activity summaries
elif view_name == "user_activity_summary":
batch_view = self._compute_user_activity(master_df)
# Write batch view
output_path = f"{self.batch_views_path}/{view_name}"
batch_view.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("batch_date") \
.option("overwriteSchema", "true") \
.save(output_path)
# Optimize for queries
DeltaTable.forPath(self.spark, output_path).optimize().executeCompaction()
print(f"Batch view {view_name} processed successfully")
def _compute_device_metrics(self, df):
"""Compute comprehensive device metrics"""
return df.groupBy("device_id", "event_date") \
.agg(
count("*").alias("event_count"),
avg("temperature").alias("avg_temperature"),
max("temperature").alias("max_temperature"),
min("temperature").alias("min_temperature"),
stddev("temperature").alias("stddev_temperature"),
percentile_approx("temperature", 0.5).alias("median_temperature"),
sum("energy_consumed").alias("total_energy"),
countDistinct("user_id").alias("unique_users")
) \
.withColumn("batch_date", current_date()) \
.withColumn("batch_timestamp", current_timestamp())
def _compute_user_activity(self, df):
"""Compute user activity patterns"""
from pyspark.sql.functions import (
col, count, sum, avg, min, max,
hour, dayofweek, countDistinct,
when, lit, current_date, current_timestamp
)
return df.groupBy("user_id", "event_date") \
.agg(
count("*").alias("total_events"),
countDistinct("device_id").alias("unique_devices"),
sum(when(col("event_type") == "login", 1).otherwise(0)).alias("login_count"),
sum(when(col("event_type") == "transaction", 1).otherwise(0)).alias("transaction_count"),
avg("session_duration").alias("avg_session_duration"),
max("session_duration").alias("max_session_duration"),
sum("revenue").alias("total_revenue")
) \
.withColumn("batch_date", current_date()) \
.withColumn("batch_timestamp", current_timestamp())
def rebuild_view_from_scratch(self, view_name: str):
"""
Rebuild entire view from master dataset
Demonstrates recomputation capability
"""
print(f"Rebuilding {view_name} from scratch...")
# Get date range from master dataset
date_range = self.spark.read.format("delta") \
.load(self.master_dataset_path) \
.agg(
min("event_date").alias("min_date"),
max("event_date").alias("max_date")
) \
.first()
# Process entire history
self.process_batch_view(
view_name,
str(date_range.min_date),
str(date_range.max_date)
)
2. Speed Layer¶
The speed layer processes data streams in real-time to provide low-latency views, compensating for the batch layer's high latency.
Characteristics¶
| Aspect | Description |
|---|---|
| Latency | Milliseconds to seconds |
| Accuracy | Approximate, eventually consistent |
| Complexity | Simpler processing (incremental) |
| Fault Tolerance | Medium - state can be reconstructed |
| Data Volume | Recent data only (windowed) |
Azure Services Mapping¶
graph LR
subgraph "Speed Layer Components"
Input[Event Hubs<br/>Real-time Ingestion]
Compute[Stream Analytics<br/>Real-time Processing]
Output[Cosmos DB / Cache<br/>Speed Views]
Input --> Compute
Compute --> Output
end
style Input fill:#fff3e0
style Compute fill:#f3e5f5
style Output fill:#e8f5e9
| Component | Azure Service | Purpose |
|---|---|---|
| Stream Ingestion | Event Hubs | Real-time event ingestion |
| Stream Processing | Stream Analytics / Databricks | Real-time computations |
| Speed Views | Cosmos DB | Low-latency data access |
| Caching | Azure Cache for Redis | Sub-millisecond reads |
| State Store | Cosmos DB / Table Storage | Stateful stream processing |
Implementation Example¶
-- Stream Analytics Query for Speed Layer
-- Real-time device metrics with 1-minute tumbling windows
WITH DeviceMetrics AS (
SELECT
deviceId,
System.Timestamp() AS WindowEnd,
COUNT(*) AS EventCount,
AVG(temperature) AS AvgTemperature,
MAX(temperature) AS MaxTemperature,
MIN(temperature) AS MinTemperature,
SUM(energyConsumed) AS TotalEnergy,
COUNT(DISTINCT userId) AS UniqueUsers
FROM
EventHubInput TIMESTAMP BY eventTimestamp
GROUP BY
deviceId,
TumblingWindow(minute, 1)
),
Anomalies AS (
-- Detect anomalies in real-time
SELECT
deviceId,
WindowEnd,
AvgTemperature,
CASE
WHEN AvgTemperature > 85 THEN 'HIGH_TEMP'
WHEN AvgTemperature < 32 THEN 'LOW_TEMP'
ELSE 'NORMAL'
END AS AlertLevel
FROM
DeviceMetrics
)
-- Output to Cosmos DB for real-time queries
SELECT
dm.deviceId,
dm.WindowEnd AS timestamp,
dm.EventCount AS eventCount,
dm.AvgTemperature AS avgTemp,
dm.MaxTemperature AS maxTemp,
dm.MinTemperature AS minTemp,
dm.TotalEnergy AS totalEnergy,
dm.UniqueUsers AS uniqueUsers,
a.AlertLevel AS alertLevel,
'SPEED_LAYER' AS source,
System.Timestamp() AS processedAt
INTO
CosmosDBOutput
FROM
DeviceMetrics dm
LEFT JOIN Anomalies a ON dm.deviceId = a.deviceId AND dm.WindowEnd = a.WindowEnd
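The tumbling-window semantics used by the query above can be illustrated in plain Python (a minimal sketch; `tumbling_window_end` is a hypothetical helper mirroring `TumblingWindow(minute, 1)`, where each event lands in exactly one non-overlapping window labelled by its end time, like `System.Timestamp()`):

```python
from datetime import datetime, timezone

def tumbling_window_end(ts: datetime, minutes: int = 1) -> datetime:
    """Map an event timestamp to the end of its tumbling window."""
    size = minutes * 60
    secs = int(ts.timestamp())
    start = secs - (secs % size)          # align down to the window start
    return datetime.fromtimestamp(start + size, tz=timezone.utc)

# Counting events per (device, window), as the GROUP BY above does
events = [
    ("d1", datetime(2025, 1, 1, 12, 0, 10, tzinfo=timezone.utc)),
    ("d1", datetime(2025, 1, 1, 12, 0, 50, tzinfo=timezone.utc)),
    ("d1", datetime(2025, 1, 1, 12, 1, 5, tzinfo=timezone.utc)),
]
counts = {}
for dev, ts in events:
    key = (dev, tumbling_window_end(ts))
    counts[key] = counts.get(key, 0) + 1
```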
Databricks Structured Streaming Example¶
# Speed Layer with Databricks Structured Streaming
from pyspark.sql.functions import *
from pyspark.sql.types import *
class SpeedLayer:
"""Speed layer for real-time stream processing"""
def __init__(self, spark):
self.spark = spark
self.checkpoint_path = "abfss://checkpoints@datalake.dfs.core.windows.net/speed"
def process_real_time_metrics(self):
"""Process real-time metrics from Event Hubs"""
# Read from Event Hubs
df = self.spark.readStream \
.format("eventhubs") \
.option("eventhubs.connectionString", self._get_connection_string()) \
.option("eventhubs.consumerGroup", "$Default") \
.option("maxEventsPerTrigger", 10000) \
.load()
# Parse JSON events
events_df = df.select(
from_json(col("body").cast("string"), self._get_schema()).alias("event")
).select("event.*")
# Compute real-time metrics with watermarking
metrics_df = events_df \
.withWatermark("eventTimestamp", "2 minutes") \
.groupBy(
"deviceId",
window("eventTimestamp", "1 minute")
) \
.agg(
count("*").alias("eventCount"),
avg("temperature").alias("avgTemperature"),
max("temperature").alias("maxTemperature"),
min("temperature").alias("minTemperature"),
sum("energyConsumed").alias("totalEnergy")
) \
.select(
col("deviceId"),
col("window.start").alias("windowStart"),
col("window.end").alias("windowEnd"),
col("eventCount"),
col("avgTemperature"),
col("maxTemperature"),
col("minTemperature"),
col("totalEnergy"),
lit("SPEED_LAYER").alias("source"),
current_timestamp().alias("processedAt")
)
# Write to Cosmos DB
query = metrics_df.writeStream \
.outputMode("append") \
.format("cosmos.oltp") \
.option("spark.synapse.linkedService", "CosmosDBLinkedService") \
.option("spark.cosmos.container", "speed_views") \
.option("checkpointLocation", f"{self.checkpoint_path}/metrics") \
.start()
return query
def _get_schema(self):
"""Define event schema"""
return StructType([
StructField("deviceId", StringType(), False),
StructField("eventTimestamp", TimestampType(), False),
StructField("temperature", DoubleType(), True),
StructField("energyConsumed", DoubleType(), True),
StructField("userId", StringType(), True),
StructField("eventType", StringType(), True)
])
def _get_connection_string(self):
"""Get Event Hubs connection string from Key Vault"""
# In production, use Azure Key Vault
return "Endpoint=sb://..."
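The `withWatermark("eventTimestamp", "2 minutes")` call above tells Spark to discard events older than the maximum observed event time minus the delay. A minimal stdlib sketch of that rule (`WatermarkFilter` is hypothetical, and real Spark advances the watermark per micro-batch rather than per event):

```python
from datetime import datetime, timedelta, timezone

class WatermarkFilter:
    """Track the maximum event time seen and drop events that arrive
    later than (max_event_time - delay)."""
    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None

    def accept(self, event_time: datetime) -> bool:
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        # Events behind the watermark are too late to update any window
        return event_time >= self.max_event_time - self.delay
```

The delay is the tolerance for out-of-order arrival: a larger delay keeps more state but loses fewer late events.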
3. Serving Layer¶
The serving layer merges results from batch and speed layers to provide unified, low-latency query responses.
Query Merging Strategies¶
graph TB
subgraph "Query Processing"
Q[Query Request] --> Router{Time Range?}
Router -->|Historical Only| Batch[Batch Views]
Router -->|Recent Only| Speed[Speed Views]
Router -->|Hybrid| Merge[Merge Logic]
Merge --> BatchPart[Batch: Historical Data]
Merge --> SpeedPart[Speed: Recent Data]
BatchPart --> Combine[Combine Results]
SpeedPart --> Combine
Batch --> Result[Query Result]
Speed --> Result
Combine --> Result
end
style Merge fill:#f3e5f5
style Combine fill:#e8f5e9
Implementation Patterns¶
Pattern 1: Time-Based Partitioning¶
# Serving Layer - Query Merger
from datetime import datetime, timedelta
class ServingLayer:
"""Unified serving layer combining batch and speed views"""
def __init__(self, synapse_connection, cosmos_connection):
self.synapse = synapse_connection
self.cosmos = cosmos_connection
self.batch_lag_hours = 4 # Batch views lag by 4 hours
def query_device_metrics(self, device_id: str, start_time: datetime, end_time: datetime):
"""
Query device metrics combining batch and speed layers
Args:
device_id: Device identifier
start_time: Query start time
end_time: Query end time
Returns:
Combined results from both layers
"""
current_time = datetime.utcnow()
batch_cutoff = current_time - timedelta(hours=self.batch_lag_hours)
results = []
# Query batch layer for historical data
if start_time < batch_cutoff:
batch_end = min(end_time, batch_cutoff)
batch_results = self._query_batch_layer(device_id, start_time, batch_end)
results.extend(batch_results)
# Query speed layer for recent data
if end_time > batch_cutoff:
speed_start = max(start_time, batch_cutoff)
speed_results = self._query_speed_layer(device_id, speed_start, end_time)
results.extend(speed_results)
# Merge and deduplicate if there's overlap
return self._merge_results(results)
def _query_batch_layer(self, device_id: str, start_time: datetime, end_time: datetime):
"""Query Synapse SQL for batch views"""
query = """
SELECT
device_id,
timestamp,
event_count,
avg_temperature,
max_temperature,
min_temperature,
total_energy,
'BATCH' as source
FROM batch_views.device_metrics
WHERE device_id = @device_id
AND timestamp >= @start_time
AND timestamp <= @end_time
ORDER BY timestamp
"""
return self.synapse.execute(query, {
'device_id': device_id,
'start_time': start_time,
'end_time': end_time
})
def _query_speed_layer(self, device_id: str, start_time: datetime, end_time: datetime):
"""Query Cosmos DB for speed views"""
query = {
"query": """
SELECT
c.deviceId as device_id,
c.windowEnd as timestamp,
c.eventCount as event_count,
c.avgTemperature as avg_temperature,
c.maxTemperature as max_temperature,
c.minTemperature as min_temperature,
c.totalEnergy as total_energy,
'SPEED' as source
FROM c
WHERE c.deviceId = @device_id
AND c.windowEnd >= @start_time
AND c.windowEnd <= @end_time
ORDER BY c.windowEnd
""",
"parameters": [
{"name": "@device_id", "value": device_id},
{"name": "@start_time", "value": start_time.isoformat()},
{"name": "@end_time", "value": end_time.isoformat()}
]
}
return self.cosmos.query_items(query, partition_key=device_id)
def _merge_results(self, results: list):
"""Merge results from batch and speed layers"""
# Sort by timestamp
sorted_results = sorted(results, key=lambda x: x['timestamp'])
# Deduplicate if there's overlap (prefer batch layer for accuracy)
deduplicated = {}
for record in sorted_results:
key = f"{record['device_id']}_{record['timestamp']}"
if key not in deduplicated or record['source'] == 'BATCH':
deduplicated[key] = record
return list(deduplicated.values())
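The cutoff arithmetic in `query_device_metrics` can be isolated into a small, testable function (a sketch; `split_query_range` is a hypothetical helper, assuming the same 4-hour batch lag as above):

```python
from datetime import datetime, timedelta

def split_query_range(start, end, now, batch_lag_hours=4):
    """Split a query's [start, end] range into the sub-range served by
    the batch layer and the sub-range served by the speed layer.
    Returns (batch_range, speed_range); either may be None."""
    cutoff = now - timedelta(hours=batch_lag_hours)
    batch = (start, min(end, cutoff)) if start < cutoff else None
    speed = (max(start, cutoff), end) if end > cutoff else None
    return batch, speed
```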
Pattern 2: Materialized Views¶
-- Create unified view in Synapse SQL
CREATE VIEW unified_device_metrics
AS
SELECT
device_id,
timestamp,
event_count,
avg_temperature,
max_temperature,
min_temperature,
total_energy,
source
FROM (
-- Batch layer (historical, complete)
SELECT
device_id,
timestamp,
event_count,
avg_temperature,
max_temperature,
min_temperature,
total_energy,
'BATCH' as source
FROM batch_views.device_metrics
WHERE timestamp < DATEADD(hour, -4, GETUTCDATE())
UNION ALL
-- Speed layer (recent, approximate)
SELECT
deviceId as device_id,
windowEnd as timestamp,
eventCount as event_count,
avgTemperature as avg_temperature,
maxTemperature as max_temperature,
minTemperature as min_temperature,
totalEnergy as total_energy,
'SPEED' as source
FROM OPENROWSET(
'CosmosDB',
'Account=...;Database=...;',
speed_views
) WITH (
deviceId varchar(50),
windowEnd datetime2,
eventCount bigint,
avgTemperature float,
maxTemperature float,
minTemperature float,
totalEnergy float
) AS speed
WHERE windowEnd >= DATEADD(hour, -4, GETUTCDATE())
) AS combined;
Azure Implementation¶
Reference Architecture¶
graph TB
subgraph "Ingestion"
Sources[Data Sources] --> EH[Event Hubs<br/>Standard/Premium]
end
subgraph "Batch Processing"
EH --> Capture[Event Hubs Capture<br/>Avro Files]
Capture --> ADLS[Data Lake Gen2<br/>Bronze/Silver/Gold]
ADLS --> ADF[Data Factory<br/>Orchestration]
ADF --> Spark[Synapse Spark Pools<br/>Batch Compute]
Spark --> DeltaBatch[Delta Lake<br/>Batch Views]
end
subgraph "Stream Processing"
EH --> ASA[Stream Analytics<br/>OR<br/>Databricks Streaming]
ASA --> CosmosSpeed[Cosmos DB<br/>Speed Views]
ASA --> Redis[Azure Cache<br/>Hot Data]
end
subgraph "Serving & Analytics"
DeltaBatch --> Synapse[Synapse SQL Pools<br/>Unified Queries]
CosmosSpeed --> SynapseLink[Synapse Link]
SynapseLink --> Synapse
Synapse --> PBI[Power BI<br/>Dashboards]
Synapse --> API[API Apps<br/>Applications]
end
subgraph "Governance & Monitoring"
ADLS -.-> Purview[Azure Purview<br/>Data Catalog]
Spark -.-> Monitor[Azure Monitor<br/>Metrics & Logs]
ASA -.-> Monitor
Synapse -.-> Monitor
end
style EH fill:#fff3e0
style ADLS fill:#e1f5fe
style Spark fill:#f3e5f5
style ASA fill:#f3e5f5
style Synapse fill:#e8f5e9
Azure Service Configuration¶
Event Hubs Configuration¶
{
"name": "lambda-eventhub",
"sku": "Standard",
"properties": {
"partitionCount": 16,
"messageRetentionInDays": 7,
"captureDescription": {
"enabled": true,
"encoding": "Avro",
"intervalInSeconds": 300,
"sizeLimitInBytes": 314572800,
"destination": {
"name": "EventHubArchive.AzureDataLake",
"properties": {
"storageAccountResourceId": "/subscriptions/.../resourceGroups/.../providers/Microsoft.Storage/storageAccounts/datalake",
"blobContainer": "raw-events",
"archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
}
}
}
}
}
Synapse Spark Pool Configuration¶
{
"name": "batch-processing-pool",
"properties": {
"nodeSize": "Large",
"nodeSizeFamily": "MemoryOptimized",
"autoScale": {
"enabled": true,
"minNodeCount": 3,
"maxNodeCount": 20
},
"autoPause": {
"enabled": true,
"delayInMinutes": 15
},
"sparkVersion": "3.3",
"sparkConfig": {
"spark.dynamicAllocation.enabled": "true",
"spark.dynamicAllocation.minExecutors": "3",
"spark.dynamicAllocation.maxExecutors": "20",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true"
}
}
}
Stream Analytics Job Configuration¶
{
"name": "speed-layer-job",
"properties": {
"sku": {
"name": "Standard"
},
"eventsOutOfOrderPolicy": "Adjust",
"outputErrorPolicy": "Drop",
"eventsOutOfOrderMaxDelayInSeconds": 10,
"eventsLateArrivalMaxDelayInSeconds": 5,
"compatibilityLevel": "1.2",
"transformation": {
"name": "Transformation",
"properties": {
"streamingUnits": 6,
"query": "..."
}
}
}
}
Design Patterns¶
Pattern 1: Incremental Batch Processing¶
# Incremental batch processing to reduce recomputation
from pyspark.sql.functions import max
class IncrementalBatchProcessor:
"""Process only new/changed data in batch layer"""
def __init__(self, spark):
self.spark = spark
self.checkpoint_table = "batch_checkpoints.processing_watermarks"
def process_incremental_batch(self, view_name: str):
"""Process only data since last successful batch"""
# Get last processed timestamp
last_processed = self._get_last_watermark(view_name)
# Read only new data
new_data = self.spark.read.format("delta") \
.load("abfss://master@datalake.dfs.core.windows.net/raw") \
.filter(f"event_timestamp > '{last_processed}'")
# Process new data
processed_view = self._process_data(new_data, view_name)
# Merge with existing view (upsert)
self._merge_with_existing(processed_view, view_name)
# Update watermark
self._update_watermark(view_name, new_data.agg(max("event_timestamp")).first()[0])
def _merge_with_existing(self, new_data, view_name):
"""Merge new batch results with existing using Delta merge"""
from delta.tables import DeltaTable
target_path = f"abfss://serving@datalake.dfs.core.windows.net/batch/{view_name}"
# Check if target exists
if DeltaTable.isDeltaTable(self.spark, target_path):
target = DeltaTable.forPath(self.spark, target_path)
# Merge (upsert) logic
target.alias("target").merge(
new_data.alias("source"),
"target.device_id = source.device_id AND target.timestamp = source.timestamp"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
else:
# First run - just write
new_data.write.format("delta") \
.mode("overwrite") \
.partitionBy("date") \
.save(target_path)
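The Delta `merge` above upserts on a composite key, which is what makes the incremental job safe to re-run. A dictionary-based sketch of those semantics (`delta_merge` is hypothetical and stands in for `whenMatchedUpdateAll` / `whenNotMatchedInsertAll`):

```python
def delta_merge(target: dict, source: list) -> dict:
    """Rows are keyed by (device_id, timestamp): matched keys are
    overwritten, new keys are inserted, and re-running with the same
    source leaves the target unchanged (idempotent)."""
    for row in source:
        key = (row["device_id"], row["timestamp"])
        target[key] = dict(row)
    return target
```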
Pattern 2: Speed Layer Compaction¶
# Compact speed layer data before archiving to batch layer
from datetime import datetime
class SpeedLayerCompactor:
"""Compact and clean speed layer data"""
def __init__(self, cosmos_client):
self.cosmos = cosmos_client
self.speed_container = self.cosmos.get_database("speed_layer") \
.get_container("metrics")
def compact_to_batch(self, cutoff_time: datetime):
"""
Compact speed layer data older than cutoff into batch layer
Then delete from speed layer to control costs
"""
# Query old speed layer data
query = """
SELECT * FROM c
WHERE c.windowEnd < @cutoff_time
ORDER BY c.windowEnd
"""
old_data = list(self.speed_container.query_items(
query=query,
parameters=[{"name": "@cutoff_time", "value": cutoff_time.isoformat()}],
enable_cross_partition_query=True
))
if old_data:
# Write to batch layer (append to existing)
self._append_to_batch_layer(old_data)
# Delete from speed layer
for item in old_data:
self.speed_container.delete_item(
item=item['id'],
partition_key=item['deviceId']
)
print(f"Compacted {len(old_data)} records from speed to batch layer")
def _append_to_batch_layer(self, data: list):
"""Append compacted speed data to batch layer"""
# Convert to DataFrame and write to Delta Lake
# This ensures data is preserved before deletion
pass
Pattern 3: Smart Query Routing¶
# Route queries intelligently based on requirements
class QueryRouter:
"""Route queries to optimal layer based on requirements"""
def route_query(self, query_spec: dict):
"""
Route query based on characteristics:
- Accuracy requirements
- Latency requirements
- Time range
- Data freshness
"""
accuracy_required = query_spec.get('accuracy', 'approximate')
max_latency_ms = query_spec.get('max_latency_ms', 1000)
time_range = query_spec.get('time_range')
# High accuracy required -> prefer batch layer
if accuracy_required == 'exact':
if self._is_data_in_batch(time_range):
return self._query_batch_layer(query_spec)
else:
# Data not yet in batch, wait or query speed
return self._wait_or_speed(query_spec, max_latency_ms)
# Low latency required -> prefer speed layer
elif max_latency_ms < 100:
return self._query_speed_layer(query_spec)
# Balanced -> use serving layer merger
else:
return self._query_serving_layer(query_spec)
Performance Optimization¶
Batch Layer Optimization¶
Delta Lake Optimizations¶
# Optimize Delta tables for query performance
from delta.tables import DeltaTable
def optimize_batch_views(spark, view_path: str, zorder_cols: list):
"""
Optimize Delta Lake tables with:
- Compaction (merge small files)
- Z-Ordering (multi-dimensional clustering)
- Vacuum (remove old versions)
"""
delta_table = DeltaTable.forPath(spark, view_path)
# Compact small files
delta_table.optimize().executeCompaction()
# Z-Order by commonly filtered columns (Z-Order columns must not be partition columns)
delta_table.optimize().executeZOrderBy(*zorder_cols)
# Vacuum old versions (keep 7 days for time travel)
delta_table.vacuum(retentionHours=168)
# Update statistics
spark.sql(f"ANALYZE TABLE delta.`{view_path}` COMPUTE STATISTICS FOR ALL COLUMNS")
Partitioning Strategy¶
# Optimal partitioning for batch views
partitioning_strategies = {
"daily_metrics": {
"partition_by": ["year", "month", "day"],
"bucket_by": "device_id",
"buckets": 100,
"z_order_by": ["device_id", "timestamp"]
},
"hourly_metrics": {
"partition_by": ["date", "hour"],
"z_order_by": ["device_id", "timestamp"]
},
"user_aggregates": {
"partition_by": ["date"],
"bucket_by": "user_id",
"buckets": 200,
"z_order_by": ["user_id", "event_type"]
}
}
Speed Layer Optimization¶
Stream Analytics Performance¶
-- Optimize Stream Analytics with appropriate windowing
-- Use tumbling windows for non-overlapping aggregations
SELECT
deviceId,
System.Timestamp() AS WindowEnd,
AVG(temperature) AS avgTemp
FROM
Input TIMESTAMP BY eventTime
GROUP BY
deviceId,
TumblingWindow(minute, 1) -- Non-overlapping, efficient
-- Use hopping windows for sliding aggregations
SELECT
deviceId,
System.Timestamp() AS WindowEnd,
AVG(temperature) AS avgTemp
FROM
Input TIMESTAMP BY eventTime
GROUP BY
deviceId,
HoppingWindow(minute, 5, 1) -- 5-min window, 1-min hop
-- Use session windows for event bursts
SELECT
deviceId,
System.Timestamp() AS WindowEnd,
COUNT(*) AS eventCount
FROM
Input TIMESTAMP BY eventTime
GROUP BY
deviceId,
SessionWindow(minute, 5, 10) -- 5-min timeout, 10-min max
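Hopping windows overlap, so each event is counted in `size / hop` windows. A stdlib sketch of that membership rule (`hopping_window_ends` is a hypothetical helper; with `size == hop` it degenerates to a tumbling window):

```python
def hopping_window_ends(t: int, size: int, hop: int) -> list:
    """Window starts are aligned to multiples of `hop`; an event at
    second `t` falls in every window [s, s + size) with s <= t < s + size.
    Returns the end times of all windows containing the event."""
    s = (t // hop) * hop                 # latest hop-aligned start <= t
    ends = []
    while s > t - size:                  # walk back while the window still covers t
        ends.append(s + size)
        s -= hop
    return sorted(ends)
```

This is why hopping windows cost more than tumbling windows: a `HoppingWindow(minute, 5, 1)` event updates five windows instead of one.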
Cosmos DB Optimization¶
# Optimize Cosmos DB for speed layer
cosmos_optimization_config = {
"partition_key": "/deviceId", # Distribute load evenly
"indexing_policy": {
"indexingMode": "consistent",
"automatic": True,
"includedPaths": [
{"path": "/deviceId/?"},
{"path": "/windowEnd/?"},
{"path": "/avgTemperature/?"}
],
"excludedPaths": [
{"path": "/*"} # Exclude everything else
]
},
"default_ttl": 86400 * 7, # 7 days TTL (speed layer is temporary)
"throughput": {
"autoscale": True,
"max_throughput": 10000 # Scale up to 10K RU/s
}
}
Monitoring & Operations¶
Key Metrics¶
Batch Layer Metrics¶
# Metrics to monitor for batch layer health
batch_layer_metrics = {
"processing": {
"batch_job_duration_minutes": {
"threshold": 240, # Alert if > 4 hours
"aggregation": "avg"
},
"batch_job_success_rate": {
"threshold": 0.95, # Alert if < 95%
"aggregation": "avg"
},
"records_processed_per_batch": {
"threshold": 1000000, # Alert if < 1M (may indicate issues)
"aggregation": "sum"
}
},
"data_quality": {
"data_freshness_hours": {
"threshold": 6, # Alert if > 6 hours old
"aggregation": "max"
},
"null_value_percentage": {
"threshold": 0.05, # Alert if > 5% nulls
"aggregation": "avg"
},
"duplicate_records_percentage": {
"threshold": 0.01, # Alert if > 1% duplicates
"aggregation": "avg"
}
},
"storage": {
"delta_table_size_gb": {
"threshold": 5000, # Alert if > 5TB
"aggregation": "sum"
},
"small_files_count": {
"threshold": 10000, # Alert if > 10K small files
"aggregation": "count"
}
}
}
Speed Layer Metrics¶
# Metrics to monitor for speed layer health
speed_layer_metrics = {
"latency": {
"end_to_end_latency_ms": {
"threshold": 5000, # Alert if > 5 seconds
"aggregation": "p95"
},
"processing_latency_ms": {
"threshold": 1000, # Alert if > 1 second
"aggregation": "p95"
}
},
"throughput": {
"events_per_second": {
"threshold": 10000, # Alert if > capacity
"aggregation": "max"
},
"backlog_events": {
"threshold": 100000, # Alert if backlog grows
"aggregation": "max"
}
},
"quality": {
"dropped_events_percentage": {
"threshold": 0.01, # Alert if > 1% dropped
"aggregation": "avg"
},
"out_of_order_events_percentage": {
"threshold": 0.05, # Alert if > 5% out of order
"aggregation": "avg"
}
}
}
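Note that the thresholds above alert in different directions (some on `>`, some on `<`), which the dictionaries encode only in comments. A small sketch of an evaluator that makes the direction explicit (`evaluate_metric` and `fire_alerts` are hypothetical):

```python
def evaluate_metric(value, threshold, alert_when="above"):
    """Return True when the metric breaches its rule; the direction
    ('above' or 'below') is an explicit parameter rather than a comment."""
    if alert_when == "above":
        return value > threshold
    return value < threshold

def fire_alerts(samples, rules):
    """samples: {metric: value}; rules: {metric: (threshold, direction)}.
    Returns the metric names that breach their rule, sorted."""
    return sorted(m for m, v in samples.items()
                  if m in rules and evaluate_metric(v, *rules[m]))
```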
Monitoring Dashboard¶
graph TB
subgraph "Lambda Architecture Monitoring"
subgraph "Batch Layer Monitoring"
BM1[Job Success Rate]
BM2[Processing Duration]
BM3[Data Freshness]
BM4[Data Quality]
end
subgraph "Speed Layer Monitoring"
SM1[End-to-End Latency]
SM2[Events/Second]
SM3[Backlog Size]
SM4[Error Rate]
end
subgraph "Serving Layer Monitoring"
SVM1[Query Latency]
SVM2[Cache Hit Rate]
SVM3[Merge Conflicts]
end
subgraph "Alerts"
BM1 --> Alert[Azure Monitor Alerts]
BM2 --> Alert
SM1 --> Alert
SM3 --> Alert
SVM1 --> Alert
end
end
Best Practices¶
Data Immutability¶
# Always preserve raw data - never modify
from pyspark.sql.functions import struct, current_timestamp, lit, input_file_name
class DataImmutabilityPattern:
"""Ensure data immutability in batch layer"""
def ingest_data(self, source_data):
"""
Ingest data preserving original format
Store metadata about ingestion
"""
raw_data_with_metadata = source_data.withColumn(
"ingestion_metadata",
struct(
current_timestamp().alias("ingested_at"),
lit("event_hub").alias("source_system"),
input_file_name().alias("source_file"),
lit("v1").alias("schema_version")
)
)
# Write as immutable (append-only)
raw_data_with_metadata.write \
.format("delta") \
.mode("append") \
.partitionBy("ingestion_date") \
.save("abfss://master@datalake.dfs.core.windows.net/raw")
# Never use mode("overwrite") on master dataset
Recomputation Capability¶
# Maintain ability to recompute views from master dataset
class RecomputationPattern:
"""Enable recomputation for human fault-tolerance"""
def recompute_view(self, view_name: str, reason: str):
"""
Recompute entire view from master dataset
Useful when logic changes or errors discovered
"""
print(f"Recomputing {view_name}")
print(f"Reason: {reason}")
# Archive old view
self._archive_old_view(view_name)
# Reprocess from master dataset
master_data = self.spark.read.format("delta") \
.load("abfss://master@datalake.dfs.core.windows.net/raw")
# Apply current logic
new_view = self._apply_current_logic(master_data, view_name)
# Replace old view
new_view.write \
.format("delta") \
.mode("overwrite") \
.save(f"abfss://serving@datalake.dfs.core.windows.net/batch/{view_name}")
print(f"Recomputation complete for {view_name}")
Idempotent Processing¶
# Ensure batch jobs are idempotent
class IdempotentProcessing:
"""Batch jobs should produce same result when run multiple times"""
def process_batch(self, batch_date: str):
"""
Process batch for specific date
Can be run multiple times safely
"""
# Use deterministic partitioning
partition_filter = f"event_date = '{batch_date}'"
# Read data for specific partition only
data = self.spark.read.format("delta") \
.load("abfss://master@datalake.dfs.core.windows.net/raw") \
.filter(partition_filter)
# Process
result = self._process(data)
# Overwrite specific partition only (idempotent)
result.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", partition_filter) \
.save("abfss://serving@datalake.dfs.core.windows.net/batch/metrics")
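The `replaceWhere` option above is what makes the job idempotent: the target partition is replaced wholesale rather than appended to. A dictionary-based sketch of that contract (`replace_partition` is hypothetical):

```python
def replace_partition(table: dict, partition: str, rows: list) -> dict:
    """Drop every existing row in the target partition, then write the
    new rows; re-running with the same input yields the same state."""
    kept = {k: v for k, v in table.items() if v["event_date"] != partition}
    for i, row in enumerate(rows):
        kept[(partition, i)] = dict(row)
    return kept
```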
Common Challenges¶
Challenge 1: Managing Dual Codebases¶
Problem: Maintaining separate code for batch and stream processing
Solution:
# Abstract common logic into shared modules
class SharedMetricsLogic:
"""Business logic shared between batch and speed layers"""
@staticmethod
def calculate_device_metrics(df, group_cols):
"""
Same metric calculation for both batch and stream
Use this in both Spark batch and Structured Streaming
"""
from pyspark.sql.functions import avg, max, min, count, sum
return df.groupBy(*group_cols).agg(
count("*").alias("event_count"),
avg("temperature").alias("avg_temperature"),
max("temperature").alias("max_temperature"),
min("temperature").alias("min_temperature"),
sum("energy_consumed").alias("total_energy")
)
# Use in batch layer
batch_metrics = SharedMetricsLogic.calculate_device_metrics(
batch_df,
["device_id", "event_date"]
)
# Use in speed layer (streaming)
from pyspark.sql.functions import window
speed_metrics = SharedMetricsLogic.calculate_device_metrics(
streaming_df,
["device_id", window("event_timestamp", "1 minute")]
)
Challenge 2: Query Complexity¶
Problem: Complex query routing and merging logic
Solution: Use materialized views in Synapse SQL
-- Simplified querying with materialized view
CREATE MATERIALIZED VIEW mv_unified_metrics
WITH (DISTRIBUTION = HASH(device_id))
AS
SELECT * FROM unified_device_metrics;
-- Queries are simple now
SELECT * FROM mv_unified_metrics
WHERE device_id = 'device-123'
AND timestamp >= DATEADD(day, -7, GETUTCDATE());
Challenge 3: Eventual Consistency¶
Problem: Speed and batch layers may have different values for same data
Solution: Document and communicate consistency guarantees
class ConsistencyGuarantees:
"""Document consistency model for applications"""
CONSISTENCY_MODEL = {
"real_time_queries": {
"max_staleness_seconds": 10,
"accuracy": "approximate (±5%)",
"source": "speed_layer"
},
"historical_queries": {
"max_staleness_hours": 4,
"accuracy": "exact (100%)",
"source": "batch_layer"
},
"recent_queries": {
"max_staleness_minutes": 5,
"accuracy": "converging",
"source": "merged_layers",
"note": "Batch layer takes precedence when available"
}
}
Use Cases¶
Use Case 1: IoT Device Monitoring¶
graph TB
Devices[IoT Devices<br/>100K+ devices] --> EH[Event Hubs<br/>High Throughput]
subgraph "Batch Layer - Daily Reports"
EH --> Capture[Capture to ADLS]
Capture --> Spark[Spark: Daily Aggregations]
Spark --> Reports[Daily/Weekly/Monthly Reports]
end
subgraph "Speed Layer - Real-time Alerts"
EH --> ASA[Stream Analytics]
ASA --> Alerts[Anomaly Detection]
ASA --> Cosmos[Cosmos: Last 24h Metrics]
end
Reports --> Dashboard[Power BI Dashboard]
Cosmos --> Dashboard
Alerts --> Teams[Teams Notifications]
Use Case 2: E-commerce Analytics¶
graph TB
Web[Website Events] --> EH[Event Hubs]
Mobile[Mobile App Events] --> EH
subgraph "Batch - Business Intelligence"
EH --> DL[Data Lake<br/>All Events]
DL --> Batch[Batch Processing]
Batch --> DW[Data Warehouse<br/>Customer 360]
end
subgraph "Speed - Real-time Personalization"
EH --> Stream[Stream Processing]
Stream --> Redis[Redis Cache<br/>User Profiles]
Stream --> Cosmos[Cosmos<br/>Recent Activity]
end
DW --> BI[BI Reports]
Redis --> WebApp[Website]
Cosmos --> WebApp
Use Case 3: Financial Transaction Processing¶
graph TB
ATM[ATM Transactions] --> EH[Event Hubs]
POS[POS Transactions] --> EH
Online[Online Banking] --> EH
subgraph "Batch - Compliance & Reporting"
EH --> Archive[Immutable Archive]
Archive --> Compliance[Compliance Reports]
Archive --> Audit[Audit Trails]
end
subgraph "Speed - Fraud Detection"
EH --> Fraud[Real-time Fraud Check]
Fraud --> Block[Block Suspicious]
Fraud --> Cosmos[Transaction History<br/>90 days]
end
Compliance --> Regulators[Regulatory Bodies]
Block --> Alerts[Alert Security Team]
Additional Resources¶
Documentation¶
- Azure Event Hubs Documentation
- Azure Stream Analytics Documentation
- Azure Synapse Analytics Documentation
- Delta Lake Documentation
Code Examples¶
Refer to the implementation examples provided throughout this guide for:
- Lambda Architecture patterns and configurations
- Event Hubs Capture setup and processing
- Stream Analytics query optimization
Related Patterns¶
- Kappa Architecture - Stream-only alternative
- Event Sourcing - Event-driven complement
- CQRS Pattern - Read/write optimization
Last Updated: 2025-01-28
Pattern Status: Production Ready
Complexity Level: Advanced