# Stream Analytics Query Development
Home | Best Practices | Cross-Cutting | Stream Analytics Queries
Best practices for developing Azure Stream Analytics queries.
## Query Fundamentals
### Basic Structure
-- Standard query pattern: per-device average temperature and event count,
-- grouped into fixed 5-minute tumbling windows. Events are timestamped by
-- the EventEnqueuedUtcTime field rather than by arrival time.
SELECT
      System.Timestamp() AS EventTime
    , DeviceId
    , AVG(Temperature)   AS AvgTemperature
    , COUNT(*)           AS EventCount
FROM IoTHubInput TIMESTAMP BY EventEnqueuedUtcTime
GROUP BY
      DeviceId
    , TumblingWindow(minute, 5)
### Window Types
| Window Type | Use Case | Syntax |
|---|---|---|
| Tumbling | Fixed, non-overlapping | TumblingWindow(minute, 5) |
| Hopping | Fixed, overlapping | HoppingWindow(minute, 10, 5) |
| Sliding | Event-driven | SlidingWindow(minute, 5) |
| Session | Activity-based gaps | SessionWindow(minute, 5, 30) |
## Performance Optimization
### Partition Optimization
-- Use PARTITION BY for parallelism: the input is partitioned on DeviceId,
-- and the same key appears in GROUP BY so each 1-minute average is computed
-- per device.
SELECT
      DeviceId
    , AVG(Temperature) AS AvgTemp
FROM IoTHubInput
    TIMESTAMP BY EventEnqueuedUtcTime
    PARTITION BY DeviceId
GROUP BY
      DeviceId
    , TumblingWindow(minute, 1)
### Efficient Joins
-- Reference data join (small, static data): enrich each telemetry event
-- with the location and threshold stored for its device. The join is on
-- DeviceId only; no time bound is given for the reference input.
SELECT
      i.DeviceId
    , i.Temperature
    , r.DeviceLocation
    , r.Threshold
FROM IoTHubInput i TIMESTAMP BY EventEnqueuedUtcTime
JOIN ReferenceData r
    ON i.DeviceId = r.DeviceId
-- Stream-to-stream join: pair temperature and humidity readings from the
-- same device. DATEDIFF over the two stream aliases (a, b) restricts matches
-- to events whose timestamps are within 5 seconds of each other, in either
-- direction.
SELECT
      a.DeviceId
    , a.Temperature
    , b.Humidity
FROM TempStream a TIMESTAMP BY a.EventTime
JOIN HumidityStream b TIMESTAMP BY b.EventTime
    ON  a.DeviceId = b.DeviceId
    AND DATEDIFF(second, a, b) BETWEEN -5 AND 5
## Pattern Detection
### Anomaly Detection
-- Built-in anomaly detection: score each Temperature reading with
-- AnomalyDetection_SpikeAndDip in 'spikesanddips' mode. The model is kept
-- per device (PARTITION BY DeviceId) over a 10-minute look-back.
-- NOTE(review): 95 and 120 are presumably the confidence level and history
-- size from the function's documented signature; confirm the values suit
-- the device event rate.
SELECT
      DeviceId
    , EventTime
    , Temperature
    , AnomalyDetection_SpikeAndDip(Temperature, 95, 120, 'spikesanddips')
          OVER (PARTITION BY DeviceId LIMIT DURATION(minute, 10)) AS AnomalyResult
FROM IoTHubInput TIMESTAMP BY EventEnqueuedUtcTime
### Trend Analysis
-- Detect increasing trends: emit a row for every 5-minute window whose
-- average temperature is more than 10% above the previous window's average
-- for the same device.
--
-- LAG is evaluated over the rows entering the current query step, before
-- GROUP BY is applied, so it cannot wrap AVG(...) in the same step or be
-- used in HAVING. The aggregation therefore runs in its own step and LAG
-- reads that step's output.
WITH WindowedAverages AS (
    -- Step 1: one row per device per 5-minute tumbling window.
    SELECT
        DeviceId,
        System.Timestamp() AS WindowEnd,
        AVG(Temperature) AS AvgTemp
    FROM
        IoTHubInput TIMESTAMP BY EventEnqueuedUtcTime
    GROUP BY
        DeviceId,
        TumblingWindow(minute, 5)
),
WithPreviousWindow AS (
    -- Step 2: attach the previous window's average for the same device.
    -- Rows arrive every 5 minutes per device, so a 10-minute look-back is
    -- enough to reach the immediately preceding window.
    SELECT
        DeviceId,
        WindowEnd,
        AvgTemp,
        LAG(AvgTemp, 1) OVER (
            PARTITION BY DeviceId
            LIMIT DURATION(minute, 10)
        ) AS PrevAvgTemp
    FROM
        WindowedAverages
)
-- Step 3: keep only windows that rose by more than 10%. When there is no
-- previous window PrevAvgTemp is NULL, the comparison is not true, and the
-- row is dropped.
SELECT
    DeviceId,
    WindowEnd,
    AvgTemp,
    PrevAvgTemp
FROM
    WithPreviousWindow
WHERE
    AvgTemp > PrevAvgTemp * 1.1 -- 10% increase
## Error Handling
### Late Arrival Policy
-- Late-arrival and out-of-order tolerances are job configuration settings,
-- not query syntax. Example policy:
--   Late arrival: up to 5 minutes
--   Out of order: up to 10 seconds
--
-- Handle late events in query: project the result timestamp next to the
-- original enqueue time and the difference between them in seconds.
-- NOTE(review): because the input uses TIMESTAMP BY EventEnqueuedUtcTime,
-- System.Timestamp() is presumably that same event time (possibly adjusted
-- by the late-arrival policy) rather than a wall-clock processing time, so
-- LatencySeconds would normally be 0 - confirm this is the intended metric.
SELECT
      DeviceId
    , System.Timestamp()   AS ProcessedTime
    , EventEnqueuedUtcTime AS OriginalTime
    , DATEDIFF(second, EventEnqueuedUtcTime, System.Timestamp()) AS LatencySeconds
FROM IoTHubInput TIMESTAMP BY EventEnqueuedUtcTime
### Null Handling
-- Safe null handling: events without a DeviceId are dropped; a missing
-- Temperature is output as 0 and labelled 'MISSING' in Status. The NULL
-- check comes first in the CASE so a missing reading is never classified
-- as 'NORMAL'.
SELECT
      DeviceId
    , COALESCE(Temperature, 0) AS Temperature
    , CASE
          WHEN Temperature IS NULL THEN 'MISSING'
          WHEN Temperature > 100   THEN 'HIGH'
          WHEN Temperature < 0     THEN 'LOW'
          ELSE 'NORMAL'
      END AS Status
FROM IoTHubInput
WHERE DeviceId IS NOT NULL
## Output Patterns
### Multiple Outputs
-- Hot path: real-time alerts. Every event with Temperature above 100 is
-- written to AlertOutput with a constant Type of 'ALERT'; no windowing.
SELECT
      DeviceId
    , Temperature
    , 'ALERT' AS Type
INTO AlertOutput
FROM IoTHubInput TIMESTAMP BY EventEnqueuedUtcTime
WHERE Temperature > 100
-- Warm path: aggregated metrics. Per-device average, maximum and minimum
-- temperature over 5-minute tumbling windows, written to MetricsOutput.
SELECT
      DeviceId
    , AVG(Temperature) AS AvgTemp
    , MAX(Temperature) AS MaxTemp
    , MIN(Temperature) AS MinTemp
INTO MetricsOutput
FROM IoTHubInput TIMESTAMP BY EventEnqueuedUtcTime
GROUP BY
      DeviceId
    , TumblingWindow(minute, 5)
-- Cold path: archive. Every event is passed through unchanged to
-- ArchiveOutput; SELECT * is deliberate so the full payload is kept.
SELECT *
INTO ArchiveOutput
FROM IoTHubInput
## Testing
### Local Testing
# Test with sample data: start the Stream Analytics job with a custom output
# start time, so it begins producing output from a fixed, known point.
az stream-analytics job start \
    --name asa-iot-processing \
    --resource-group rg-streaming \
    --output-start-time "2024-01-15T00:00:00Z" \
    --output-start-mode CustomTime
### Query Validation
-- Validate query logic with sample data.
--
-- The Stream Analytics query language has no VALUES table constructor and
-- no derived-table subquery in FROM, and an aggregate GROUP BY needs a
-- window. Bind the sample events below to a test input named TestData
-- (for example as an uploaded JSON sample file) and run the query on it:
--
--   [
--     {"DeviceId": "device1", "Temperature": 75.5,  "EventTime": "2024-01-15T10:00:00Z"},
--     {"DeviceId": "device1", "Temperature": 85.0,  "EventTime": "2024-01-15T10:01:00Z"},
--     {"DeviceId": "device1", "Temperature": 105.0, "EventTime": "2024-01-15T10:02:00Z"}
--   ]
--
-- All three events fall into one 5-minute tumbling window, so the expected
-- result is a single row: DeviceId = 'device1', AvgTemp = 88.5.
SELECT
    DeviceId,
    AVG(Temperature) AS AvgTemp
FROM
    TestData TIMESTAMP BY EventTime
GROUP BY
    DeviceId,
    TumblingWindow(minute, 5)
## Related Documentation
Last Updated: January 2025