Capture Optimization¶
Best practices for Event Hubs Capture and data ingestion optimization.
Overview¶
Event Hubs Capture automatically delivers streaming data to Azure Blob Storage or Azure Data Lake Storage, enabling seamless integration with batch analytics.
Capture Configuration¶
Optimal Settings¶
| Parameter | Recommended | Notes |
|---|---|---|
| Size window | 100-300 MB | Balances file size against delivery latency (see the conversion sketch below) |
| Time window | 5-15 minutes | Adjust to throughput; a window closes on whichever limit is reached first |
| File format | Avro | Capture's native format; convert downstream if needed |
| Encoding | Avro or AvroDeflate | AvroDeflate is the only compressed option Capture supports; roughly 60-70% smaller files at some CPU cost |
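Capture's API takes the size window in bytes and the time window in seconds, and closes a file on whichever limit is reached first. A minimal conversion sketch (the `capture_window_settings` helper is hypothetical):

# Convert the recommended windows into the raw values the Capture API expects
def capture_window_settings(size_mb: int, interval_minutes: int) -> dict:
    """Hypothetical helper: translate MB and minutes into bytes and seconds."""
    return {
        "size_limit_in_bytes": size_mb * 1024 * 1024,  # 300 MB -> 314572800
        "interval_in_seconds": interval_minutes * 60,  # 5 min  -> 300
    }

capture_window_settings(300, 5)
# {'size_limit_in_bytes': 314572800, 'interval_in_seconds': 300}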
Terraform Configuration¶
resource "azurerm_eventhub" "capture" {
name = "eh-iot-telemetry"
namespace_name = azurerm_eventhub_namespace.main.name
resource_group_name = var.resource_group_name
partition_count = 32
message_retention = 7
capture_description {
enabled = true
encoding = "Avro"
interval_in_seconds = 300 # 5 minutes
size_limit_in_bytes = 314572800 # 300 MB
skip_empty_archives = true
destination {
name = "EventHubArchive.AzureBlockBlob"
archive_name_format = "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
blob_container_name = "capture"
storage_account_id = azurerm_storage_account.capture.id
}
}
}
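The same hub can be configured programmatically. A minimal sketch, assuming the azure-mgmt-eventhub SDK's `Eventhub`, `CaptureDescription`, and `Destination` models (verify the field names against your SDK version; all resource names are placeholders):

# Programmatic equivalent of the Terraform above (sketch, azure-mgmt-eventhub)
from azure.identity import DefaultAzureCredential
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.eventhub.models import CaptureDescription, Destination, Eventhub

client = EventHubManagementClient(DefaultAzureCredential(), "<subscription-id>")
client.event_hubs.create_or_update(
    "<resource-group>", "<namespace>", "eh-iot-telemetry",
    Eventhub(
        partition_count=32,
        message_retention_in_days=7,
        capture_description=CaptureDescription(
            enabled=True,
            encoding="Avro",
            interval_in_seconds=300,
            size_limit_in_bytes=314572800,
            skip_empty_archives=True,
            destination=Destination(
                name="EventHubArchive.AzureBlockBlob",
                archive_name_format="{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}",
                blob_container="capture",
                storage_account_resource_id="<storage-account-resource-id>",
            ),
        ),
    ),
)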
File Organization¶
Partition Strategy¶
datalake/
└── capture/
    └── eventhub-ns/
        └── eh-telemetry/
            ├── 0/                          # Partition 0
            │   └── 2024/01/15/10/00/00/
            │       └── 0_abc123.avro
            ├── 1/                          # Partition 1
            │   └── 2024/01/15/10/00/00/
            │       └── 1_def456.avro
            └── ...
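Each blob path encodes the partition and the capture window start, so downstream jobs can recover both without opening the file. A small sketch that parses the default `{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}` layout (the regex and returned field names are illustrative):

import re
from datetime import datetime, timezone

# Matches the default archive_name_format shown above (illustrative)
CAPTURE_PATH = re.compile(
    r"(?P<namespace>[^/]+)/(?P<eventhub>[^/]+)/(?P<partition>\d+)/"
    r"(?P<y>\d{4})/(?P<m>\d{2})/(?P<d>\d{2})/(?P<hh>\d{2})/(?P<mm>\d{2})/(?P<ss>\d{2})"
)

def parse_capture_path(path: str) -> dict:
    """Extract partition id and capture window start from a capture blob path."""
    m = CAPTURE_PATH.search(path)
    if m is None:
        raise ValueError(f"Not a capture path: {path}")
    return {
        "eventhub": m.group("eventhub"),
        "partition": int(m.group("partition")),
        "window_start": datetime(
            int(m.group("y")), int(m.group("m")), int(m.group("d")),
            int(m.group("hh")), int(m.group("mm")), int(m.group("ss")),
            tzinfo=timezone.utc,
        ),
    }

parse_capture_path("eventhub-ns/eh-telemetry/0/2024/01/15/10/00/00/0_abc123.avro")
# {'eventhub': 'eh-telemetry', 'partition': 0, 'window_start': 2024-01-15 10:00:00+00:00}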
Custom Path Format¶
# Optimized for Delta Lake processing. Capture requires all nine tokens
# ({Namespace}, {EventHub}, {PartitionId}, {Year}, {Month}, {Day}, {Hour},
# {Minute}, {Second}) to appear somewhere in the format.
archive_name_format = "bronze/iot/{EventHub}/year={Year}/month={Month}/day={Day}/hour={Hour}/{Namespace}_{PartitionId}_{Minute}{Second}"
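Because this layout uses Hive-style `year=`/`month=`/`day=` directories, Spark discovers those segments as partition columns and prunes them at file-listing time. A quick sketch (paths illustrative):

# Spark infers year/month/day/hour as partition columns from the directory names
df = (spark.read
    .format("avro")
    .load("abfss://capture@datalake.dfs.core.windows.net/bronze/iot/eh-telemetry/"))

# Only files under year=2024/month=01/day=15 are listed and read
jan_15 = df.where("year = 2024 AND month = 1 AND day = 15")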
Processing Captured Data¶
Auto Loader Integration¶
# Read captured Avro files with Auto Loader
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "avro")
    .option("cloudFiles.schemaLocation", "/checkpoints/iot/schema")
    .option("cloudFiles.includeExistingFiles", "true")
    .load("abfss://capture@datalake.dfs.core.windows.net/bronze/iot/"))

# Process and write to Delta
(df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/iot/delta")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .toTable("bronze.iot_telemetry"))
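Capture wraps each event in an Avro envelope (`Body`, `EnqueuedTimeUtc`, `SequenceNumber`, `Offset`, `Properties`, `SystemProperties`), so the payload itself still needs decoding. A sketch assuming JSON payloads with an illustrative schema:

# Decode the Capture envelope: Body holds the raw event bytes
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

payload_schema = StructType([           # illustrative; use your device schema
    StructField("deviceId", StringType()),
    StructField("temperature", DoubleType()),
])

decoded = (df
    .withColumn("payload", from_json(col("Body").cast("string"), payload_schema))
    .select("EnqueuedTimeUtc", "SequenceNumber", "payload.*"))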
Batch Processing¶
# Process captured files in batch
from pyspark.sql.functions import col, input_file_name, regexp_extract

def process_captured_files(date: str):
    """Process captured files for a specific date (YYYY-MM-DD)."""
    path = (f"abfss://capture@datalake.dfs.core.windows.net/bronze/iot/*/"
            f"year={date[:4]}/month={date[5:7]}/day={date[8:10]}/")
    df = (spark.read
        .format("avro")
        .load(path)
        .withColumn("source_file", input_file_name())
        .withColumn("partition_id",
                    regexp_extract(col("source_file"), r"/(\d+)_", 1)))
    return df
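For example, backfilling one day into Delta (the target table name is illustrative):

# Backfill a single day of captured data
daily = process_captured_files("2024-01-15")
(daily.write
    .format("delta")
    .mode("append")
    .saveAsTable("bronze.iot_telemetry_backfill"))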
Performance Optimization¶
Throughput Scaling¶
# Calculate optimal partition count
def calculate_partitions(
    events_per_second: int,
    event_size_kb: float,
    capture_interval_minutes: int = 5,
    target_file_size_mb: int = 128,
) -> int:
    """Calculate partition count for optimal capture file sizes."""
    # Capture writes one file per partition per interval, so the partition
    # count determines how the interval's data is split across files
    data_per_interval_mb = (
        events_per_second * event_size_kb * 60 * capture_interval_minutes
    ) / 1024
    partitions = max(1, int(data_per_interval_mb / target_file_size_mb))
    # Round up to the next power of 2
    return 2 ** (partitions - 1).bit_length()

# Example: 10K events/sec, 1 KB each, 5 min capture window
partitions = calculate_partitions(10000, 1.0, 5, 128)  # Returns 32
File Compaction¶
# Compact small capture files into larger Parquet files
def compact_capture_files(source_path: str, target_path: str, target_size_mb: int = 128):
    """Compact small Avro files into larger Parquet files."""
    df = spark.read.format("avro").load(source_path)
    # Rough size estimate; this scans every row, so reserve it for moderate
    # volumes (or estimate from the source file sizes instead)
    total_size_mb = df.rdd.map(lambda row: len(str(row))).sum() / (1024 * 1024)
    num_files = max(1, int(total_size_mb / target_size_mb))
    (df.repartition(num_files)
        .write
        .format("parquet")
        .mode("overwrite")
        .save(target_path))
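Typical usage compacts one closed hour at a time (paths illustrative):

# Compact the 10:00 hour once its capture windows have closed
compact_capture_files(
    "abfss://capture@datalake.dfs.core.windows.net/bronze/iot/eh-telemetry/year=2024/month=01/day=15/hour=10/",
    "abfss://compacted@datalake.dfs.core.windows.net/iot/2024/01/15/10/",
)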
Monitoring¶
Capture Metrics¶
// Monitor capture throughput
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.EVENTHUB"
| where Category == "ArchiveLogs"
| summarize
    CapturedMessages = sum(tolong(archiveMessageCount_d)),
    CapturedBytes = sum(tolong(archiveSize_d)),  // tolong: hourly byte totals can exceed int range
    FileCount = count()
    by bin(TimeGenerated, 1h), EventHub = Resource
| order by TimeGenerated desc
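The same query can drive a custom check outside the portal. A minimal sketch, assuming the azure-monitor-query package and a reachable Log Analytics workspace (the workspace ID is a placeholder):

# Run the capture-throughput query from Python
from datetime import timedelta
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient

QUERY = """
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.EVENTHUB"
| where Category == "ArchiveLogs"
| summarize CapturedBytes = sum(tolong(archiveSize_d)) by bin(TimeGenerated, 1h)
"""

client = LogsQueryClient(DefaultAzureCredential())
response = client.query_workspace("<workspace-id>", QUERY, timespan=timedelta(days=1))
for row in response.tables[0].rows:
    print(row)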
Alert Configuration¶
{
  "type": "Microsoft.Insights/metricAlerts",
  "properties": {
    "severity": 2,
    "criteria": {
      "odata.type": "Microsoft.Azure.Monitor.SingleResourceMultipleMetricCriteria",
      "allOf": [
        {
          "name": "CaptureBytesBacklog",
          "metricName": "CaptureBacklog",
          "operator": "GreaterThan",
          "threshold": 1073741824,
          "timeAggregation": "Average"
        }
      ]
    }
  }
}
Cost Optimization¶
Storage Tiers¶
# Move old capture files to cool storage
def build_lifecycle_policy():
    """Build a lifecycle management policy document for the capture storage account."""
    policy = {
        "rules": [
            {
                "name": "move-capture-to-cool",
                "enabled": True,
                "type": "Lifecycle",
                "definition": {
                    "filters": {
                        "blobTypes": ["blockBlob"],
                        "prefixMatch": ["capture/"]
                    },
                    "actions": {
                        "baseBlob": {
                            "tierToCool": {"daysAfterModificationGreaterThan": 7},
                            "tierToArchive": {"daysAfterModificationGreaterThan": 30},
                            "delete": {"daysAfterModificationGreaterThan": 365}
                        }
                    }
                }
            }
        ]
    }
    return policy
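Building the policy document is only half the job; applying it is a separate call. One option is to serialize it and push it with the Azure CLI (a sketch; the account and resource group names are placeholders):

# Apply the policy with the Azure CLI (assumes `az login` has run)
import json
import subprocess

with open("lifecycle-policy.json", "w") as f:
    json.dump(build_lifecycle_policy(), f, indent=2)

subprocess.run([
    "az", "storage", "account", "management-policy", "create",
    "--account-name", "capturestorage",   # placeholder
    "--resource-group", "rg-streaming",   # placeholder
    "--policy", "@lifecycle-policy.json",
], check=True)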
Compression Analysis¶
| Format | Compression | Size Reduction | Read Performance |
|---|---|---|---|
| Avro (none) | None | 0% | Fast |
| Avro (snappy) | Snappy | 40-60% | Fast |
| Avro (deflate) | Deflate | 60-70% | Medium |
| Parquet (snappy) | Snappy | 70-80% | Fast |
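These ratios vary with payload shape, so measure on a sample of your own captured data. A rough sketch (paths illustrative; `compression` is the standard Spark write option for both formats):

# Write one sample in each format and compare the output directory sizes
sample = (spark.read
    .format("avro")
    .load("abfss://capture@datalake.dfs.core.windows.net/bronze/iot/eh-telemetry/year=2024/month=01/day=15/hour=10/"))

for fmt, codec in [("avro", "snappy"), ("avro", "deflate"), ("parquet", "snappy")]:
    (sample.write
        .format(fmt)
        .option("compression", codec)
        .mode("overwrite")
        .save(f"/tmp/compression_test/{fmt}_{codec}"))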
Last Updated: January 2025