Skip to content

Event Hubs Integration with Stream Analytics

Home | Implementation | Integration | EventHub + Stream Analytics

Status Complexity

Real-time analytics on Event Hub streams using Azure Stream Analytics.


Overview

Azure Stream Analytics provides SQL-based stream processing with built-in windowing, joins, and machine learning capabilities.


Implementation

Step 1: Create Stream Analytics Job

# Azure CLI
az stream-analytics job create \
    --resource-group rg-analytics \
    --name asa-iot-processing \
    --location eastus \
    --sku Standard \
    --output-error-policy Drop

Step 2: Configure Input

{
    "name": "iot-events-input",
    "properties": {
        "type": "Stream",
        "datasource": {
            "type": "Microsoft.EventHub/EventHub",
            "properties": {
                "serviceBusNamespace": "myhub",
                "eventHubName": "iot-events",
                "consumerGroupName": "asa-consumer",
                "sharedAccessPolicyName": "listen",
                "sharedAccessPolicyKey": "xxx"
            }
        },
        "serialization": {
            "type": "Json",
            "properties": {
                "encoding": "UTF8"
            }
        }
    }
}

Step 3: Stream Analytics Queries

-- Basic aggregation with tumbling window
SELECT
    device_id,
    System.Timestamp() AS window_end,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp,
    COUNT(*) AS reading_count
INTO [output-aggregates]
FROM [iot-events-input]
GROUP BY device_id, TumblingWindow(minute, 5)

-- Sliding window for continuous monitoring
SELECT
    device_id,
    System.Timestamp() AS event_time,
    AVG(temperature) OVER (PARTITION BY device_id LIMIT DURATION(minute, 10)) AS rolling_avg
INTO [output-rolling]
FROM [iot-events-input]

-- Anomaly detection
SELECT
    device_id,
    temperature,
    humidity,
    System.Timestamp() AS detected_at,
    'TEMPERATURE_SPIKE' AS anomaly_type
INTO [output-anomalies]
FROM [iot-events-input]
WHERE temperature > 100 OR temperature < -40

-- Session windows for activity tracking
SELECT
    user_id,
    MIN(event_time) AS session_start,
    MAX(event_time) AS session_end,
    COUNT(*) AS event_count,
    DATEDIFF(second, MIN(event_time), MAX(event_time)) AS session_duration_sec
INTO [output-sessions]
FROM [clickstream-input]
GROUP BY user_id, SessionWindow(event_time, INTERVAL 30 MINUTE, INTERVAL 2 HOUR)

Step 4: Reference Data Join

-- Join stream with reference data
SELECT
    e.device_id,
    e.temperature,
    d.device_name,
    d.location,
    d.threshold_high,
    d.threshold_low,
    System.Timestamp() AS event_time,
    CASE
        WHEN e.temperature > d.threshold_high THEN 'HIGH'
        WHEN e.temperature < d.threshold_low THEN 'LOW'
        ELSE 'NORMAL'
    END AS status
INTO [output-enriched]
FROM [iot-events-input] e
JOIN [device-reference] d ON e.device_id = d.device_id

Step 5: Temporal Joins

-- Join two streams within time window
SELECT
    o.order_id,
    o.customer_id,
    o.order_time,
    p.payment_id,
    p.payment_time,
    DATEDIFF(second, o.order_time, p.payment_time) AS time_to_payment
INTO [output-orders-payments]
FROM [orders-input] o
JOIN [payments-input] p
    ON o.order_id = p.order_id
    AND DATEDIFF(minute, o, p) BETWEEN 0 AND 30

Step 6: Output to Multiple Destinations

-- Output to Power BI for real-time dashboard
SELECT
    device_id,
    AVG(temperature) AS avg_temp,
    COUNT(*) AS count,
    System.Timestamp() AS time
INTO [powerbi-output]
FROM [iot-events-input]
GROUP BY device_id, TumblingWindow(second, 10)

-- Output aggregates to Synapse
SELECT
    device_id,
    DATEPART(year, System.Timestamp()) AS year,
    DATEPART(month, System.Timestamp()) AS month,
    DATEPART(day, System.Timestamp()) AS day,
    DATEPART(hour, System.Timestamp()) AS hour,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    COUNT(*) AS reading_count
INTO [synapse-output]
FROM [iot-events-input]
GROUP BY device_id, TumblingWindow(hour, 1)

-- Output anomalies to Event Hub for downstream processing
SELECT
    device_id,
    temperature,
    'ANOMALY' AS event_type,
    System.Timestamp() AS detected_at
INTO [eventhub-alerts-output]
FROM [iot-events-input]
WHERE temperature > 100

Built-in ML Functions

-- Anomaly detection with built-in ML
WITH AnomalyDetection AS (
    SELECT
        device_id,
        temperature,
        AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips') OVER (
            PARTITION BY device_id
            LIMIT DURATION(minute, 10)
        ) AS anomaly_result
    FROM [iot-events-input]
)
SELECT
    device_id,
    temperature,
    anomaly_result.IsAnomaly AS is_anomaly,
    anomaly_result.Score AS anomaly_score
INTO [output-ml-anomalies]
FROM AnomalyDetection
WHERE anomaly_result.IsAnomaly = 1

Monitoring Query

-- Create diagnostic metrics
SELECT
    'StreamAnalyticsMetrics' AS metric_type,
    COUNT(*) AS events_processed,
    System.Timestamp() AS window_time
INTO [metrics-output]
FROM [iot-events-input]
GROUP BY TumblingWindow(minute, 1)

Configuration

Scaling

Streaming Units Throughput Use Case
1-3 Up to 1 MB/s Development/Test
6-12 Up to 6 MB/s Production workloads
24+ 12+ MB/s High-volume production


Last Updated: January 2025