Skip to content

Disaster Recovery for Streaming Workloads

Status Complexity Availability

🌊 Real-Time Resilience Ensure continuous operation of streaming analytics pipelines with geo-DR, high availability, and automatic failover capabilities.

📋 Table of Contents

Streaming DR Architecture

Reference Architecture

graph TB
    P[Producers] -->|Events| EH1[Event Hub Primary]
    EH1 -.->|Geo-DR Metadata| EH2[Event Hub Secondary]
    EH1 -->|Events| SA1[Stream Analytics Primary]
    EH2 -.->|Standby| SA2[Stream Analytics Secondary]

    SA1 -->|Output| OUT1[Primary Outputs]
    SA2 -.->|Standby| OUT2[Secondary Outputs]

    SA1 -->|Checkpoints| ST1[Storage Primary]
    SA2 -.->|Replicated| ST2[Storage Secondary]

    subgraph Primary Region
        EH1
        SA1
        OUT1
        ST1
    end

    subgraph Secondary Region
        EH2
        SA2
        OUT2
        ST2
    end

Availability Targets

Component SLA RTO Target RPO Target Failover Type
Event Hubs Geo-DR 99.9% 5-15 min 0 (metadata only) Manual
Event Hubs Standard 99.95% N/A Real-time N/A
Stream Analytics 99.9% 15-30 min 5-15 min Manual
Storage (GRS) 99.99% 1-2 hours 15 min Microsoft-initiated

Event Hubs Geo-DR

Geo-DR Configuration

Create Geo-DR Pairing

# Create primary Event Hubs namespace
az eventhubs namespace create \
    --resource-group rg-streaming-primary \
    --name eventhub-primary \
    --location eastus \
    --sku Standard \
    --capacity 2

# Create secondary Event Hubs namespace
az eventhubs namespace create \
    --resource-group rg-streaming-secondary \
    --name eventhub-secondary \
    --location westus \
    --sku Standard \
    --capacity 2

# Create Geo-DR pairing (alias)
az eventhubs georecovery-alias set \
    --resource-group rg-streaming-primary \
    --namespace-name eventhub-primary \
    --alias eventhub-dr-alias \
    --partner-namespace "/subscriptions/{sub-id}/resourceGroups/rg-streaming-secondary/providers/Microsoft.EventHub/namespaces/eventhub-secondary"

# Verify pairing status
az eventhubs georecovery-alias show \
    --resource-group rg-streaming-primary \
    --namespace-name eventhub-primary \
    --alias eventhub-dr-alias \
    --query '{role:role, partnerNamespace:partnerNamespace, provisioningState:provisioningState}'

Producer Configuration for Geo-DR

from azure.eventhub import EventHubProducerClient, EventData

# Use alias endpoint (automatically routes to active namespace)
ALIAS_CONNECTION_STRING = "Endpoint=sb://eventhub-dr-alias.servicebus.windows.net/..."

producer = EventHubProducerClient.from_connection_string(
    conn_str=ALIAS_CONNECTION_STRING,
    eventhub_name="telemetry"
)

# Implement retry logic for transient failures
from azure.core.exceptions import ServiceBusError
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=60)
)
async def send_event_with_retry(event_data):
    """Send event with automatic retry"""
    async with producer:
        batch = producer.create_batch()
        batch.add(EventData(event_data))
        await producer.send_batch(batch)

Consumer Configuration for Geo-DR

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

# Use alias for consumer as well
consumer = EventHubConsumerClient.from_connection_string(
    conn_str=ALIAS_CONNECTION_STRING,
    consumer_group="$Default",
    eventhub_name="telemetry",
    checkpoint_store=BlobCheckpointStore.from_connection_string(
        conn_str=STORAGE_CONNECTION_STRING,
        container_name="checkpoints"
    )
)

async def on_event(partition_context, event):
    """Process events with checkpointing"""
    try:
        # Process event
        await process_telemetry(event.body_as_str())

        # Checkpoint every 100 events
        if int(event.sequence_number) % 100 == 0:
            await partition_context.update_checkpoint(event)

    except Exception as e:
        # Log error but continue processing
        print(f"Error processing event: {e}")

# Start receiving (will automatically reconnect on failover)
async with consumer:
    await consumer.receive(
        on_event=on_event,
        starting_position="-1"
    )

Stream Analytics High Availability

Job Configuration for HA

Geo-Redundant Input Configuration

{
  "name": "input-eventhub-primary",
  "type": "Microsoft.StreamAnalytics/streamingjobs/inputs",
  "properties": {
    "type": "Stream",
    "datasource": {
      "type": "Microsoft.ServiceBus/EventHub",
      "properties": {
        "serviceBusNamespace": "eventhub-dr-alias",
        "eventHubName": "telemetry",
        "consumerGroupName": "$Default",
        "authenticationMode": "Msi"
      }
    },
    "serialization": {
      "type": "Json",
      "properties": {
        "encoding": "UTF8"
      }
    }
  }
}

Geo-Redundant Output Configuration

{
  "name": "output-cosmos-primary",
  "type": "Microsoft.StreamAnalytics/streamingjobs/outputs",
  "properties": {
    "datasource": {
      "type": "Microsoft.Storage/DocumentDB",
      "properties": {
        "accountId": "cosmosdb-account",
        "accountKey": null,
        "database": "telemetry-db",
        "collectionNamePattern": "events",
        "partitionKey": "deviceId",
        "authenticationMode": "Msi"
      }
    }
  }
}

Active-Passive Stream Analytics Deployment

# Deploy primary Stream Analytics job
az stream-analytics job create \
    --resource-group rg-streaming-primary \
    --name streaming-job-primary \
    --location eastus \
    --output-error-policy Stop \
    --events-out-of-order-policy Adjust \
    --events-out-of-order-max-delay 5 \
    --data-locale en-US

# Deploy secondary Stream Analytics job (paused)
az stream-analytics job create \
    --resource-group rg-streaming-secondary \
    --name streaming-job-secondary \
    --location westus \
    --output-error-policy Stop \
    --events-out-of-order-policy Adjust \
    --events-out-of-order-max-delay 5 \
    --data-locale en-US

# Start primary job
az stream-analytics job start \
    --resource-group rg-streaming-primary \
    --name streaming-job-primary \
    --output-start-mode JobStartTime

Failover Procedures

Event Hubs Failover

Initiate Planned Failover

# Break pairing (initiates failover to secondary)
az eventhubs georecovery-alias break-pair \
    --resource-group rg-streaming-primary \
    --namespace-name eventhub-primary \
    --alias eventhub-dr-alias

# Verify secondary is now primary
az eventhubs georecovery-alias show \
    --resource-group rg-streaming-secondary \
    --namespace-name eventhub-secondary \
    --alias eventhub-dr-alias \
    --query 'role'

# Re-create pairing with roles reversed
az eventhubs georecovery-alias set \
    --resource-group rg-streaming-secondary \
    --namespace-name eventhub-secondary \
    --alias eventhub-dr-alias \
    --partner-namespace "/subscriptions/{sub-id}/resourceGroups/rg-streaming-primary/providers/Microsoft.EventHub/namespaces/eventhub-primary"

Automated Failover Detection

import asyncio
from azure.eventhub import EventHubProducerClient
from azure.core.exceptions import ServiceUnavailableError

async def monitor_and_failover():
    """Monitor Event Hub health and log failover events"""
    while True:
        try:
            # Test connection
            producer = EventHubProducerClient.from_connection_string(
                conn_str=ALIAS_CONNECTION_STRING,
                eventhub_name="telemetry"
            )
            async with producer:
                # Connection successful
                print("Event Hub connection healthy")
                await asyncio.sleep(60)

        except ServiceUnavailableError as e:
            # Connection failed - failover may be in progress
            print(f"Event Hub unavailable: {e}")
            print("Failover may be in progress, will retry...")
            await asyncio.sleep(10)

        except Exception as e:
            print(f"Unexpected error: {e}")
            await asyncio.sleep(30)

Stream Analytics Failover

Failover Runbook

#!/bin/bash
# stream-analytics-failover.sh

echo "Starting Stream Analytics failover..."

# 1. Stop primary job
az stream-analytics job stop \
    --resource-group rg-streaming-primary \
    --name streaming-job-primary

# 2. Start secondary job (with last output time of primary)
LAST_OUTPUT=$(az stream-analytics job show \
    --resource-group rg-streaming-primary \
    --name streaming-job-primary \
    --query 'outputStartTime' -o tsv)

az stream-analytics job start \
    --resource-group rg-streaming-secondary \
    --name streaming-job-secondary \
    --output-start-mode CustomTime \
    --output-start-time "$LAST_OUTPUT"

# 3. Monitor secondary job status
az stream-analytics job show \
    --resource-group rg-streaming-secondary \
    --name streaming-job-secondary \
    --query '{state:jobState, startTime:outputStartTime}'

echo "Failover completed"

State Management and Recovery

Checkpoint Management

# Replicate checkpoints to secondary region
# Using GRS storage for checkpoint store
az storage account create \
    --name streamcheckpoints \
    --resource-group rg-streaming-primary \
    --location eastus \
    --sku Standard_GRS \
    --kind StorageV2

# Verify replication status
az storage account show \
    --name streamcheckpoints \
    --resource-group rg-streaming-primary \
    --query '{primaryLocation:primaryLocation, secondaryLocation:secondaryLocation, geoReplicationStats:geoReplicationStats}'

Stream Analytics State Recovery

# Export job configuration
az stream-analytics job show \
    --resource-group rg-streaming-primary \
    --name streaming-job-primary \
    > streaming-job-config.json

# Deploy to secondary region with same configuration
az stream-analytics job create \
    --resource-group rg-streaming-secondary \
    --name streaming-job-secondary \
    --location westus \
    --definition streaming-job-config.json

Testing and Validation

DR Drill Procedure

## Monthly DR Drill Checklist

### Pre-Test (T-24 hours)
- [ ] Notify stakeholders of test window
- [ ] Verify secondary region resources are deployed
- [ ] Check checkpoint replication lag
- [ ] Document baseline metrics (throughput, latency)

### Test Execution
1. **T+0:00** - Initiate Event Hubs failover
   ```bash
   az eventhubs georecovery-alias break-pair ...
   ```text
2. **T+0:05** - Monitor producer reconnection
   - Verify producers connect to secondary
   - Check for message loss (should be zero)

3. **T+0:10** - Failover Stream Analytics
   ```bash
   ./stream-analytics-failover.sh
   ```text
4. **T+0:20** - Validate end-to-end flow
   - Send test events
   - Verify output in target systems
   - Check latency metrics

5. **T+0:30** - Measure recovery metrics
   - RTO actual vs target
   - RPO actual vs target
   - Data loss (if any)

### Post-Test
- [ ] Document lessons learned
- [ ] Update runbooks
- [ ] Failback to primary region
- [ ] Resume normal operations

Automated Health Checks

import asyncio
from azure.monitor.query import LogsQueryClient
from azure.identity import DefaultAzureCredential

async def validate_streaming_health():
    """Automated health validation"""
    credential = DefaultAzureCredential()
    client = LogsQueryClient(credential)

    # Query Event Hubs metrics
    eventhub_query = """
    AzureDiagnostics
    | where ResourceProvider == "MICROSOFT.EVENTHUB"
    | where TimeGenerated > ago(5m)
    | summarize
        IncomingMessages = sum(IncomingMessages_d),
        OutgoingMessages = sum(OutgoingMessages_d),
        ThrottledRequests = sum(ThrottledRequests_d)
    """

    # Query Stream Analytics metrics
    streamanalytics_query = """
    AzureDiagnostics
    | where ResourceProvider == "MICROSOFT.STREAMANALYTICS"
    | where TimeGenerated > ago(5m)
    | summarize
        InputEvents = sum(inputEvents_d),
        OutputEvents = sum(outputEvents_d),
        WatermarkDelay = max(watermarkDelay_d)
    """

    # Execute queries and validate
    # (Implementation details omitted for brevity)

    print("Health check completed")

# Run health checks every 5 minutes
asyncio.create_task(validate_streaming_health())

Performance During DR Events

Minimize Disruption

Best Practice Description Impact
Use Alias Endpoints Clients connect via alias, not direct namespace Transparent failover
Implement Retry Logic Exponential backoff for transient failures Automatic recovery
Checkpoint Frequently Balance between performance and recovery Faster recovery
Monitor Replication Lag Alert on checkpoint storage replication delays Proactive issue detection
Pre-warm Secondary Keep secondary Stream Analytics job deployed Faster activation

Cost Optimization for DR

# Use serverless Event Hubs for secondary (if suitable)
az eventhubs namespace create \
    --resource-group rg-streaming-secondary \
    --name eventhub-secondary \
    --location westus \
    --sku Standard  # Or Premium for higher throughput

# Keep secondary Stream Analytics job stopped until needed
az stream-analytics job stop \
    --resource-group rg-streaming-secondary \
    --name streaming-job-secondary

# Use lifecycle policies for old checkpoints
# (See disaster-recovery.md for storage lifecycle examples)

🌊 Streaming Never Stops Real-time systems require real-time resilience. Test failover procedures regularly and monitor continuously to ensure seamless operations during disasters.