⚡ Tutorial 5: Real-time Streaming Ingestion¶
Build real-time data ingestion pipelines using Azure Event Hubs and Spark Structured Streaming. Process streaming data with low latency and integrate with batch processing layers.
🎯 Learning Objectives¶
After completing this tutorial, you will be able to:
- ✅ Set up Azure Event Hubs for streaming data ingestion
- ✅ Configure Kafka integration for high-throughput scenarios
- ✅ Build Spark Structured Streaming applications
- ✅ Implement stream processing with windowing and aggregations
- ✅ Write to Delta Lake for unified batch and streaming
⏱️ Time Estimate: 30 minutes¶
- Event Hub Setup: 10 minutes
- Spark Streaming Application: 15 minutes
- Testing & Monitoring: 5 minutes
📋 Prerequisites¶
Completed Tutorials¶
- Tutorial 1: Environment Setup
- Tutorial 2: Workspace Basics
- Tutorial 3: Data Lake Setup
- Tutorial 4: Batch Ingestion
Required Resources¶
- Synapse workspace with Spark pool
- ADLS Gen2 storage configured
- Azure subscription with Event Hubs quota
Verify Prerequisites¶
# Load workspace configuration
$config = Get-Content "workspace-config.json" | ConvertFrom-Json
# Verify Spark pool exists
az synapse spark pool show `
--name "sparksmall" `
--workspace-name $config.WorkspaceName `
--resource-group $config.ResourceGroup
🔧 Step 1: Create Event Hub Namespace¶
1.1 Provision Event Hub Namespace¶
# Load naming convention
$naming = Get-Content "naming-convention.json" | ConvertFrom-Json
# Create Event Hub namespace
$eventHubNamespace = "eh-$($naming.SynapseWorkspace)-ns"
az eventhubs namespace create `
--name $eventHubNamespace `
--resource-group $naming.ResourceGroupName `
--location $naming.Location `
--sku Standard `
--enable-kafka true `
--enable-auto-inflate false `
--tags Project=SynapseTutorial Environment=Learning
Write-Host "✅ Event Hub namespace created: $eventHubNamespace" -ForegroundColor Green
1.2 Create Event Hub for Transactions¶
# Create Event Hub for transaction stream
az eventhubs eventhub create `
--namespace-name $eventHubNamespace `
--resource-group $naming.ResourceGroupName `
--name "transactions-stream" `
--partition-count 4 `
--message-retention 1
Write-Host "✅ Event Hub created: transactions-stream" -ForegroundColor Green
1.3 Configure Access Policy¶
# Create SAS policy for sending data
az eventhubs eventhub authorization-rule create `
--namespace-name $eventHubNamespace `
--resource-group $naming.ResourceGroupName `
--eventhub-name "transactions-stream" `
--name "SendPolicy" `
--rights Send
# Get connection string
$connectionString = az eventhubs eventhub authorization-rule keys list `
--namespace-name $eventHubNamespace `
--resource-group $naming.ResourceGroupName `
--eventhub-name "transactions-stream" `
--name "SendPolicy" `
--query primaryConnectionString `
--output tsv
Write-Host "✅ Connection string retrieved (store securely)" -ForegroundColor Green
📨 Step 2: Send Test Data to Event Hub¶
2.1 Create Data Generator Script¶
# File: event_hub_producer.py
import asyncio
import json
from datetime import datetime
import random
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
# Configuration
CONNECTION_STR = "your-connection-string-here"
EVENTHUB_NAME = "transactions-stream"
async def send_transaction_events():
"""Generate and send transaction events to Event Hub"""
producer = EventHubProducerClient.from_connection_string(
conn_str=CONNECTION_STR,
eventhub_name=EVENTHUB_NAME
)
customer_ids = [f"C{str(i).zfill(3)}" for i in range(1, 101)]
categories = ["Electronics", "Books", "Clothing", "Home", "Sports"]
async with producer:
# Generate 100 transactions
event_data_batch = await producer.create_batch()
for i in range(100):
transaction = {
"transaction_id": f"T{str(i+1).zfill(6)}",
"customer_id": random.choice(customer_ids),
"amount": round(random.uniform(10.0, 500.0), 2),
"category": random.choice(categories),
"timestamp": datetime.utcnow().isoformat(),
"location": random.choice(["US", "UK", "EU", "APAC"]),
"payment_method": random.choice(["Credit", "Debit", "PayPal"])
}
event_data_batch.add(EventData(json.dumps(transaction)))
if len(event_data_batch) == 10:
await producer.send_batch(event_data_batch)
print(f"Sent batch of {len(event_data_batch)} events")
event_data_batch = await producer.create_batch()
# Send remaining events
if len(event_data_batch) > 0:
await producer.send_batch(event_data_batch)
print(f"Sent final batch of {len(event_data_batch)} events")
print(f"✅ Successfully sent 100 transaction events")
# Run the producer
if __name__ == "__main__":
asyncio.run(send_transaction_events())
2.2 Execute Data Generator¶
# Install required Python packages
pip install azure-eventhub aiohttp
# Update connection string in script
(Get-Content "event_hub_producer.py") -replace "your-connection-string-here", $connectionString | Set-Content "event_hub_producer.py"
# Run producer
python event_hub_producer.py
Write-Host "✅ Test data sent to Event Hub" -ForegroundColor Green
⚡ Step 3: Create Spark Structured Streaming Application¶
3.1 Create Streaming Notebook¶
Via Synapse Studio:
1. Navigate to Develop → + → Notebook
2. Name: "StreamingIngestion"
3. Attach to: sparksmall (Spark pool)
4. Language: PySpark
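If you prefer scripting the setup, the notebook can also be imported with the Azure CLI. This is a sketch under the assumption that you have a local StreamingIngestion.ipynb export; the Studio steps above remain the primary path:
# Import a notebook definition and attach it to the Spark pool
az synapse notebook import `
    --workspace-name $config.WorkspaceName `
    --name "StreamingIngestion" `
    --file @"StreamingIngestion.ipynb" `
    --spark-pool-name "sparksmall"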
3.2 Configure Event Hub Connection¶
# Cell 1: Import libraries and configure Event Hub connection
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Event Hub configuration (use the namespace name and connection string from Step 1)
EVENT_HUB_NAMESPACE = "eh-synapse-ns"  # replace with your namespace name
EVENT_HUB_NAME = "transactions-stream"
CONNECTION_STRING = "your-connection-string-here"  # SendPolicy connection string from Step 1.3
# Event Hub connection configuration
ehConf = {
"eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(CONNECTION_STRING),
"eventhubs.consumerGroup": "$Default",
"eventhubs.startingPosition": json.dumps({"offset": "-1", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})
}
print("✅ Event Hub configuration loaded")
3.3 Read Streaming Data¶
# Cell 2: Create streaming DataFrame
# Read from Event Hub
streaming_df = (spark
.readStream
.format("eventhubs")
.options(**ehConf)
.load()
)
# Display schema
streaming_df.printSchema()
# Parse JSON body
transaction_schema = StructType([
StructField("transaction_id", StringType(), True),
StructField("customer_id", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("category", StringType(), True),
StructField("timestamp", TimestampType(), True),
StructField("location", StringType(), True),
StructField("payment_method", StringType(), True)
])
# Extract and parse transaction data
transactions = (streaming_df
.withColumn("body_string", col("body").cast("string"))
.withColumn("transaction", from_json(col("body_string"), transaction_schema))
.select(
col("transaction.*"),
col("enqueuedTime").alias("ingestion_time"),
col("offset"),
col("sequenceNumber")
)
)
print("✅ Streaming DataFrame created")
3.4 Apply Transformations¶
# Cell 3: Transform streaming data
# Add processing timestamp
enriched_transactions = (transactions
.withColumn("processing_time", current_timestamp())
.withColumn("processing_date", to_date(col("timestamp")))
.withColumn("amount_usd", col("amount")) # Assume USD, would convert in production
)
# Calculate running statistics
transaction_stats = (enriched_transactions
.groupBy(
window(col("timestamp"), "5 minutes"),
col("category")
)
.agg(
count("*").alias("transaction_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount"),
max("amount").alias("max_amount")
)
)
print("✅ Transformations applied")
💾 Step 4: Write to Delta Lake¶
4.1 Configure Delta Lake Sink¶
# Cell 4: Write streaming data to Delta Lake
# ADLS Gen2 account from Tutorial 3 (placeholder; replace with your account name)
storage_account = "yourstorageaccount"
# Define checkpoint and output locations
checkpoint_location = f"abfss://raw@{storage_account}.dfs.core.windows.net/checkpoints/transactions-stream"
delta_table_path = f"abfss://raw@{storage_account}.dfs.core.windows.net/transactions-streaming/"
# Write stream to Delta Lake
query = (enriched_transactions
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", checkpoint_location)
.option("mergeSchema", "true")
.start(delta_table_path)
)
print(f"✅ Streaming to Delta Lake")
print(f"Query ID: {query.id}")
print(f"Status: {query.status}")
4.2 Write Aggregations to Console (for monitoring)¶
# Cell 5: Display real-time statistics
stats_query = (transaction_stats
.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "false")
.option("numRows", 20)
.start()
)
# Let it run for 30 seconds
import time
time.sleep(30)
# Stop the query
stats_query.stop()
print("✅ Statistics displayed")
🔄 Step 5: Implement Kafka Integration¶
5.1 Configure Kafka Endpoint¶
# Cell 6: Connect using Kafka protocol (Event Hubs Kafka endpoint)
kafka_bootstrap_servers = f"{EVENT_HUB_NAMESPACE}.servicebus.windows.net:9093"
# Kafka configuration
kafka_options = {
"kafka.bootstrap.servers": kafka_bootstrap_servers,
"subscribe": EVENT_HUB_NAME,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION_STRING}";',
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "30000",
"failOnDataLoss": "false",
"startingOffsets": "earliest"
}
# Read from Event Hub using Kafka
kafka_stream = (spark
.readStream
.format("kafka")
.options(**kafka_options)
.load()
)
print("✅ Kafka stream configured")
5.2 Process Kafka Messages¶
# Cell 7: Parse Kafka messages and drop duplicate transactions
kafka_transactions = (kafka_stream
    .selectExpr("CAST(value AS STRING) as json_string")
    .withColumn("transaction", from_json(col("json_string"), transaction_schema))
    .select("transaction.*")
    .withWatermark("timestamp", "10 minutes")
    .dropDuplicates(["transaction_id", "timestamp"])
)
# Write to Delta (stop the earlier Event Hubs query with query.stop() first,
# otherwise the same events are written to the table twice)
dedupe_checkpoint = f"abfss://raw@{storage_account}.dfs.core.windows.net/checkpoints/kafka-dedupe"
kafka_query = (kafka_transactions
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", dedupe_checkpoint)
.option("mergeSchema", "true")
.trigger(processingTime="10 seconds") # Micro-batch every 10 seconds
.start(delta_table_path)
)
print("✅ Kafka stream processing started")
📊 Step 6: Monitor Streaming Queries¶
6.1 Check Query Status¶
# Cell 8: Monitor active streams
# List all active streaming queries
active_streams = spark.streams.active
for stream in active_streams:
print(f"Query ID: {stream.id}")
print(f"Name: {stream.name}")
print(f"Status: {stream.status}")
print(f"Recent Progress:")
print(stream.lastProgress)
print("-" * 80)
6.2 Query Streaming Metrics¶
# Cell 9: Get detailed metrics
def print_stream_metrics(query):
    """Print detailed streaming query metrics"""
    status = query.status
    progress = query.lastProgress
    if progress:
        print(f"Input Rate: {progress.get('inputRowsPerSecond', 0)} rows/sec")
        print(f"Processing Rate: {progress.get('processedRowsPerSecond', 0)} rows/sec")
        print(f"Batch Duration: {progress.get('durationMs', {}).get('triggerExecution', 0)} ms")
        print(f"Total Input Rows: {progress.get('numInputRows', 0)}")
    print(f"Data Available: {status.get('isDataAvailable', False)}")
    print(f"Trigger Active: {status.get('isTriggerActive', False)}")
# Monitor main query
print_stream_metrics(query)
6.3 Access Streaming Data¶
-- Cell 10: Query the streaming Delta table (run in a %%sql cell; replace {storage_account} with your account name)
SELECT
category,
location,
COUNT(*) as transaction_count,
SUM(amount) as total_revenue,
AVG(amount) as avg_transaction_value
FROM delta.`abfss://raw@{storage_account}.dfs.core.windows.net/transactions-streaming/`
WHERE processing_date = CURRENT_DATE()
GROUP BY category, location
ORDER BY total_revenue DESC;
🎯 Step 7: Implement Windowed Aggregations¶
7.1 Tumbling Window Aggregations¶
# Cell 11: 5-minute tumbling windows
tumbling_agg = (enriched_transactions
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "5 minutes"),
col("category")
)
.agg(
count("*").alias("txn_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount")
)
.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("category"),
col("txn_count"),
col("total_amount"),
col("avg_amount")
)
)
# Write to Delta
tumbling_checkpoint = f"abfss://curated@{storage_account}.dfs.core.windows.net/checkpoints/tumbling-agg"
tumbling_output = f"abfss://curated@{storage_account}.dfs.core.windows.net/transaction-analytics/by-category/"
tumbling_query = (tumbling_agg
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", tumbling_checkpoint)
.trigger(processingTime="1 minute")
.start(tumbling_output)
)
print("✅ Tumbling window aggregation running")
7.2 Sliding Window Aggregations¶
# Cell 12: 10-minute sliding windows, sliding every 5 minutes
sliding_agg = (enriched_transactions
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("location")
)
.agg(
count("*").alias("txn_count"),
sum("amount").alias("revenue")
)
)
# Display to console
sliding_query = (sliding_agg
.writeStream
.outputMode("complete")
.format("console")
.start()
)
print("✅ Sliding window aggregation started")
✅ Step 8: Validate and Test¶
8.1 Verification Script¶
# Check Event Hub status (an Active status means the hub is accepting events)
az eventhubs eventhub show `
--namespace-name $eventHubNamespace `
--resource-group $naming.ResourceGroupName `
--name "transactions-stream" `
--query "status" `
--output tsv
Write-Host "✅ Event Hub status verified" -ForegroundColor Green
8.2 Query Streaming Results¶
-- Verify data in Delta Lake (replace {storage_account} with your account name)
SELECT
COUNT(*) as total_records,
MIN(timestamp) as earliest_transaction,
MAX(timestamp) as latest_transaction,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(amount) as total_revenue
FROM delta.`abfss://raw@{storage_account}.dfs.core.windows.net/transactions-streaming/`;
💡 Key Concepts Review¶
Streaming vs Batch¶
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Latency | Minutes to hours | Seconds to minutes |
| Data Volume | Large datasets | Continuous flow |
| Complexity | Simpler | More complex |
| Use Cases | Historical analysis | Real-time insights |
| Cost | Lower (scheduled) | Higher (always-on) |
Event Hub Best Practices¶
- ✅ Use appropriate partition count (4-32 for most workloads)
- ✅ Enable Kafka protocol for ecosystem compatibility
- ✅ Implement checkpointing for fault tolerance
- ✅ Monitor consumer lag regularly
- ✅ Use Auto Inflate for variable workloads
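To illustrate the last point, Auto Inflate can be switched on for the namespace created in Step 1. This is a sketch; the maximum throughput unit value below is an assumption you should size for your own workload:
# Enable Auto Inflate on the existing namespace (max throughput units value is illustrative)
az eventhubs namespace update `
    --name $eventHubNamespace `
    --resource-group $naming.ResourceGroupName `
    --enable-auto-inflate true `
    --maximum-throughput-units 10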
Spark Structured Streaming Concepts¶
- Watermarks: Handle late-arriving data
- Triggers: Control micro-batch frequency
- Checkpoints: Enable fault-tolerant recovery
- Output Modes: Append, Complete, Update
- Stateful Operations: Aggregations, joins, deduplication
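To make the watermark and output-mode concepts concrete, here is a minimal sketch (the names late_tolerant_agg and demo_query are illustrative) that tolerates 10 minutes of late data and emits only changed window rows in each micro-batch:
# Minimal sketch: watermark plus tumbling window, written in update mode
# (update mode emits only the window rows that changed in each micro-batch;
#  append mode would wait until the watermark closes a window before emitting it)
late_tolerant_agg = (enriched_transactions
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window(col("timestamp"), "5 minutes"))
    .agg(count("*").alias("txn_count"))
)
demo_query = (late_tolerant_agg
    .writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .start()
)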
🎉 Congratulations¶
You've successfully built real-time streaming pipelines. Your solution now includes:
- ✅ Event Hub namespace configured for streaming
- ✅ Spark Structured Streaming applications
- ✅ Delta Lake integration for unified storage
- ✅ Windowed aggregations for time-series analysis
- ✅ Kafka compatibility for ecosystem integration
🚀 What's Next?¶
Continue to Tutorial 6: Spark Pool Configuration
In the next tutorial, you'll:
- Optimize Spark pool sizing and configuration
- Implement auto-scaling strategies
- Tune Spark performance parameters
- Monitor resource utilization
💬 Troubleshooting¶
Common Issues and Solutions¶
Issue: Connection timeout to Event Hub
# Increase timeout settings
ehConf["eventhubs.connectionTimeout"] = "120s"
ehConf["eventhubs.operationTimeout"] = "120s"
Issue: Streaming query fails with checkpoint errors
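A common cause is restarting a query whose logic or schema changed against an old checkpoint. A typical fix, sketched below with the Synapse mssparkutils helper, is to clear the stale checkpoint or point the query at a fresh path; note that clearing a checkpoint discards streaming progress, so the query reprocesses from its configured starting position:
# Remove the stale checkpoint directory (recursive delete), then restart the query
from notebookutils import mssparkutils
mssparkutils.fs.rm(checkpoint_location, True)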
Issue: High latency in stream processing
# Use an explicit trigger interval so each micro-batch processes more records at once
.trigger(processingTime="5 seconds")
# Lower the shuffle partition count (the default of 200 is excessive for small streaming micro-batches)
spark.conf.set("spark.sql.shuffle.partitions", "16")
Tutorial Progress: 5 of 14 completed | Next: 06. Spark Pools → | Time Investment: 30 minutes ✅
Real-time streaming complements batch processing. Master both for complete data solutions.