⚡ Tutorial 10: Performance Tuning and Optimization

Optimize Stream Analytics jobs for maximum throughput, minimal latency, and cost efficiency. Learn scaling strategies, query optimization, and monitoring best practices for production workloads.

🎯 Learning Objectives

After completing this tutorial, you will be able to:

  • Monitor job performance using metrics and diagnostics
  • Optimize query patterns for throughput and latency
  • Scale streaming units appropriately for workload demands
  • Implement partitioning strategies for parallel processing
  • Minimize resource costs while maintaining SLA requirements

⏱️ Time Estimate: 30 minutes

  • Performance Monitoring: 8 minutes
  • Query Optimization: 10 minutes
  • Scaling Strategies: 7 minutes
  • Cost Optimization: 5 minutes

📋 Prerequisites

  • Completed all previous tutorials (01-09)
  • Running Stream Analytics job with production-like workload
  • Access to Azure Monitor and Log Analytics
  • Understanding of query fundamentals

📊 Understanding Performance Metrics

10.1 Key Performance Indicators

| Metric | Target | Warning Threshold | Critical Threshold |
| --- | --- | --- | --- |
| SU% Utilization | 60-80% | >80% | >95% |
| Watermark Delay | <30 seconds | >1 minute | >5 minutes |
| Input Events | Stable | Sudden drops | Zero events |
| Output Events | Proportional to input | Backlog growth | Output failure |
| Data Conversion Errors | 0 | >0.1% | >1% |
| Runtime Errors | 0 | Any | Continuous |
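
The thresholds above can be encoded as a small classifier for use in custom monitoring scripts. This is an illustrative sketch: the function names are hypothetical and the cutoffs simply mirror the table, not an official API.

```python
def classify_su_utilization(su_percent: float) -> str:
    """Map SU% utilization to a health state per the KPI table."""
    if su_percent > 95:
        return "critical"
    if su_percent > 80:
        return "warning"
    return "ok"

def classify_watermark_delay(delay_seconds: float) -> str:
    """Map watermark delay (in seconds) to a health state per the KPI table."""
    if delay_seconds > 300:   # >5 minutes
        return "critical"
    if delay_seconds > 60:    # >1 minute
        return "warning"
    return "ok"

print(classify_su_utilization(72))    # within the 60-80% target band
print(classify_watermark_delay(90))   # between 1 and 5 minutes of delay
```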

10.2 Enable Diagnostic Logging

Configure comprehensive diagnostics:

# Enable all diagnostic categories
$diagnosticSettings = @{
    name = "stream-analytics-diagnostics"
    resourceId = (az stream-analytics job show `
        --resource-group $env:STREAM_RG `
        --name $env:STREAM_JOB `
        --query id -o tsv)
    workspaceId = (az monitor log-analytics workspace show `
        --resource-group $env:STREAM_RG `
        --workspace-name "stream-analytics-logs" `
        --query id -o tsv)
}

# Create diagnostic setting
az monitor diagnostic-settings create `
    --name $diagnosticSettings.name `
    --resource $diagnosticSettings.resourceId `
    --workspace $diagnosticSettings.workspaceId `
    --logs '[
        {"category": "Execution", "enabled": true},
        {"category": "Authoring", "enabled": true}
    ]' `
    --metrics '[
        {"category": "AllMetrics", "enabled": true}
    ]'

10.3 Real-Time Metrics Dashboard

Create Azure Monitor dashboard:

# Query key metrics
$jobResourceId = az stream-analytics job show `
    --resource-group $env:STREAM_RG `
    --name $env:STREAM_JOB `
    --query id -o tsv

# SU% Utilization
az monitor metrics list `
    --resource $jobResourceId `
    --metric "ResourceUtilization" `
    --start-time (Get-Date).AddHours(-1).ToString("yyyy-MM-ddTHH:mm:ss") `
    --end-time (Get-Date).ToString("yyyy-MM-ddTHH:mm:ss") `
    --interval PT1M `
    --aggregation Average `
    --output table

# Watermark Delay
az monitor metrics list `
    --resource $jobResourceId `
    --metric "WatermarkDelay" `
    --aggregation Maximum `
    --interval PT1M `
    --output table

# Input/Output Events
az monitor metrics list `
    --resource $jobResourceId `
    --metric "InputEvents" `
    --aggregation Total `
    --interval PT5M `
    --output table

🔍 Query Optimization

10.4 Identify Performance Bottlenecks

Use query execution diagnostics:

-- Add diagnostic output to track query execution
SELECT
    System.Timestamp() AS ProcessingTime,
    COUNT(*) AS EventCount,
    AVG(DATEDIFF(millisecond, EventTime, System.Timestamp())) AS ProcessingLatencyMs
INTO
    [PerformanceDiagnostics]
FROM
    [SensorInput] TIMESTAMP BY EventTime
GROUP BY
    TumblingWindow(second, 30)

10.5 Optimize JOIN Operations

Inefficient JOIN patterns and optimizations:

-- ❌ INEFFICIENT: Cartesian product without proper constraints
SELECT
    s1.DeviceId,
    s1.Temperature,
    s2.Humidity
FROM
    [SensorStream1] s1 TIMESTAMP BY EventTime
JOIN
    [SensorStream2] s2 TIMESTAMP BY EventTime
ON
    s1.DeviceId = s2.DeviceId
    AND DATEDIFF(minute, s1, s2) BETWEEN -60 AND 60  -- Too wide!

-- ✅ OPTIMIZED: Narrow temporal window
SELECT
    s1.DeviceId,
    s1.Temperature,
    s2.Humidity
FROM
    [SensorStream1] s1 TIMESTAMP BY EventTime
JOIN
    [SensorStream2] s2 TIMESTAMP BY EventTime
ON
    s1.DeviceId = s2.DeviceId
    AND DATEDIFF(second, s1, s2) BETWEEN -5 AND 5  -- Narrow window

Optimization Benefits:

  • Reduced memory footprint (smaller join window)
  • Lower CPU utilization
  • Faster event processing
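
The memory impact of the join window can be approximated: each side of a temporal JOIN must buffer roughly its event rate times the window span. A back-of-the-envelope sketch, using hypothetical rates rather than an official sizing formula:

```python
def join_buffer_estimate_mb(events_per_sec: float, avg_event_kb: float,
                            window_seconds: float, streams: int = 2) -> float:
    """Rough memory needed to buffer both sides of a temporal JOIN."""
    buffered_events = events_per_sec * window_seconds * streams
    return buffered_events * avg_event_kb / 1024

# 1000 ev/s, 2 KB events: the 120-minute window vs. the 10-second window
wide = join_buffer_estimate_mb(1000, 2, 120 * 60)   # BETWEEN -60 AND 60 minutes
narrow = join_buffer_estimate_mb(1000, 2, 10)       # BETWEEN -5 AND 5 seconds
print(f"wide: {wide:.0f} MB, narrow: {narrow:.2f} MB")
```

Narrowing the window from ±60 minutes to ±5 seconds shrinks the buffered state by a factor of 720 in this example, which is where the memory and CPU savings come from.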

10.6 Efficient Windowing Patterns

-- ❌ INEFFICIENT: Multiple aggregations with redundant windows
SELECT DeviceId, AVG(Temperature) AS AvgTemp
INTO [Output1]
FROM [Input] TIMESTAMP BY EventTime
GROUP BY DeviceId, TumblingWindow(minute, 5);

SELECT DeviceId, MAX(Temperature) AS MaxTemp
INTO [Output2]
FROM [Input] TIMESTAMP BY EventTime
GROUP BY DeviceId, TumblingWindow(minute, 5);

-- ✅ OPTIMIZED: Single window with multiple aggregations
SELECT
    DeviceId,
    AVG(Temperature) AS AvgTemp,
    MAX(Temperature) AS MaxTemp,
    MIN(Temperature) AS MinTemp,
    STDEV(Temperature) AS StdDevTemp,
    System.Timestamp() AS WindowEnd
INTO
    [CombinedOutput]
FROM
    [Input] TIMESTAMP BY EventTime
GROUP BY
    DeviceId,
    TumblingWindow(minute, 5)

10.7 Subquery Optimization

-- ❌ INEFFICIENT: Nested subqueries
SELECT DeviceId, AvgTemp
FROM (
    SELECT DeviceId, AVG(Temperature) AS AvgTemp
    FROM [Input] TIMESTAMP BY EventTime
    GROUP BY DeviceId, TumblingWindow(minute, 5)
)
WHERE AvgTemp > 75;

-- ✅ OPTIMIZED: Use HAVING clause instead
SELECT
    DeviceId,
    AVG(Temperature) AS AvgTemp,
    System.Timestamp() AS WindowEnd
INTO
    [OptimizedOutput]
FROM
    [Input] TIMESTAMP BY EventTime
GROUP BY
    DeviceId,
    TumblingWindow(minute, 5)
HAVING
    AVG(Temperature) > 75

10.8 Reduce Data Conversion Errors

Handle data type mismatches efficiently:

-- ❌ INEFFICIENT: Causes data conversion errors
SELECT
    DeviceId,
    Temperature  -- Assumes always numeric
INTO [Output]
FROM [Input] TIMESTAMP BY EventTime;

-- ✅ OPTIMIZED: Defensive type casting with error handling
SELECT
    DeviceId,
    TRY_CAST(Temperature AS FLOAT) AS Temperature,
    CASE
        WHEN TRY_CAST(Temperature AS FLOAT) IS NULL THEN 1
        ELSE 0
    END AS ConversionError
INTO [SafeOutput]
FROM [Input] TIMESTAMP BY EventTime
WHERE TRY_CAST(Temperature AS FLOAT) IS NOT NULL  -- Filter bad data early
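
TRY_CAST's behavior is also useful to mimic when pre-validating payloads before they reach the job, for example in an upstream producer or a unit test. A minimal Python equivalent (a hypothetical helper, not part of any SDK):

```python
def try_cast_float(value):
    """Return value as a float, or None if it cannot be converted (mirrors TRY_CAST)."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

events = [{"deviceId": "d1", "temperature": "72.5"},
          {"deviceId": "d2", "temperature": "N/A"}]

# Filter bad data early, as the optimized query does
clean = [e for e in events if try_cast_float(e["temperature"]) is not None]
print(len(clean))  # 1
```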

📈 Scaling Strategies

10.9 Understanding Streaming Units (SUs)

Streaming Units represent compute capacity:

As a rule of thumb, 1 SU handles roughly 1 MB/s of throughput; actual capacity depends heavily on query complexity.

Scaling Tiers:
- 1-6 SUs: Small workloads (<6 MB/s); 6 SUs is the ceiling for a non-partitioned job
- 6-12 SUs: Medium workloads (6-12 MB/s); requires a partitioned query
- 12-24 SUs: Large workloads (12-24 MB/s)
- 24+ SUs: Enterprise workloads; capacity scales with partition count

10.10 Calculate Required SUs

Formula for SU estimation:

# Calculate required SUs based on workload
$eventsPerSecond = 10000
$avgEventSizeKB = 2
$throughputMBps = ($eventsPerSecond * $avgEventSizeKB) / 1024

$requiredSUs = [Math]::Ceiling($throughputMBps)

Write-Host "Estimated throughput: $throughputMBps MB/s"
Write-Host "Recommended SUs: $requiredSUs"
Write-Host "Buffer for peaks (20%): $([Math]::Ceiling($requiredSUs * 1.2))"

10.11 Scale Streaming Units

Adjust SUs based on metrics:

# Check current SU allocation
$currentSUs = az stream-analytics job show `
    --resource-group $env:STREAM_RG `
    --name $env:STREAM_JOB `
    --query "transformation.streamingUnits" -o tsv

Write-Host "Current Streaming Units: $currentSUs"

# Scale up to handle increased load
$newSUs = 12

az stream-analytics transformation update `
    --resource-group $env:STREAM_RG `
    --job-name $env:STREAM_JOB `
    --name "Transformation" `
    --streaming-units $newSUs

Write-Host "Scaled to $newSUs SUs"

10.12 Implement Partitioning for Scalability

Enable parallel processing with partitions:

-- Partition input by DeviceId for parallel processing
SELECT
    DeviceId,
    BuildingId,
    AVG(Temperature) AS AvgTemp,
    COUNT(*) AS EventCount,
    System.Timestamp() AS WindowEnd
INTO
    [PartitionedOutput]
FROM
    [SensorInput] TIMESTAMP BY EventTime
    PARTITION BY PartitionId  -- Enable partition-level parallelism
GROUP BY
    DeviceId,
    BuildingId,
    TumblingWindow(minute, 5)

Partitioning Benefits:

  • Linear scalability across SUs
  • Reduced memory per partition
  • Improved throughput (can exceed the 6-SU non-partitioned limit)
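
The scalability claim can be made concrete: with an embarrassingly parallel (partitioned) query, each partition can be allocated its own SUs, so capacity grows with partition count. A sketch under the rule-of-thumb numbers used in this tutorial — 6 SUs per partition and ~1 MB/s per SU are assumptions, not guarantees:

```python
def max_throughput_mbps(partition_count: int, sus_per_partition: int = 6) -> int:
    """Upper-bound throughput assuming ~1 MB/s per SU and full parallelism."""
    return partition_count * sus_per_partition  # 1 SU ≈ 1 MB/s

print(max_throughput_mbps(1))    # non-partitioned ceiling
print(max_throughput_mbps(16))   # 16 partitions processed in parallel
```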

10.13 Configure Compatibility Level for Performance

# Use latest compatibility level for performance improvements
az stream-analytics job update `
    --resource-group $env:STREAM_RG `
    --name $env:STREAM_JOB `
    --compatibility-level "1.2"  # Latest version with optimizations

# Verify compatibility level
az stream-analytics job show `
    --resource-group $env:STREAM_RG `
    --name $env:STREAM_JOB `
    --query "compatibilityLevel" -o tsv

🎯 Input/Output Optimization

10.14 Optimize Event Hub Configuration

# Ensure partition count matches parallelization needs
# Note: on Basic/Standard tiers the partition count is fixed at creation;
# the update below only applies to Premium/Dedicated namespaces
$targetPartitions = 16  # Align with your SU/partition plan

az eventhubs eventhub update `
    --resource-group $env:STREAM_RG `
    --namespace-name $env:STREAM_EH_NAMESPACE `
    --name $env:STREAM_EH_NAME `
    --partition-count $targetPartitions

# Verify partition count
az eventhubs eventhub show `
    --resource-group $env:STREAM_RG `
    --namespace-name $env:STREAM_EH_NAMESPACE `
    --name $env:STREAM_EH_NAME `
    --query "partitionCount"

10.15 Optimize Output Batching

Configure output batching for efficiency:

// Output configuration for Azure SQL Database
{
  "type": "Microsoft.Sql/Server/Database",
  "properties": {
    "server": "your-sql-server.database.windows.net",
    "database": "streamdb",
    "user": "streamuser",
    "password": "{password}",
    "table": "SensorData",
    "batchSize": 10000,  // Optimize batch size
    "maxWriterCount": 8  // Parallel writers
  }
}
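
The effect of batchSize can be illustrated with a simple chunker: fewer, larger writes mean fewer round trips to the database, which parallel writers then spread across connections. An illustrative sketch — the batch size mirrors the output configuration above, but the chunking itself is generic:

```python
def chunk_rows(rows, batch_size):
    """Split rows into write batches of at most batch_size each."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]

rows = list(range(25_000))
batches = list(chunk_rows(rows, 10_000))
print(len(batches), [len(b) for b in batches])  # 3 batches: 10000, 10000, 5000
```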

10.16 Blob Storage Partitioning

Optimize blob output with proper partitioning:

// Path pattern for efficient blob partitioning
{
  "pathPattern": "rawdata/{date}/{time}/output_{deviceId}.json",
  "dateFormat": "yyyy/MM/dd",
  "timeFormat": "HH"
}
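
The path pattern resolves per event: {date} and {time} come from the event's timestamp using the configured formats, while custom tokens such as {deviceId} come from the payload. A hypothetical resolver sketch, not the service's actual implementation:

```python
from datetime import datetime

def resolve_path(pattern: str, event: dict, ts: datetime) -> str:
    """Substitute {date}, {time}, and payload field tokens into a path pattern."""
    path = pattern.replace("{date}", ts.strftime("%Y/%m/%d"))  # dateFormat yyyy/MM/dd
    path = path.replace("{time}", ts.strftime("%H"))           # timeFormat HH
    for key, value in event.items():
        path = path.replace("{" + key + "}", str(value))
    return path

ts = datetime(2025, 1, 15, 9, 30)
print(resolve_path("rawdata/{date}/{time}/output_{deviceId}.json",
                   {"deviceId": "device-0042"}, ts))
# rawdata/2025/01/15/09/output_device-0042.json
```

Date- and hour-based prefixes like this keep blobs queryable by time range, which matters for downstream consumers such as Azure Data Explorer or Synapse.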

💰 Cost Optimization

10.17 Calculate Job Costs

Understand cost components:

# Calculate monthly Stream Analytics cost
$streamingUnits = 6
$hoursPerDay = 24
$daysPerMonth = 30
$suHourlyCost = 0.111  # approximate pay-as-you-go USD per SU-hour; check current regional pricing

$monthlyCost = $streamingUnits * $hoursPerDay * $daysPerMonth * $suHourlyCost

Write-Host "Monthly Stream Analytics Cost: `$$([Math]::Round($monthlyCost, 2))"
Write-Host "Breakdown:"
Write-Host "  - Streaming Units: $streamingUnits"
Write-Host "  - Hours per month: $($hoursPerDay * $daysPerMonth)"
Write-Host "  - SU-hours: $($streamingUnits * $hoursPerDay * $daysPerMonth)"

10.18 Cost Optimization Strategies

-- Strategy 1: Filter data early to reduce processing
SELECT
    DeviceId,
    Temperature,
    Humidity,
    EventTime
INTO [FilteredOutput]
FROM [Input] TIMESTAMP BY EventTime
WHERE
    Temperature IS NOT NULL
    AND Temperature BETWEEN -50 AND 150  -- Filter invalid ranges early
    AND DeviceId LIKE 'PROD-%'  -- Only process production devices

-- Strategy 2: Reduce output frequency with windowing
SELECT
    DeviceId,
    AVG(Temperature) AS AvgTemp,
    System.Timestamp() AS WindowEnd
INTO [SummarizedOutput]
FROM [Input] TIMESTAMP BY EventTime
GROUP BY
    DeviceId,
    TumblingWindow(minute, 5)  -- Output every 5 minutes vs. every event

10.19 Implement Smart Start/Stop

Automate job scheduling for non-24/7 workloads:

# Create Azure Function to stop job during off-hours
$stopJobScript = @'
param($Timer)

$resourceGroup = $env:STREAM_RG
$jobName = $env:STREAM_JOB

# Stop job at 6 PM weekdays
$currentHour = (Get-Date).Hour
$dayOfWeek = (Get-Date).DayOfWeek

if ($dayOfWeek -ne "Saturday" -and $dayOfWeek -ne "Sunday" -and $currentHour -ge 18) {
    az stream-analytics job stop --resource-group $resourceGroup --name $jobName --no-wait
    Write-Host "Job stopped for cost savings"
}
'@

# Save as Azure Function for scheduling
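
The stop condition in the function above is easy to verify in isolation before deploying it. A Python restatement of the same schedule logic (a hypothetical helper; the real function shells out to the CLI):

```python
def should_stop(hour: int, day_of_week: str) -> bool:
    """Stop the job at or after 18:00 on weekdays, mirroring the function above."""
    return day_of_week not in ("Saturday", "Sunday") and hour >= 18

print(should_stop(19, "Monday"))    # True  - weekday evening
print(should_stop(19, "Saturday"))  # False - weekend
print(should_stop(9, "Tuesday"))    # False - business hours
```

A companion function with the inverse condition would restart the job each weekday morning; remember that a stopped job resumes from its last output checkpoint, so no events are lost while it is down (they wait in the Event Hub, subject to its retention period).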

📉 Monitoring and Alerting

10.20 Configure Performance Alerts

Create alerts for performance degradation:

# Alerts route through an action group; create one first, e.g.:
# az monitor action-group create --resource-group $env:STREAM_RG `
#     --name "stream-alerts" --short-name "salerts" `
#     --action email admin admin@company.com
$actionGroupId = az monitor action-group show `
    --resource-group $env:STREAM_RG `
    --name "stream-alerts" `
    --query id -o tsv

$jobId = az stream-analytics job show `
    --resource-group $env:STREAM_RG `
    --name $env:STREAM_JOB `
    --query id -o tsv

# Alert when SU utilization exceeds 80%
az monitor metrics alert create `
    --name "HighSUUtilization" `
    --resource-group $env:STREAM_RG `
    --scopes $jobId `
    --condition "avg ResourceUtilization > 80" `
    --window-size 5m `
    --evaluation-frequency 1m `
    --action $actionGroupId `
    --description "Stream Analytics SU utilization above 80%"

# Alert when watermark delay exceeds 60 seconds
az monitor metrics alert create `
    --name "HighWatermarkDelay" `
    --resource-group $env:STREAM_RG `
    --scopes $jobId `
    --condition "max WatermarkDelay > 60" `
    --window-size 5m `
    --evaluation-frequency 1m `
    --action $actionGroupId `
    --description "Watermark delay exceeds 60 seconds"

10.21 Log Analytics Queries

Create performance monitoring queries:

// Query 1: Track SU utilization trends
AzureMetrics
| where ResourceProvider == "MICROSOFT.STREAMANALYTICS"
| where MetricName == "ResourceUtilization"
| summarize AvgUtilization = avg(Average), MaxUtilization = max(Maximum)
    by bin(TimeGenerated, 5m)
| render timechart

// Query 2: Identify input bottlenecks
AzureMetrics
| where ResourceProvider == "MICROSOFT.STREAMANALYTICS"
| where MetricName in ("InputEvents", "OutputEvents")
| summarize
    InputRate = sumif(Total, MetricName == "InputEvents"),
    OutputRate = sumif(Total, MetricName == "OutputEvents")
    by bin(TimeGenerated, 1m)
| extend BacklogGrowth = InputRate - OutputRate
| where BacklogGrowth > 1000  // Alert when backlog grows
| render timechart

// Query 3: Data conversion error trends
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.STREAMANALYTICS"
| where Category == "Execution"
| where properties_s contains "conversion error"
| summarize ErrorCount = count() by bin(TimeGenerated, 5m)
| render timechart
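
Query 2's backlog check can be prototyped offline against exported metric samples before wiring it into Log Analytics. A sketch of the same InputEvents minus OutputEvents comparison, using hypothetical sample data:

```python
# (minute, input_events, output_events) samples, e.g. exported from Log Analytics
samples = [(0, 5000, 5000), (1, 8000, 6000), (2, 9000, 6500)]

backlog_alerts = [(minute, inp - out)
                  for minute, inp, out in samples
                  if inp - out > 1000]   # same threshold as the KQL query
print(backlog_alerts)  # [(1, 2000), (2, 2500)]
```

A persistently positive difference means the job is falling behind its input and is usually accompanied by a rising watermark delay.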

🧪 Performance Testing

10.22 Load Testing Script

Generate high-volume test data:

# performance_load_test.py
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient  # async client lives in .aio
import asyncio
import json
import time
from datetime import datetime
import random
import os

connection_string = os.environ.get("STREAM_EH_SEND_CONN")
eventhub_name = os.environ.get("STREAM_EH_NAME")

async def send_batch(producer, batch_size, device_count):
    """Send a batch of events"""
    event_data_batch = await producer.create_batch()

    for i in range(batch_size):
        device_id = f"device-{random.randint(1, device_count):04d}"
        event = {
            "deviceId": device_id,
            "temperature": round(random.uniform(60, 85), 2),
            "humidity": round(random.uniform(30, 70), 2),
            "pressure": round(random.uniform(980, 1020), 2),
            "timestamp": datetime.utcnow().isoformat()
        }

        try:
            event_data_batch.add(EventData(json.dumps(event)))
        except ValueError:
            # Batch is full, send it and create new batch
            await producer.send_batch(event_data_batch)
            event_data_batch = await producer.create_batch()
            event_data_batch.add(EventData(json.dumps(event)))

    # Send remaining events
    if len(event_data_batch) > 0:
        await producer.send_batch(event_data_batch)

async def load_test(events_per_second, duration_seconds, device_count):
    """Run load test with specified parameters"""
    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_string,
        eventhub_name=eventhub_name
    )

    batch_size = events_per_second
    batches = duration_seconds

    print("Starting load test:")
    print(f"  - Target: {events_per_second} events/second")
    print(f"  - Duration: {duration_seconds} seconds")
    print(f"  - Devices: {device_count}")
    print(f"  - Total events: {events_per_second * duration_seconds}")

    start_time = time.time()

    async with producer:
        for batch_num in range(batches):
            batch_start = time.time()

            await send_batch(producer, batch_size, device_count)

            # Calculate sleep time to maintain rate
            elapsed = time.time() - batch_start
            sleep_time = max(0, 1.0 - elapsed)

            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

            if (batch_num + 1) % 10 == 0:
                print(f"Sent {(batch_num + 1) * batch_size} events...")

    total_time = time.time() - start_time
    total_events = batches * batch_size
    actual_rate = total_events / total_time

    print(f"\nLoad test complete:")
    print(f"  - Total time: {total_time:.2f} seconds")
    print(f"  - Total events: {total_events}")
    print(f"  - Actual rate: {actual_rate:.2f} events/second")

# Run load test
if __name__ == "__main__":
    asyncio.run(load_test(
        events_per_second=1000,  # Target throughput
        duration_seconds=300,     # 5 minutes
        device_count=100          # Number of unique devices
    ))

10.23 Monitor During Load Test

# Monitor metrics during load test
$startTime = (Get-Date).ToString("yyyy-MM-ddTHH:mm:ss")

# Run load test (in separate terminal)
python performance_load_test.py

# Monitor in real-time
while ($true) {
    Clear-Host
    Write-Host "=== Stream Analytics Performance Metrics ===" -ForegroundColor Cyan
    Write-Host "Time: $(Get-Date)" -ForegroundColor Green

    $jobResourceId = az stream-analytics job show `
        --resource-group $env:STREAM_RG `
        --name $env:STREAM_JOB `
        --query id -o tsv

    # Get latest metrics
    $suUtil = az monitor metrics list `
        --resource $jobResourceId `
        --metric "ResourceUtilization" `
        --start-time $startTime `
        --aggregation Average `
        --query "value[0].timeseries[0].data[-1].average" -o tsv

    $watermark = az monitor metrics list `
        --resource $jobResourceId `
        --metric "WatermarkDelay" `
        --start-time $startTime `
        --aggregation Maximum `
        --query "value[0].timeseries[0].data[-1].maximum" -o tsv

    $inputEvents = az monitor metrics list `
        --resource $jobResourceId `
        --metric "InputEvents" `
        --start-time $startTime `
        --aggregation Total `
        --interval PT1M `
        --query "value[0].timeseries[0].data[-1].total" -o tsv

    Write-Host "SU Utilization: $([Math]::Round($suUtil, 2))%" -ForegroundColor $(if($suUtil -gt 80){'Red'}else{'Green'})
    Write-Host "Watermark Delay: $([Math]::Round($watermark, 2)) seconds" -ForegroundColor $(if($watermark -gt 60){'Red'}else{'Green'})
    Write-Host "Input Events (last minute): $inputEvents" -ForegroundColor Cyan

    Start-Sleep -Seconds 10
}

🎓 Key Concepts Learned

Performance Fundamentals

  • SU% Utilization: Primary metric for capacity planning
  • Watermark Delay: Indicates processing lag
  • Partitioning: Enables linear scalability

Optimization Techniques

  • Query patterns: Minimize subqueries, optimize JOINs
  • Windowing: Choose appropriate window size
  • Early filtering: Reduce data volume as early as possible

Cost Management

  • Right-sizing SUs: Balance performance and cost
  • Scheduled operations: Stop jobs during off-peak hours
  • Output optimization: Batch writes, reduce frequency

🚀 Next Steps

You've mastered performance optimization! Continue to the final tutorial:

Tutorial 11: Error Handling and Resilience →

In the next tutorial, you'll:

  • Implement comprehensive error handling
  • Configure dead letter queues
  • Build fault-tolerant streaming pipelines
  • Handle data quality issues

🔧 Troubleshooting

Issue: High SU% Despite Low Input Rate

Symptoms: SU utilization >90% with low event throughput

Solution:

-- Check for inefficient query patterns
-- Look for:
-- 1. Overly complex JOINs
-- 2. Large temporal windows
-- 3. Unnecessary subqueries

-- Simplify query example:
SELECT
    DeviceId,
    AVG(Temperature) AS AvgTemp
INTO [Output]
FROM [Input] TIMESTAMP BY EventTime
GROUP BY DeviceId, TumblingWindow(minute, 5)
-- Remove any unnecessary transformations

Issue: Increasing Watermark Delay

Symptoms: Watermark delay continuously grows

Solution:

# Scale up SUs
az stream-analytics transformation update `
    --resource-group $env:STREAM_RG `
    --job-name $env:STREAM_JOB `
    --name "Transformation" `
    --streaming-units 12  # Increase from current value

# Or enable partitioning in query
# Add PARTITION BY PartitionId to query

Issue: Output Throttling

Symptoms: Output events lower than expected

Solution:

# Increase output batch size and parallel writers
# For SQL Database output:
# Update output configuration to increase:
# - batchSize: 10000
# - maxWriterCount: 8

# For Blob Storage:
# Implement proper partitioning pattern
# pathPattern: "data/{date}/{time}/output.json"


Tutorial Progress: 10 of 11 complete | Next: Error Handling

Last Updated: January 2025