🌊 Event Streaming Basics¶
Fundamental concepts and patterns for event streaming with Azure Event Hubs.
🎯 What is Event Streaming?¶
Event streaming is the practice of capturing data in real time from event sources — databases, sensors, mobile devices, cloud services, and software applications — as streams of events; storing those streams durably for later retrieval; and manipulating, processing, and reacting to them both in real time and retrospectively.
Key Characteristics¶
- Continuous: Events flow continuously, not in discrete batches
- Ordered: Events maintain order within partitions
- Immutable: Once written, events cannot be modified
- Persistent: Events are durably stored for configured retention period
- Scalable: System scales horizontally to handle increasing load
🏗️ Core Components¶
Producers¶
Producers (also called publishers) are applications that send events to Event Hubs.
from azure.eventhub import EventHubProducerClient, EventData
import os
import json
from datetime import datetime
class EventProducer:
    """Thin wrapper around EventHubProducerClient for single and batched sends."""

    def __init__(self, connection_string, eventhub_name):
        self.producer = EventHubProducerClient.from_connection_string(
            conn_str=connection_string,
            eventhub_name=eventhub_name
        )

    def __enter__(self):
        # Support `with EventProducer(...) as p:` so the client is always closed.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def send_single_event(self, event_data):
        """Serialize *event_data* as JSON and send it as a single event."""
        self.producer.send_event(EventData(json.dumps(event_data)))

    def send_batch_events(self, events_list):
        """Send events in size-limited batches for better throughput.

        Raises:
            ValueError: if a single serialized event exceeds the maximum
                batch size (it can never fit into any batch).
        """
        event_batch = self.producer.create_batch()
        for event_data in events_list:
            event = EventData(json.dumps(event_data))
            try:
                event_batch.add(event)
            except ValueError:
                # Current batch is full: flush it and start a new one.
                self.producer.send_batch(event_batch)
                event_batch = self.producer.create_batch()
                try:
                    event_batch.add(event)
                except ValueError as exc:
                    # The event alone is bigger than an empty batch. The
                    # original code let this escape as an unexplained
                    # ValueError; fail with an explicit message instead.
                    raise ValueError(
                        "single event exceeds the maximum batch size"
                    ) from exc
        # Flush whatever is left over.
        if len(event_batch) > 0:
            self.producer.send_batch(event_batch)

    def close(self):
        """Release the underlying connection."""
        self.producer.close()
# Usage example
if __name__ == "__main__":
    # datetime.utcnow() is deprecated since Python 3.12; use an explicit
    # timezone-aware UTC timestamp instead.
    from datetime import timezone

    producer = EventProducer(
        connection_string=os.getenv("EVENTHUB_CONNECTION_STRING"),
        eventhub_name="telemetry-events"
    )
    try:
        # Send single event
        producer.send_single_event({
            "sensor_id": "sensor-001",
            "temperature": 23.5,
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
        # Send batch of events
        events = [
            {
                "sensor_id": f"sensor-{i}",
                "temperature": 20 + i,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
            for i in range(100)
        ]
        producer.send_batch_events(events)
    finally:
        # Close the client even when a send fails (the original leaked it).
        producer.close()
Consumers¶
Consumers (also called subscribers) read events from Event Hubs.
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import os
class EventConsumer:
    """Receives events from an Event Hub, checkpointing periodically per partition.

    NOTE(review): the synchronous EventHubConsumerClient used here expects the
    synchronous BlobCheckpointStore (azure.eventhub.extensions.checkpointstoreblob);
    the file imports the aio variant — confirm the import.
    """

    # Number of events processed per partition between checkpoints.
    CHECKPOINT_INTERVAL = 10

    def __init__(self, connection_string, eventhub_name, consumer_group, checkpoint_store):
        self.consumer = EventHubConsumerClient.from_connection_string(
            conn_str=connection_string,
            consumer_group=consumer_group,
            eventhub_name=eventhub_name,
            checkpoint_store=checkpoint_store
        )
        # Per-partition counters driving periodic checkpointing.
        self._events_since_checkpoint = {}

    def on_event(self, partition_context, event):
        """Process one event, checkpointing every CHECKPOINT_INTERVAL events.

        The original tested ``partition_context.offset % 10``: PartitionContext
        has no ``offset`` attribute, and offsets are opaque stream positions
        rather than consecutive counters, so that check was broken. Count the
        processed events explicitly instead.
        """
        print(f"Partition {partition_context.partition_id}: {event.body_as_str()}")
        pid = partition_context.partition_id
        count = self._events_since_checkpoint.get(pid, 0) + 1
        if count >= self.CHECKPOINT_INTERVAL:
            partition_context.update_checkpoint(event)
            count = 0
        self._events_since_checkpoint[pid] = count

    def on_partition_initialize(self, partition_context):
        """Called when partition processing starts."""
        print(f"Partition {partition_context.partition_id} initialized")

    def on_partition_close(self, partition_context, reason):
        """Called when partition processing stops."""
        print(f"Partition {partition_context.partition_id} closed: {reason}")

    def on_error(self, partition_context, error):
        """Handle errors raised during processing."""
        print(f"Error on partition {partition_context.partition_id}: {error}")

    def receive_events(self):
        """Start receiving events; blocks until stopped, closing the client on exit."""
        with self.consumer:
            self.consumer.receive(
                on_event=self.on_event,
                on_partition_initialize=self.on_partition_initialize,
                on_partition_close=self.on_partition_close,
                on_error=self.on_error,
                starting_position="-1"  # Start from beginning of the stream
            )
# Usage example with checkpoint store
if __name__ == "__main__":
    # The blob-backed checkpoint store records each partition's progress so
    # a restarted consumer resumes where it left off.
    # NOTE(review): this file imports the aio BlobCheckpointStore; the sync
    # EventHubConsumerClient expects the sync variant from
    # azure.eventhub.extensions.checkpointstoreblob — confirm.
    store = BlobCheckpointStore.from_connection_string(
        conn_str=os.getenv("STORAGE_CONNECTION_STRING"),
        container_name="eventhub-checkpoints",
    )
    EventConsumer(
        connection_string=os.getenv("EVENTHUB_CONNECTION_STRING"),
        eventhub_name="telemetry-events",
        consumer_group="$Default",
        checkpoint_store=store,
    ).receive_events()
Partitions¶
Partitions enable parallel processing and ordering guarantees.
graph TB
subgraph "Event Hub: telemetry-events"
P0[Partition 0<br/>sensor-1, sensor-4, sensor-7]
P1[Partition 1<br/>sensor-2, sensor-5, sensor-8]
P2[Partition 2<br/>sensor-3, sensor-6, sensor-9]
end
subgraph "Producers"
Sensor1[Sensor 1] -->|PKey: sensor-1| P0
Sensor2[Sensor 2] -->|PKey: sensor-2| P1
Sensor3[Sensor 3] -->|PKey: sensor-3| P2
Sensor4[Sensor 4] -->|PKey: sensor-4| P0
end
subgraph "Consumers"
P0 --> C0[Consumer Instance 0]
P1 --> C1[Consumer Instance 1]
P2 --> C2[Consumer Instance 2]
end
Partition Key Strategy:
# Send events with partition key for ordering
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    eventhub_name="your_eventhub",
    conn_str="your_connection_string",
)

# Every event carrying the same partition key lands on the same partition,
# so per-sensor ordering is preserved.
sensor_id = "sensor-123"
event_data = EventData("Temperature: 23.5°C")

# Option 1: Use partition key (recommended for ordering)
producer.send_event(event_data, partition_key=sensor_id)

# Option 2: Specify partition ID directly (less flexible)
producer.send_event(event_data, partition_id="0")

# Option 3: Let Event Hubs distribute (best for throughput)
producer.send_event(event_data)
🎯 Event Streaming Patterns¶
Pattern 1: Fire and Forget¶
Producer sends events without waiting for acknowledgment.
# Fast but no delivery guarantee at the application level: nothing here
# tracks whether this particular reading was processed downstream.
producer.send_event(EventData("sensor reading"))
# Continue immediately with the next reading.
Use Cases: High-volume telemetry, logging where occasional loss is acceptable
Pattern 2: Synchronous Send with Batching¶
Producer sends events in batches and waits for acknowledgment.
def send_with_batching(producer, events, batch_size=100):
    """Send *events* through *producer* in size-limited batches.

    Args:
        producer: EventHubProducerClient (or compatible) used for sending.
        events: iterable of JSON-serializable event payloads.
        batch_size: kept for interface compatibility; the effective limit is
            the byte size enforced by ``producer.create_batch()``.

    Returns:
        int: number of events sent.

    Raises:
        ValueError: if a single serialized event exceeds the maximum batch size.
    """
    batch = producer.create_batch()
    sent_count = 0
    for event_data in events:
        # Serialize once; the original serialized the same payload twice on
        # the batch-full path.
        event = EventData(json.dumps(event_data))
        try:
            batch.add(event)
        except ValueError:
            # Batch full at the byte limit: flush it and start a new one.
            producer.send_batch(batch)
            sent_count += len(batch)
            print(f"Sent batch: {len(batch)} events")
            batch = producer.create_batch()
            try:
                batch.add(event)
            except ValueError as exc:
                # The event alone is bigger than an empty batch; the original
                # let this escape as an unexplained ValueError.
                raise ValueError(
                    "single event exceeds the maximum batch size"
                ) from exc
    # Send remaining events in the final partial batch.
    if len(batch) > 0:
        producer.send_batch(batch)
        sent_count += len(batch)
    return sent_count
Use Cases: Transactional data, financial records, critical events
Pattern 3: Competing Consumers¶
Multiple consumer instances process events in parallel from different partitions.
graph LR
subgraph "Event Hub"
P0[Partition 0]
P1[Partition 1]
P2[Partition 2]
P3[Partition 3]
end
subgraph "Consumer Group: analytics"
C0[Consumer 0] --> P0
C1[Consumer 1] --> P1
C2[Consumer 2] --> P2
C3[Consumer 3] --> P3
end
subgraph "Consumer Group: archival"
A0[Consumer 0] --> P0
A0 --> P1
A1[Consumer 1] --> P2
A1 --> P3
end
Pattern 4: Event Time vs Processing Time¶
Handle late-arriving events with watermarks and windowing.
from datetime import datetime, timedelta
import json
def process_event_with_watermark(event, watermark_delay_minutes=5):
    """Classify an event as "late" or "on_time" via a processing-time watermark.

    Args:
        event: received event; must expose ``body_as_str()`` returning a JSON
            object with an ISO-8601 ``timestamp`` field.
        watermark_delay_minutes: allowed lateness before an event is "late".

    Returns:
        str: ``"late"`` or ``"on_time"``.
    """
    from datetime import timezone

    event_data = json.loads(event.body_as_str())
    event_time = datetime.fromisoformat(event_data["timestamp"])
    # The original compared against naive datetime.utcnow(): utcnow() is
    # deprecated (3.12+), and comparing a timezone-aware event timestamp with
    # a naive one raises TypeError. Normalize everything to aware UTC;
    # naive event timestamps are assumed to already be UTC.
    if event_time.tzinfo is None:
        event_time = event_time.replace(tzinfo=timezone.utc)
    processing_time = datetime.now(timezone.utc)
    # Watermark: how far back in event time we still accept as on-time.
    watermark = processing_time - timedelta(minutes=watermark_delay_minutes)
    if event_time < watermark:
        print(f"Late event detected: {event_time} (watermark: {watermark})")
        # Handle late event (dead letter, separate processing, etc.)
        return "late"
    # Within the watermark: process normally.
    return "on_time"
🔄 Consumer Patterns¶
Single Consumer with Checkpointing¶
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
def on_event_batch(partition_context, events):
    """Handle a batch of events, checkpointing once after all are processed."""
    # Process every event first; a single checkpoint then covers the batch.
    for received in events:
        process_telemetry_data(received.body_as_str())
    partition_context.update_checkpoint()
# Setup consumer with checkpoint store
# NOTE(review): storage_connection_string / eventhub_connection_string are
# assumed to be defined earlier — confirm against the surrounding document.
checkpoint_store = BlobCheckpointStore.from_connection_string(
    container_name="checkpoints",
    conn_str=storage_connection_string,
)
consumer = EventHubConsumerClient.from_connection_string(
    conn_str=eventhub_connection_string,
    eventhub_name="telemetry",
    consumer_group="$Default",
    checkpoint_store=checkpoint_store,
)
# Receive in batches; blocks until stopped, closing the client on exit.
with consumer:
    consumer.receive_batch(on_event_batch=on_event_batch)
Multiple Consumers with Consumer Groups¶
# Each consumer group keeps its own independent cursor over the stream, so
# both groups below read the same events without interfering.
# Consumer Group 1: Real-time analytics
analytics_consumer = EventHubConsumerClient.from_connection_string(
    eventhub_name="telemetry",
    conn_str=connection_string,
    consumer_group="analytics-team",
)
# Consumer Group 2: Data archival
archival_consumer = EventHubConsumerClient.from_connection_string(
    eventhub_name="telemetry",
    conn_str=connection_string,
    consumer_group="archival-team",
)
Event Processing with Error Handling¶
import logging

from tenacity import (
    retry,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_exponential,
)
logger = logging.getLogger(__name__)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # The original decorator retried *every* exception — including the
    # ValueError raised below for invalid data — contradicting its own
    # "don't retry" comment. Only transient failures are retried now.
    retry=retry_if_not_exception_type(ValueError),
    reraise=True,  # surface the final underlying exception, not RetryError
)
def process_event_with_retry(event_data):
    """Parse, analyze and store one event, retrying transient failures.

    Args:
        event_data: raw JSON string from the event body.

    Returns:
        bool: True on success.

    Raises:
        ValueError: for malformed JSON (never retried).
        Exception: whatever the final retry attempt raised.
    """
    try:
        # Parse and validate
        data = json.loads(event_data)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON: {e}")
        # Bad data will never parse on a retry — fail fast.
        raise ValueError("Invalid event format") from e
    try:
        # Business logic
        result = analyze_sensor_data(data)
        # Store result
        store_analysis_result(result)
        return True
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        # Transient failure: re-raise so tenacity retries.
        raise
def on_event_with_error_handling(partition_context, event):
    """Event handler with robust error handling.

    Flow: process the event body (process_event_with_retry retries transient
    failures); on success, record progress via checkpoint. A ValueError
    (invalid payload) routes the event to the dead-letter path; any other
    exception is logged and the event skipped so the partition keeps moving.
    """
    try:
        process_event_with_retry(event.body_as_str())
        # Checkpoint only reached after successful processing.
        partition_context.update_checkpoint(event)
    except ValueError:
        # Invalid data - move to dead letter queue; it cannot succeed later.
        send_to_dead_letter(event)
    except Exception as e:
        # Unrecoverable error after retries were exhausted.
        logger.error(f"Failed to process event: {e}")
        # Skip this event and continue consuming the partition.
📊 Throughput and Scaling¶
Understanding Throughput Units¶
def calculate_throughput_requirements(events_per_second, avg_event_size_kb):
    """Estimate the Standard-tier throughput units (TUs) a workload needs.

    Args:
        events_per_second: expected ingress event rate.
        avg_event_size_kb: average event payload size in KB.

    Returns:
        dict with ``required_tus`` (int, >= 1), the raw per-dimension
        estimates, and a recommendation string.
    """
    import math

    # Standard tier ingress limits per throughput unit.
    INGRESS_MB_PER_TU = 1
    INGRESS_EVENTS_PER_TU = 1000
    # TUs needed by byte volume.
    total_mb_per_sec = (events_per_second * avg_event_size_kb) / 1024
    tus_by_size = total_mb_per_sec / INGRESS_MB_PER_TU
    # TUs needed by event count.
    tus_by_count = events_per_second / INGRESS_EVENTS_PER_TU
    # The binding constraint is whichever dimension needs more TUs.
    required_tus = max(tus_by_size, tus_by_count)
    return {
        # math.ceil fixes the original `int(x) + 1`, which over-provisioned
        # by one whole TU whenever the requirement was already an integer
        # (e.g. exactly 1.0 TU yielded 2). Floor of 1 TU.
        "required_tus": max(1, math.ceil(required_tus)),
        "tus_by_size": tus_by_size,
        "tus_by_count": tus_by_count,
        "recommendation": "Consider auto-inflate for variable loads"
    }
# Example calculation: 5000 events/sec at ~2 KB each (~9.8 MB/s ingress).
result = calculate_throughput_requirements(
    events_per_second=5000,
    avg_event_size_kb=2
)
print(f"Required TUs: {result['required_tus']}")
Partition Count Strategy¶
def recommend_partition_count(
    expected_events_per_sec,
    consumer_instances,
    growth_factor=2
):
    """Recommend an Event Hub partition count for a workload.

    Heuristics: at least one partition per consumer instance (so each
    instance can own a partition), roughly one partition per ~1000
    events/sec, then headroom for growth.

    Args:
        expected_events_per_sec: expected ingress event rate.
        consumer_instances: number of parallel consumer instances.
        growth_factor: multiplier applied for future growth (default 2).

    Returns:
        dict with ``recommended``, ``min_partitions`` and
        ``consumer_instances``; a ``note`` is added when the Standard-tier
        cap of 32 partitions is hit.
    """
    min_partitions = max(
        consumer_instances,
        int(expected_events_per_sec / 1000)  # ~1000 events/sec per partition
    )
    # Account for growth.
    recommended = min_partitions * growth_factor
    result = {
        "recommended": recommended,
        "min_partitions": min_partitions,
        "consumer_instances": consumer_instances,
    }
    # Event Hubs Standard tier caps partitions at 32. The original returned a
    # differently-shaped dict on this branch (dropping min_partitions and
    # consumer_instances); keep the shape consistent and add the note.
    if recommended > 32:
        result["recommended"] = 32
        result["note"] = "Standard tier max is 32. Consider Premium for up to 100."
    return result
🔒 Security Best Practices¶
Using Managed Identity¶
# EventData was used below but never imported in the original snippet.
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

# Use managed identity instead of connection string: no secrets in config.
credential = DefaultAzureCredential()
producer = EventHubProducerClient(
    fully_qualified_namespace="eventhub-namespace.servicebus.windows.net",
    eventhub_name="telemetry-events",
    credential=credential
)
# Send event
producer.send_event(EventData("secure event"))
producer.close()
Shared Access Signatures (SAS)¶
from azure.eventhub import EventHubProducerClient
from datetime import datetime, timedelta
# Generate SAS token with specific permissions and expiry
def generate_sas_token(namespace, eventhub, key_name, key_value, expiry_hours=24):
    """Build a time-limited Shared Access Signature token for an Event Hub.

    Args:
        namespace: Event Hubs namespace name (without the domain suffix).
        eventhub: event hub (entity) name.
        key_name: name of the shared access policy.
        key_value: the policy's signing key.
        expiry_hours: token lifetime in hours (default 24).

    Returns:
        str: a ``SharedAccessSignature ...`` token string.
    """
    import base64
    import hashlib
    import hmac
    import time
    from urllib.parse import quote_plus

    # The signature covers the URL-encoded resource URI plus the expiry time.
    resource = quote_plus(f"sb://{namespace}.servicebus.windows.net/{eventhub}")
    expires_at = int(time.time() + expiry_hours * 3600)
    payload = f"{resource}\n{expires_at}".encode('utf-8')
    digest = hmac.new(key_value.encode('utf-8'), payload, hashlib.sha256).digest()
    signature = base64.b64encode(digest)
    return (
        f"SharedAccessSignature sr={resource}"
        f"&sig={quote_plus(signature)}&se={expires_at}&skn={key_name}"
    )
# Use SAS token: a short expiry limits the blast radius of a leaked token.
sas_token = generate_sas_token(
    namespace="eventhub-namespace",
    eventhub="telemetry-events",
    key_name="SendPolicy",
    key_value="your-key-value",
    expiry_hours=2  # Short-lived token
)
📈 Monitoring and Observability¶
Track Producer Metrics¶
import time
from collections import defaultdict
class InstrumentedProducer:
    """Wraps a producer client, tracking send counts, errors and latency."""

    def __init__(self, producer):
        # producer: any object exposing send_event(event_data).
        self.producer = producer
        self.metrics = defaultdict(int)
        self.start_time = time.time()

    def send_event(self, event_data):
        """Send one event through the wrapped producer, recording metrics.

        Raises:
            Exception: re-raises whatever the underlying send raised, after
                counting it in ``send_errors``.
        """
        # perf_counter is monotonic, so latency cannot go negative if the
        # wall clock is adjusted mid-send (time.time(), used originally, could).
        start = time.perf_counter()
        try:
            self.producer.send_event(event_data)
        except Exception:
            # The original bound the exception to an unused name; just count
            # the failure and re-raise.
            self.metrics["send_errors"] += 1
            raise
        self.metrics["events_sent"] += 1
        self.metrics["total_latency_ms"] += (time.perf_counter() - start) * 1000

    def get_metrics(self):
        """Return a snapshot of producer performance metrics."""
        elapsed = time.time() - self.start_time
        events_sent = self.metrics["events_sent"]
        return {
            "events_sent": events_sent,
            "errors": self.metrics["send_errors"],
            # Guard against division by zero before the first successful send.
            "avg_latency_ms": (
                self.metrics["total_latency_ms"] / events_sent
                if events_sent > 0 else 0
            ),
            "events_per_second": events_sent / elapsed if elapsed > 0 else 0
        }
Monitor Consumer Lag¶
from azure.eventhub import EventHubConsumerClient
def calculate_consumer_lag(partition_context, event):
    """Calculate how far behind the consumer is from the newest enqueued event.

    The original read ``partition_context.last_enqueued_sequence_number``,
    which does not exist on PartitionContext; the SDK exposes
    ``last_enqueued_event_properties`` (a dict with a ``sequence_number``
    key), and only when the consumer was created with
    ``track_last_enqueued_event_properties=True`` — TODO confirm the
    consumer enables that flag.

    Returns:
        int lag in events, or None when last-enqueued tracking is unavailable.
    """
    props = partition_context.last_enqueued_event_properties or {}
    last_sequence = props.get("sequence_number")
    if last_sequence is None:
        # Tracking disabled or no event observed yet; original crashed here.
        return None
    lag = last_sequence - event.sequence_number
    if lag > 1000:
        print(f"⚠️ High consumer lag detected: {lag} events behind")
    return lag
🔗 Related Resources¶
Integration Guides¶
- Kafka Compatibility - Using Event Hubs with Kafka protocol
- Capture to Storage - Automatic event archival
- Schema Registry - Schema validation and evolution
Best Practices¶
- Performance Tuning
- Security Configuration
- Disaster Recovery
Code Examples¶
- Python SDK Examples
- C# SDK Examples
- Integration Scenarios
Last Updated: 2025-01-28 · Complexity: Beginner · Estimated Reading Time: 25 minutes