Source: examples/iot-streaming/README.md

IoT & Streaming Analytics Examples


> [!TIP]
> **TL;DR** — Reusable streaming patterns shared across verticals: hot path (Event Hub to ADX), warm path (Stream Analytics windowed aggregations), cold path (Event Hub Capture to ADLS), and anomaly detection for temperature, AQI, weather, and slot machine sensors.



Real-time data ingestion and analytics patterns for IoT sensors, telemetry, and event streaming. These patterns are used across multiple verticals (NOAA weather stations, EPA air quality sensors, casino slot machines).


Architecture

graph TB
    subgraph Sources["Data Sources"]
        IoT["IoT Sensors<br/>(temp, humidity, pressure)"]
        Weather["Weather Stations<br/>(NOAA-style)"]
        AQI["AQI Monitors<br/>(EPA-style)"]
        Slots["Casino Slot<br/>Machines"]
    end

    subgraph Ingestion["Ingestion Layer"]
        IoTHub["Azure IoT Hub<br/>+ DPS"]
        EH["Event Hubs<br/>Namespace"]
    end

    subgraph Processing["Processing"]
        direction TB
        Hot["Hot Path<br/>Azure Data Explorer<br/>(sub-second KQL)"]
        Warm["Warm Path<br/>Stream Analytics<br/>(windowed aggregation)"]
        Cold["Cold Path<br/>Event Hub Capture<br/>(ADLS Gen2 Parquet)"]
    end

    subgraph Storage["Storage"]
        Bronze["ADLS Gen2<br/>Bronze (raw)"]
        Silver["ADLS Gen2<br/>Silver (enriched)"]
        Gold["ADLS Gen2<br/>Gold (aggregated)"]
        ADX["ADX Tables<br/>(real-time)"]
    end

    subgraph Consumers["Consumers"]
        PBI["Power BI<br/>Dashboards"]
        Alerts["Teams / Email<br/>Alerts"]
        API["REST APIs"]
    end

    IoT --> IoTHub
    Weather --> IoTHub
    AQI --> EH
    Slots --> EH
    IoTHub --> EH

    EH --> Hot
    EH --> Warm
    EH --> Cold

    Hot --> ADX
    Warm --> Silver
    Warm --> ADX
    Cold --> Bronze
    Bronze --> Silver
    Silver --> Gold

    ADX --> PBI
    Gold --> PBI
    Warm --> Alerts
    ADX --> API

Streaming Patterns

Pattern 1: Hot Path (Real-Time)

Event Hub → Azure Data Explorer for sub-second query latency:

// Real-time slot machine events (last 5 minutes)
SlotEvents
| where timestamp > ago(5m)
| summarize
    total_spins = count(),
    total_coin_in = sum(coin_in),
    total_coin_out = sum(coin_out),
    hold_pct = round((sum(coin_in) - sum(coin_out)) / sum(coin_in) * 100, 2)
  by bin(timestamp, 1m), floor_zone
| render timechart
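The hold-percentage arithmetic above is the standard casino metric: (coin_in - coin_out) / coin_in * 100. A minimal Python equivalent (the function name is ours, not from this repo):

```python
def hold_pct(coin_in: float, coin_out: float) -> float:
    """Casino hold %: the share of coin-in the house keeps.

    Mirrors the KQL expression
    round((sum(coin_in) - sum(coin_out)) / sum(coin_in) * 100, 2),
    with an explicit zero-coin-in guard.
    """
    if coin_in == 0:
        return 0.0
    return round((coin_in - coin_out) / coin_in * 100, 2)
```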

Pattern 2: Warm Path (Near Real-Time)

Event Hub → Stream Analytics → Power BI for aggregated dashboards:

-- Stream Analytics query: 5-minute windowed aggregation
SELECT
    System.Timestamp() AS window_end,
    sensor_id,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp,
    COUNT(*) AS reading_count
INTO [PowerBIOutput]
FROM [EventHubInput]
TIMESTAMP BY event_time
GROUP BY
    sensor_id,
    TumblingWindow(minute, 5)
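A tumbling window partitions the stream into fixed, non-overlapping buckets. As a rough local sketch of what the ASA query computes (a simplification: we key results by window start, whereas ASA's System.Timestamp() reports the window end):

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 300  # 5 minutes, matching TumblingWindow(minute, 5)

def window_start(ts: datetime) -> datetime:
    """Floor a timestamp to the start of its tumbling window."""
    epoch = ts.timestamp()
    return datetime.fromtimestamp(epoch - epoch % WINDOW_SECONDS, tz=timezone.utc)

def aggregate(readings):
    """readings: iterable of (sensor_id, event_time, temperature).

    Returns {(sensor_id, window_start): {avg/max/min/count}} -- the same
    shape as the ASA GROUP BY sensor_id, TumblingWindow(minute, 5).
    """
    buckets = defaultdict(list)
    for sensor_id, event_time, temp in readings:
        buckets[(sensor_id, window_start(event_time))].append(temp)
    return {
        key: {
            "avg_temp": sum(v) / len(v),
            "max_temp": max(v),
            "min_temp": min(v),
            "reading_count": len(v),
        }
        for key, v in buckets.items()
    }
```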

Pattern 3: Cold Path (Batch)

Event Hub Capture → ADLS Gen2 → dbt/Databricks for historical analytics:

# Event Hub Capture configuration
capture:
  enabled: true
  encoding: Avro
  intervalInSeconds: 300
  sizeLimitInBytes: 314572800
  destination:
    name: EventHubArchive.AzureBlockBlob
    storageAccountResourceId: /subscriptions/.../storageAccounts/csastor
    blobContainer: bronze
    archiveNameFormat: "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
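Capture lays blobs out according to archiveNameFormat, zero-padding each date/time token and writing Avro files. A small helper (hypothetical, not part of this repo) that predicts the blob path for a given hub and timestamp, which is handy when you need to locate one hour of raw data:

```python
from datetime import datetime, timezone

def capture_blob_path(fmt: str, namespace: str, hub: str,
                      partition: int, ts: datetime) -> str:
    """Expand an Event Hub Capture archiveNameFormat template.

    Hypothetical helper: Capture zero-pads each date/time token and the
    captured blobs carry an .avro extension.
    """
    return (fmt.replace("{Namespace}", namespace)
               .replace("{EventHub}", hub)
               .replace("{PartitionId}", str(partition))
               .replace("{Year}", f"{ts.year:04d}")
               .replace("{Month}", f"{ts.month:02d}")
               .replace("{Day}", f"{ts.day:02d}")
               .replace("{Hour}", f"{ts.hour:02d}")
               .replace("{Minute}", f"{ts.minute:02d}")
               .replace("{Second}", f"{ts.second:02d}")) + ".avro"
```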

Pattern 4: Anomaly Detection

Stream Analytics anomaly detection on streaming data:

-- Detect anomalies in AQI readings.
-- AnomalyDetection_SpikeAndDip returns a record; extract Score/IsAnomaly
-- with GetRecordPropertyValue in a second step (a column alias cannot be
-- referenced in the WHERE clause of the same SELECT).
WITH AnomalyStep AS (
    SELECT
        sensor_id,
        event_time,
        aqi_value,
        AnomalyDetection_SpikeAndDip(aqi_value, 95, 120, 'spikesanddips')
          OVER (PARTITION BY sensor_id LIMIT DURATION(minute, 120)) AS scores
    FROM [AQIInput]
    TIMESTAMP BY event_time
)
SELECT
    sensor_id,
    event_time,
    aqi_value,
    CAST(GetRecordPropertyValue(scores, 'Score') AS FLOAT) AS anomaly_score
INTO [AlertOutput]
FROM AnomalyStep
WHERE CAST(GetRecordPropertyValue(scores, 'IsAnomaly') AS BIGINT) = 1
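AnomalyDetection_SpikeAndDip is a machine-learning operator inside ASA; for local testing without a running job, a plain z-score over a sliding window is a reasonable stand-in. This sketch is our assumption, not the ASA algorithm:

```python
from collections import deque
from statistics import mean, pstdev

class SpikeDetector:
    """Z-score spike/dip detector over a sliding window.

    A simple stand-in for ASA's AnomalyDetection_SpikeAndDip (which uses
    an ML model, not a z-score); useful for sanity-checking thresholds
    locally before deploying the job.
    """

    def __init__(self, window: int = 120, z_threshold: float = 3.0):
        self.history = deque(maxlen=window)
        self.z_threshold = z_threshold

    def score(self, value: float) -> tuple[float, bool]:
        """Return (z_score, is_anomaly), then add value to the window."""
        z, is_anomaly = 0.0, False
        if len(self.history) >= 10:  # wait for some history before scoring
            mu, sigma = mean(self.history), pstdev(self.history)
            if sigma > 0:
                z = abs(value - mu) / sigma
                is_anomaly = z > self.z_threshold
        self.history.append(value)
        return z, is_anomaly
```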

📁 Directory Structure

examples/iot-streaming/
├── README.md                          # This file
├── producers/
│   └── iot_simulator.py               # Multi-type IoT sensor simulator (streaming)
├── data/
│   ├── generators/                    # Deterministic batch-fixture generators
│   │   ├── generate_telemetry.py      # Telemetry/weather/AQI/slots CSVs
│   │   ├── generate_devices.py        # Rebuilds dbt device seeds
│   │   ├── README.md                  # Generator usage + reproducibility
│   │   └── tests/test_generators.py   # Determinism + row-count tests
│   └── seed/                          # Generator output (bronze-layer CSVs)
├── domains/
│   └── dbt/                           # dbt medallion (bronze/silver/gold)
│       ├── dbt_project.yml
│       ├── models/
│       │   ├── schema.yml
│       │   ├── bronze/brz_*.sql
│       │   ├── silver/slv_*.sql
│       │   └── gold/gld_*.sql
│       └── seeds/                     # devices.csv, sensor_metadata.csv
├── deploy/
│   └── bicep/
│       ├── iot-hub.bicep              # IoT Hub + DPS + Event Hubs
│       └── stream-analytics.bicep     # Stream Analytics job
├── kql/
│   ├── tables.kql                     # ADX table + mapping definitions
│   └── queries/
│       ├── realtime_anomaly_detection.kql  # Time-series anomaly detection
│       ├── hourly_aggregation.kql          # Hourly rollups by device
│       ├── device_health.kql               # Connectivity & battery monitoring
│       ├── alert_triggers.kql              # Threshold-based alerting
│       └── dashboard_summary.kql           # Top-level KPI queries
└── stream-analytics/
    ├── transform_telemetry.asaql      # Parse & enrich raw telemetry
    ├── aggregate_metrics.asaql        # Tumbling & hopping window aggregation
    └── detect_anomalies.asaql         # Spike/dip & change-point detection

📦 Deployment

📎 Prerequisites

  • Azure subscription with Event Hubs, IoT Hub, and ADX resource providers registered
  • ADLS Gen2 storage account for Event Hub Capture and processed output
  • Log Analytics workspace for diagnostic settings
  • Azure CLI with Bicep installed

⚡ Step 1: Deploy IoT Hub and Event Hubs

> [!IMPORTANT]
> **Entra-only (CSA-0025 / AQ-0014).** IoT Hub and DPS are deployed with disableLocalAuth: true. SAS-key device authentication is disabled. Legacy SAS clients must migrate before deploying this template — see docs/migrations/iot-hub-entra.md.

# Create resource group
az group create --name rg-iot-streaming --location eastus

# Deploy IoT Hub + Event Hubs
az deployment group create \
  --resource-group rg-iot-streaming \
  --template-file deploy/bicep/iot-hub.bicep \
  --parameters \
      baseName=csaiot \
      captureStorageAccountName=<your-adls-account> \
      logAnalyticsWorkspaceId=<your-law-id>

# Post-deploy: establish identity-based DPS → IoT Hub link.
# (Required because the ARM DPS schema still requires a SAS connection
# string for inline linking — we defer linking to the CLI so the DPS
# managed identity can be used. See docs/migrations/iot-hub-entra.md.)
IOT_HUB_ID=$(az deployment group show \
  --resource-group rg-iot-streaming --name iot-hub \
  --query properties.outputs.iotHubResourceId.value -o tsv)
DPS_NAME=$(az deployment group show \
  --resource-group rg-iot-streaming --name iot-hub \
  --query properties.outputs.dpsName.value -o tsv)
az iot dps linked-hub create \
  --dps-name "$DPS_NAME" --resource-group rg-iot-streaming \
  --hub-resource-id "$IOT_HUB_ID" \
  --allocation-weight 1 \
  --authentication-type identityBased

This deploys:

  • IoT Hub (S1) with system-assigned managed identity, Entra-only auth (disableLocalAuth: true), no SAS authorization policies, and identity-based routing to the Event Hub telemetry hub
  • Device Provisioning Service (S1) with system-assigned managed identity. DPS is deployed unlinked; the IoT Hub link is established post-deploy as identity-based (no SAS connection string is ever emitted)
  • Event Hub Namespace (Standard, auto-inflate) with three hubs:
      - telemetry — raw device data with Capture to ADLS
      - alerts — anomaly and threshold alerts
      - processed — enriched/aggregated output
  • Consumer groups for ADX, Stream Analytics, and Capture
  • Role assignments: DPS MI → IoT Hub Data Contributor; IoT Hub MI → Event Hubs Data Sender (required for identity-based routing)
  • Diagnostic settings to Log Analytics

🔄 Step 2: Deploy Stream Analytics

# Get the Event Hub connection string from Step 1 output
EH_CONN=$(az deployment group show \
  --resource-group rg-iot-streaming \
  --name iot-hub \
  --query properties.outputs.telemetryHubListenConnectionString.value -o tsv)

# Deploy Stream Analytics job
az deployment group create \
  --resource-group rg-iot-streaming \
  --template-file deploy/bicep/stream-analytics.bicep \
  --parameters \
      baseName=csaiot \
      eventHubNamespaceName=csaiot-ehns \
      eventHubConnectionString="$EH_CONN" \
      adlsAccountName=<your-adls-account> \
      adlsAccountKey=<your-adls-key> \
      logAnalyticsWorkspaceId=<your-law-id>

🗄️ Step 3: Create ADX Tables

# The Azure CLI does not provide a command for running KQL scripts.
# Easiest: open the ADX web UI (https://dataexplorer.azure.com), select
# the "realtime" database, and paste in kql/tables.kql.

# Or run the script with the Kusto.Cli tool:
Kusto.Cli "https://<adx-cluster>.<region>.kusto.windows.net/realtime;Fed=true" -script:kql/tables.kql

Step 4: Start the Simulator

# Install dependencies
pip install azure-eventhub

# Run the IoT simulator (stdout mode for testing)
python producers/iot_simulator.py \
  --sensor-type temperature \
  --sensor-count 10 \
  --interval 5 \
  --max-events 1000

# Run with Event Hub output
python producers/iot_simulator.py \
  --connection-string "$EVENTHUB_CONNECTION_STRING" \
  --event-hub-name telemetry \
  --sensor-type temperature \
  --sensor-count 10 \
  --interval 5

Available sensor types:

  • temperature — Temperature, humidity, pressure sensors
  • aqi — EPA-style air quality index sensors
  • weather — NOAA-style weather station readings
  • slot_machine — Casino slot machine telemetry
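The exact event schema is defined by producers/iot_simulator.py; the sketch below only illustrates the general shape of a temperature event (field names here are assumptions — check the simulator for the real schema):

```python
import json
import random
import uuid
from datetime import datetime, timezone

def make_temperature_event(sensor_id: str, rng: random.Random) -> dict:
    """Build one illustrative temperature-sensor event.

    Field names are hypothetical; producers/iot_simulator.py is the
    source of truth for the schema actually emitted.
    """
    return {
        "event_id": str(uuid.UUID(int=rng.getrandbits(128))),
        "sensor_id": sensor_id,
        "sensor_type": "temperature",
        "event_time": datetime.now(timezone.utc).isoformat(),
        "temperature_c": round(rng.uniform(15.0, 35.0), 2),
        "humidity_pct": round(rng.uniform(20.0, 90.0), 1),
        "pressure_hpa": round(rng.uniform(990.0, 1030.0), 1),
    }

# Events go to Event Hubs as UTF-8 JSON bodies:
payload = json.dumps(make_temperature_event("sensor-001", random.Random(42))).encode("utf-8")
```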

🔄 Step 5: Start Stream Analytics Job

az stream-analytics job start \
  --resource-group rg-iot-streaming \
  --job-name csaiot-asa \
  --output-start-mode JobStartTime

Step 6: Query Real-Time Data in ADX

// Fleet overview
SensorTelemetry
| where timestamp > ago(15m)
| summarize
    count(),
    avg(temperature_c),
    dcount(sensor_id)
  by bin(timestamp, 1m)
| render timechart

// Anomaly detection
// See kql/queries/realtime_anomaly_detection.kql

🗄️ KQL Queries Reference

| Query File | Purpose |
| --- | --- |
| realtime_anomaly_detection.kql | Time-series decomposition, IQR outliers, correlation anomalies |
| hourly_aggregation.kql | Hourly rollups for all sensor types (temp, weather, AQI, slots) |
| device_health.kql | Connectivity status, battery monitoring, data freshness, quality checks |
| alert_triggers.kql | Temperature, AQI, wind, battery, hold%, and offline device alerts |
| dashboard_summary.kql | Fleet KPIs, geographic heatmap, throughput metrics, per-vertical summaries |

🔄 Stream Analytics Queries Reference

| Query File | Purpose |
| --- | --- |
| transform_telemetry.asaql | Raw passthrough + enrichment (heat index, dew point, quality flags) |
| aggregate_metrics.asaql | Tumbling (5min), hopping (1min/5min), regional, and session windows |
| detect_anomalies.asaql | SpikeAndDip, ChangePoint, and combined threshold+anomaly alerts |

🧱 dbt + Data Generators (Batch / Cold Path)

The streaming examples above cover the hot and warm paths. For the cold path (Event Hub Capture → ADLS → dbt medallion) this vertical ships a dbt project at domains/dbt/ and deterministic seed generators at data/generators/.

Responsibility split:

| Path | Technology | Owns |
| --- | --- | --- |
| Hot | ADX (KQL) | Sub-second dashboards, live alerts |
| Warm | Stream Analytics | 1–5 min windowed aggregates, spike/dip anomaly detection |
| Cold | dbt (this project) | Historical medallion (bronze/silver/gold), SLO reporting, analytics-ready marts |

The dbt anomaly logic (slv_anomaly_flags) intentionally mirrors the ASA detect_anomalies.asaql thresholds so warm-path and cold-path signals stay comparable.
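In Python terms, the three signals slv_anomaly_flags combines look roughly like this (constants are illustrative; the authoritative thresholds live in detect_anomalies.asaql and the dbt model):

```python
from statistics import mean, pstdev, quantiles

def anomaly_flags(values, threshold_hi=None):
    """Flag each value with z-score, IQR, and static-threshold checks --
    the same three signal families slv_anomaly_flags computes in SQL.
    The 3-sigma and 1.5*IQR constants are illustrative.
    """
    mu, sigma = mean(values), pstdev(values)
    q1, _, q3 = quantiles(values, n=4)
    iqr = q3 - q1
    lo, hi = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    return [
        {
            "value": v,
            "z_flag": sigma > 0 and abs(v - mu) / sigma > 3,
            "iqr_flag": v < lo or v > hi,
            "threshold_flag": threshold_hi is not None and v > threshold_hi,
        }
        for v in values
    ]
```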

Generate seed data

# 7 days of telemetry across 10 IoT devices (plus weather, AQI, slots)
python examples/iot-streaming/data/generators/generate_telemetry.py --days 7

# See all options
python examples/iot-streaming/data/generators/generate_telemetry.py --help

Output lands in examples/iot-streaming/data/seed/:

  • telemetry_bronze.csv — long format (device_id, event_time, metric_type, value)
  • weather_bronze.csv — NOAA-style weather observations
  • aqi_bronze.csv — EPA-style AQI readings
  • slots_bronze.csv — casino slot-machine events

Generators are deterministic — the same --seed (default 42) produces byte-identical output. The test suite enforces this with sha256.
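The determinism contract is simple: generation must be a pure function of the CLI parameters, with all randomness routed through a seeded random.Random. A minimal sketch of the pattern (the real generators live in data/generators/):

```python
import hashlib
import random

def generate_rows(seed: int = 42, n: int = 100) -> str:
    """Tiny stand-in for generate_telemetry.py: because every random draw
    comes from a seeded Random instance, output is a pure function of
    (seed, n) and therefore byte-identical across runs."""
    rng = random.Random(seed)
    lines = ["device_id,value"]
    for i in range(n):
        lines.append(f"device-{i % 10:03d},{rng.uniform(0, 100):.4f}")
    return "\n".join(lines) + "\n"

def sha256_of(text: str) -> str:
    """Hash used to enforce byte-identical output in tests."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

# Same seed -> identical bytes; different seed -> different bytes.
assert sha256_of(generate_rows(42)) == sha256_of(generate_rows(42))
assert sha256_of(generate_rows(42)) != sha256_of(generate_rows(7))
```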

Run dbt

cd examples/iot-streaming/domains/dbt
dbt seed          # load devices.csv + sensor_metadata.csv
dbt run           # build bronze → silver → gold
dbt test          # column-level not_null / unique / accepted_values

Medallion output

bronze/
  brz_iot_telemetry          — raw sensor readings (long format)
  brz_weather_observations   — NOAA-style weather
  brz_aqi_readings           — EPA-style AQI
  brz_slot_machine_events    — casino telemetry

silver/
  slv_device_telemetry_cleaned   — deduped, range-validated, UTC-normalized
  slv_anomaly_flags              — z-score + IQR + threshold anomaly flags
  slv_sensor_aggregates_1min     — 1-minute tumbling windows per device+metric

gold/
  gld_device_health_daily        — daily uptime %, data completeness, alert counts
  gld_anomaly_heatmap            — hour × metric-type anomaly density
  gld_sla_breach_summary         — 15-minute latency-SLO compliance roll-up
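The 15-minute latency SLO behind gld_sla_breach_summary reduces to comparing event time against ingestion time. A toy version of that roll-up (column names are illustrative, not the model's actual schema):

```python
from datetime import datetime, timedelta, timezone

SLO = timedelta(minutes=15)  # the latency SLO gld_sla_breach_summary rolls up

def slo_compliance(events):
    """events: iterable of (event_time, ingested_time) pairs.

    Returns (breach_count, compliance_pct). A breach is any event whose
    ingestion lag exceeds the 15-minute SLO.
    """
    total = breaches = 0
    for event_time, ingested_time in events:
        total += 1
        if ingested_time - event_time > SLO:
            breaches += 1
    pct = 100.0 if total == 0 else round((total - breaches) / total * 100, 2)
    return breaches, pct
```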

Run the generator tests

python -m pytest examples/iot-streaming/data/generators/tests/ -v

Integration with Other Verticals

This streaming infrastructure is shared across verticals:

| Vertical | Sensor Type | Event Hub | ADX Table |
| --- | --- | --- | --- |
| NOAA Weather | weather | telemetry | WeatherObservations |
| EPA Air Quality | aqi | telemetry | AQIReadings |
| Casino Analytics | slot_machine | telemetry | SlotEvents |
| Generic IoT | temperature | telemetry | SensorTelemetry |

Each vertical can add its own KQL queries and Stream Analytics jobs while sharing the same Event Hub namespace and ADX cluster.


🔒 Azure Government

All streaming services are available in Azure Government:

  • Event Hubs — GA (FedRAMP High, IL4, IL5)
  • Azure Data Explorer — GA (FedRAMP High, IL4, IL5)
  • Stream Analytics — GA (FedRAMP High, IL4, IL5)
  • IoT Hub — GA (FedRAMP High, IL4, IL5)
  • ADLS Gen2 (Capture) — GA (FedRAMP High, IL4, IL5)

Use the Government parameter files in deploy/bicep/gov/ and set your Azure CLI cloud to AzureUSGovernment.



Prerequisites / Cost / Teardown

> [!IMPORTANT]
> **Cost-safety:** this vertical deploys real Azure resources. Always run teardown.sh when you are done. A forgotten workshop environment can run $80-150/day.

Prerequisites

  • Azure CLI 2.50+ logged in (az login), subscription selected (az account set --subscription <id>)
  • jq installed (used by teardown enumeration)
  • Bicep CLI 0.25+ (az bicep version)
  • Contributor + User Access Administrator on target subscription (or a pre-created RG with equivalent RBAC)
  • bash scripts/deploy/validate-prerequisites.sh passes

Cost estimate (rough, East US 2)

  • While running: ~$80-150/day (services: IoT Hub, Event Hubs, Stream Analytics, ADX, Storage)
  • Idle overnight: roughly half if you stop compute (stop the Stream Analytics job and the ADX cluster)
  • Storage + Key Vault residual: <$5/month if you skip teardown

Numbers are indicative for a small demo dataset; production workloads vary significantly. Use az consumption usage list or Cost Management for live numbers.

Runtime

  • Deploy: ~20-30 minutes (first run; cold Bicep)
  • Teardown: ~5-10 minutes (async RG delete completes in the background)

Teardown

When finished, run the per-example teardown script. It enforces a typed DESTROY-iot-streaming confirmation, logs every step to reports/teardown/iot-streaming-<timestamp>.log, and deletes the resource group rg-iot-streaming along with any matching subscription-scope deployments.

# Interactive (recommended)
bash examples/iot-streaming/deploy/teardown.sh

# Dry run (enumerate only)
bash examples/iot-streaming/deploy/teardown.sh --dry-run

# From the repo root via Makefile
make teardown-example VERTICAL=iot-streaming
make teardown-example VERTICAL=iot-streaming DRYRUN=1

# CI automation (no prompt — only for ephemeral environments)
bash examples/iot-streaming/deploy/teardown.sh --yes

See docs/QUICKSTART.md#teardown for the platform-wide teardown flow.

Expected Results

After running the medallion pipeline against the bundled seed data, the Gold layer should populate the following tables. Row counts vary with the seed-data generator parameters; the figures below are the approximate scale you should see on a default run.

| Gold Table | Approximate Rows | Notes |
| --- | --- | --- |
| gld_anomaly_heatmap | TODO: capture after first run | Populated from Silver via `dbt run --select tag:gold` |
| gld_device_health_daily | TODO: capture after first run | Populated from Silver via `dbt run --select tag:gold` |
| gld_sla_breach_summary | TODO: capture after first run | Populated from Silver via `dbt run --select tag:gold` |

TODO: capture exact counts after the next end-to-end seed run. These are bounded by the seed-data generator parameters in data/generators/.