🚨 Tutorial 7: Anomaly Detection
Implement machine learning-powered anomaly detection using Stream Analytics built-in functions. Detect spikes, dips, change points, and custom anomalies in real-time streaming data.
🎯 Learning Objectives
After completing this tutorial, you will be able to:
- ✅ Use AnomalyDetection_SpikeAndDip for sudden value changes
- ✅ Implement AnomalyDetection_ChangePoint for trend shifts
- ✅ Configure confidence levels and sensitivity parameters
- ✅ Build custom anomaly detection logic with statistical methods
- ✅ Create multi-layered detection strategies
- ✅ Generate actionable alerts from anomalies
⏱️ Time Estimate: 35 minutes
- Spike and Dip Detection: 12 minutes
- Change Point Detection: 12 minutes
- Custom Anomaly Logic: 11 minutes
📋 Prerequisites
- Completed Tutorial 06: Joins and Temporal Operations
- Data generator producing varied patterns
- Understanding of statistical concepts (mean, standard deviation)
📈 Step 1: Spike and Dip Detection
1.1 Basic Spike Detection
Detect sudden increases in metric values:
```sql
-- Detect temperature spikes using the built-in ML function
-- PARTITION BY deviceId gives every device its own model instead of mixing all devices
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikes')
        OVER(PARTITION BY deviceId LIMIT DURATION(minute, 10)) AS RECORD) AS spikeScore
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;
```
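Parameters explained:
- 95: confidence level (95%). This sets the model's sensitivity; higher values flag fewer, more extreme events.
- 120: history size, the number of past events the model learns from.
- 'spikes': detect only spikes. Use 'dips' for dips only or 'spikesanddips' for both.
- LIMIT DURATION(minute, 10): how far back in time the model looks. As a rule of thumb, make it long enough to hold 120 events per device. Here that means at least 12 events per minute.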
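You cannot run the service's model locally, but a toy detector makes the roles of the three parameters easier to see. The sketch below is a deliberately simplified stand-in: a rolling z-test, not the algorithm Stream Analytics actually uses. `toy_spike_and_dip` and its `warmup` argument are hypothetical names.

In the sketch:
- `confidence` becomes a normal-distribution cut-off.
- `history_size` bounds the sliding history.
- `mode` decides which direction counts.

```python
from collections import deque
from statistics import NormalDist, fmean, stdev


def toy_spike_and_dip(values, confidence=95.0, history_size=120,
                      mode="spikesanddips", warmup=5):
    """Toy stand-in for AnomalyDetection_SpikeAndDip (NOT the service's model).

    Each value is compared with the mean and sample standard deviation of the
    previous `history_size` values. It is flagged when it lies more than z
    standard deviations away, where z is the one-sided normal quantile for
    `confidence`. Nothing is scored until `warmup` values have been seen.
    """
    if mode not in ("spikes", "dips", "spikesanddips"):
        raise ValueError(f"unknown mode: {mode!r}")
    if not 0 < confidence < 100:
        raise ValueError("confidence must be strictly between 0 and 100")
    z = NormalDist().inv_cdf(confidence / 100.0)  # 95 -> about 1.645
    history = deque(maxlen=history_size)  # keeps only the newest values
    flags = []
    for x in values:
        flag = 0
        if len(history) >= max(2, warmup):
            mu, sigma = fmean(history), stdev(history)
            if sigma > 0:
                score = (x - mu) / sigma
                if mode != "dips" and score > z:
                    flag = 1  # spike: far above recent history
                if mode != "spikes" and score < -z:
                    flag = 1  # dip: far below recent history
        flags.append(flag)
        history.append(x)
    return flags


readings = [20.0, 20.5, 19.5, 20.0, 20.5, 19.5, 35.0, 20.0]
print(toy_spike_and_dip(readings, mode="spikes"))  # [0, 0, 0, 0, 0, 0, 1, 0]
```

Raising `confidence` from 95 to 99 moves this toy cut-off from about 1.64 to about 2.33 standard deviations, so fewer readings are flagged. The built-in function's confidence parameter is also a sensitivity setting, but its internal mapping differs.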
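1.2 Extract Anomaly Details
Parse the anomaly detection output. The function returns a record with two fields, IsAnomaly (0 or 1) and Score. For SpikeAndDip, Score is a p-value: the lower it is, the more anomalous the event.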
```sql
-- Extract spike detection details
WITH SpikeDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikes')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 10)) AS RECORD) AS spikeResult
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    spikeResult.IsAnomaly AS isSpike,
    -- Score is a p-value (lower = more anomalous); the record has no separate PValue field
    spikeResult.Score AS anomalyScore,
    -- Illustrative cut-offs: tune them against your own data
    CASE
        WHEN spikeResult.IsAnomaly = 1 AND spikeResult.Score < 0.001 THEN 'Critical Spike'
        WHEN spikeResult.IsAnomaly = 1 AND spikeResult.Score < 0.01 THEN 'Warning Spike'
        WHEN spikeResult.IsAnomaly = 1 THEN 'Minor Spike'
        ELSE 'Normal'
    END AS severityLevel
INTO
    SqlOutput
FROM
    SpikeDetection
WHERE
    spikeResult.IsAnomaly = 1; -- Only output anomalies
```
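Alerting code downstream of `SqlOutput` often needs the same severity logic. The sketch below assumes that `Score` is a p-value where lower means more anomalous, as the AnomalyDetection_SpikeAndDip reference describes. `spike_severity` and its two cut-offs are hypothetical; tune them on your own data.

```python
def spike_severity(record, critical_p=0.001, warning_p=0.01):
    """Map an AnomalyDetection_SpikeAndDip output record to a severity label.

    Assumes Score is a p-value, so LOWER scores mean MORE anomalous events.
    The two cut-offs are hypothetical placeholders.
    """
    if record.get("IsAnomaly") != 1:
        return "Normal"
    score = record["Score"]
    if score < critical_p:
        return "Critical Spike"
    if score < warning_p:
        return "Warning Spike"
    return "Minor Spike"


print(spike_severity({"IsAnomaly": 1, "Score": 0.0004}))  # Critical Spike
```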
1.3 Detect Both Spikes and Dips
Monitor for bidirectional anomalies:
```sql
-- Detect temperature spikes AND dips
WITH BidirectionalDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        humidity,
        vibration,
        CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS tempAnomaly,
        CAST(AnomalyDetection_SpikeAndDip(humidity, 90, 100, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS humidityAnomaly,
        CAST(AnomalyDetection_SpikeAndDip(vibration, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS vibrationAnomaly
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    humidity,
    vibration,
    -- Temperature anomaly
    tempAnomaly.IsAnomaly AS isTempAnomaly,
    tempAnomaly.Score AS tempAnomalyScore,
    -- Humidity anomaly
    humidityAnomaly.IsAnomaly AS isHumidityAnomaly,
    humidityAnomaly.Score AS humidityAnomalyScore,
    -- Vibration anomaly
    vibrationAnomaly.IsAnomaly AS isVibrationAnomaly,
    vibrationAnomaly.Score AS vibrationAnomalyScore,
    -- Composite severity
    CASE
        WHEN (tempAnomaly.IsAnomaly + humidityAnomaly.IsAnomaly + vibrationAnomaly.IsAnomaly) >= 2 THEN 'Critical - Multiple Metrics'
        WHEN tempAnomaly.IsAnomaly = 1 OR vibrationAnomaly.IsAnomaly = 1 THEN 'High - Safety Concern'
        WHEN humidityAnomaly.IsAnomaly = 1 THEN 'Medium - Environmental'
        ELSE 'Normal'
    END AS overallSeverity,
    -- Alert message
    CONCAT(
        CASE WHEN tempAnomaly.IsAnomaly = 1 THEN 'Temperature ' ELSE '' END,
        CASE WHEN humidityAnomaly.IsAnomaly = 1 THEN 'Humidity ' ELSE '' END,
        CASE WHEN vibrationAnomaly.IsAnomaly = 1 THEN 'Vibration ' ELSE '' END,
        'Anomaly Detected'
    ) AS alertMessage
INTO
    BlobOutput
FROM
    BidirectionalDetection
WHERE
    tempAnomaly.IsAnomaly = 1
    OR humidityAnomaly.IsAnomaly = 1
    OR vibrationAnomaly.IsAnomaly = 1;
```
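The CASE branches are evaluated top to bottom, so it helps to pin down the expected labels before deploying, for example when building test fixtures. Below is a small Python mirror of the `overallSeverity` and `alertMessage` expressions. The function name `composite_alert` is hypothetical.

```python
def composite_alert(temp_flag, humidity_flag, vibration_flag):
    """Reproduce the overallSeverity CASE and alertMessage CONCAT from the query.

    Each argument is an IsAnomaly value (0 or 1). Branches are checked top to
    bottom, so the first matching rule wins, as in the SQL CASE.
    """
    total = temp_flag + humidity_flag + vibration_flag
    if total >= 2:
        severity = "Critical - Multiple Metrics"
    elif temp_flag == 1 or vibration_flag == 1:
        severity = "High - Safety Concern"
    elif humidity_flag == 1:
        severity = "Medium - Environmental"
    else:
        severity = "Normal"
    names = (("Temperature", temp_flag), ("Humidity", humidity_flag),
             ("Vibration", vibration_flag))
    # Each flagged metric contributes "<Name> " before the fixed suffix
    message = "".join(f"{name} " for name, flag in names if flag == 1)
    return severity, message + "Anomaly Detected"


print(composite_alert(1, 0, 1))
```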
📉 Step 2: Change Point Detection
2.1 Basic Change Point Detection
Identify when data patterns fundamentally shift:
```sql
-- Detect change points in temperature trends
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    CAST(AnomalyDetection_ChangePoint(temperature, 80, 120)
        OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS RECORD) AS changePoint
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;
```
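Parameters:
- 80: confidence level (80%). Unlike SpikeAndDip, AnomalyDetection_ChangePoint takes no mode argument.
- 120: history size, the number of past events the model learns from.
- LIMIT DURATION(hour, 1): as a rule of thumb, long enough to hold 120 events per device. That means at least 2 events per minute.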
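Change point detection looks for a persistent shift rather than a single outlier. The sketch below illustrates that idea with a different, well-known technique: the two-sided tabular CUSUM. It is not the model behind AnomalyDetection_ChangePoint. `target`, `slack`, and `threshold` are parameters of this sketch only.

A 2-degree shift never looks extreme on its own. Even so, the accumulated evidence crosses the threshold after a few readings.

```python
def cusum_change_points(values, target, slack, threshold):
    """Two-sided tabular CUSUM, a classic change-point technique.

    This is NOT the model behind AnomalyDetection_ChangePoint. Small
    deviations that persist accumulate until they cross `threshold`,
    even when no single reading looks extreme.
    """
    upper = lower = 0.0
    change_points = []
    for i, x in enumerate(values):
        upper = max(0.0, upper + (x - target - slack))  # evidence of an upward shift
        lower = max(0.0, lower + (target - x - slack))  # evidence of a downward shift
        if upper > threshold or lower > threshold:
            change_points.append(i)
            # Crude re-baselining: treat the current value as the new level
            target, upper, lower = x, 0.0, 0.0
    return change_points


print(cusum_change_points([20.0] * 10 + [22.0] * 10, target=20.0, slack=0.5, threshold=8.0))  # [15]
```

Each post-shift reading adds 1.5 to the upper sum, which passes 8 on the sixth reading (index 15). Re-centring on the detected value is a crude choice that keeps the sketch short.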
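2.2 Extract Change Point Details
Get detailed change point information. The result record has the same two fields as SpikeAndDip, IsAnomaly and Score. For change points, however, Score is a martingale score: it is not bounded to 0-1 and grows as a deviation persists. Treat it as a relative measure, not as a confidence or probability.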
```sql
-- Detailed change point analysis
WITH ChangePointAnalysis AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        CAST(AnomalyDetection_ChangePoint(temperature, 80, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 2)) AS RECORD) AS changePointResult
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    changePointResult.IsAnomaly AS isChangePoint,
    -- Martingale score: unbounded, larger = stronger evidence of a change.
    -- The record has no PValue field, and fixed 0-1 thresholds do not apply.
    changePointResult.Score AS changePointScore,
    -- Direction and size of the recent move (previous reading within 30 minutes)
    temperature - LAG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(minute, 30)) AS recentTempChange,
    -- Every output row is a change point because of the WHERE clause
    'Trend Change Alert' AS alertType
INTO
    SqlOutput
FROM
    ChangePointAnalysis
WHERE
    changePointResult.IsAnomaly = 1;
```
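The `recentTempChange` column relies on LAG with a LIMIT DURATION clause. The previous reading from the same device counts only if it arrived within the last 30 minutes. Below is a rough Python model of those semantics. `lag_within` is a hypothetical helper, and treating an event exactly 30 minutes old as still inside the window is an assumption.

```python
from datetime import datetime, timedelta


def lag_within(events, partition_key, field, limit):
    """Rough model of LAG(field) OVER (PARTITION BY key LIMIT DURATION(...)).

    For each event, return the previous event's `field` from the same
    partition, or None when there is no previous event or it is older than
    `limit`. Events must already be sorted by "timestamp".
    """
    last_seen = {}  # partition value -> (timestamp, field value)
    result = []
    for ev in events:
        prev = last_seen.get(ev[partition_key])
        if prev is not None and ev["timestamp"] - prev[0] <= limit:
            result.append(prev[1])
        else:
            result.append(None)
        last_seen[ev[partition_key]] = (ev["timestamp"], ev[field])
    return result


t0 = datetime(2025, 1, 1, 12, 0)
events = [
    {"deviceId": "a", "timestamp": t0, "temperature": 20.0},
    {"deviceId": "b", "timestamp": t0 + timedelta(minutes=10), "temperature": 30.0},
    {"deviceId": "a", "timestamp": t0 + timedelta(minutes=20), "temperature": 22.0},
    {"deviceId": "a", "timestamp": t0 + timedelta(minutes=60), "temperature": 25.0},
]
print(lag_within(events, "deviceId", "temperature", timedelta(minutes=30)))
# [None, None, 20.0, None]
```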
2.3 Multi-Metric Change Point Detection
Monitor trend changes across multiple metrics:
```sql
-- Monitor change points in temperature, humidity, and vibration
WITH MultiMetricChangePoints AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        humidity,
        vibration,
        CAST(AnomalyDetection_ChangePoint(temperature, 80, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS RECORD) AS tempCP,
        CAST(AnomalyDetection_ChangePoint(humidity, 75, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS RECORD) AS humidityCP,
        CAST(AnomalyDetection_ChangePoint(vibration, 85, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS RECORD) AS vibrationCP
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    -- Change point indicators
    tempCP.IsAnomaly AS tempTrendChange,
    humidityCP.IsAnomaly AS humidityTrendChange,
    vibrationCP.IsAnomaly AS vibrationTrendChange,
    -- Change point scores (martingale scores, not 0-1 confidences)
    tempCP.Score AS tempCPScore,
    humidityCP.Score AS humidityCPScore,
    vibrationCP.Score AS vibrationCPScore,
    -- Combined analysis
    CASE
        WHEN tempCP.IsAnomaly = 1 AND vibrationCP.IsAnomaly = 1 THEN 'Equipment State Change'
        WHEN tempCP.IsAnomaly = 1 AND humidityCP.IsAnomaly = 1 THEN 'Environmental Shift'
        WHEN vibrationCP.IsAnomaly = 1 THEN 'Mechanical Change'
        WHEN tempCP.IsAnomaly = 1 THEN 'Thermal Trend Shift'
        WHEN humidityCP.IsAnomaly = 1 THEN 'Humidity Pattern Change'
        ELSE 'Unknown Change'
    END AS changeCategory,
    System.Timestamp() AS detectionTime
INTO
    BlobOutput
FROM
    MultiMetricChangePoints
WHERE
    tempCP.IsAnomaly = 1
    OR humidityCP.IsAnomaly = 1
    OR vibrationCP.IsAnomaly = 1;
```
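The `changeCategory` CASE depends on rule order. When temperature and vibration shift together, 'Equipment State Change' wins even if humidity also shifted. Below is a Python mirror for checking expected labels. The name `change_category` is hypothetical.

```python
def change_category(temp_cp, humidity_cp, vibration_cp):
    """Mirror of the changeCategory CASE; inputs are IsAnomaly flags (0 or 1).

    Rules are checked in the same order as the SQL, so the first match wins.
    """
    if temp_cp and vibration_cp:
        return "Equipment State Change"
    if temp_cp and humidity_cp:
        return "Environmental Shift"
    if vibration_cp:
        return "Mechanical Change"
    if temp_cp:
        return "Thermal Trend Shift"
    if humidity_cp:
        return "Humidity Pattern Change"
    return "Unknown Change"


print(change_category(1, 1, 1))  # Equipment State Change
```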
🔍 Step 3: Custom Anomaly Detection
3.1 Statistical Threshold Detection
Implement Z-score based anomaly detection:
```sql
-- Custom Z-score anomaly detection
WITH StatisticalBaseline AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        AVG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS avgTemp,
        STDEV(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS stdDevTemp
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    avgTemp AS baselineAverage,
    stdDevTemp AS baselineStdDev,
    -- Calculate Z-score
    CASE
        WHEN stdDevTemp > 0 THEN (temperature - avgTemp) / stdDevTemp
        ELSE 0
    END AS zScore,
    -- Classify anomaly
    CASE
        WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 3 THEN 'Extreme Anomaly'
        WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 2 THEN 'Moderate Anomaly'
        WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 1.5 THEN 'Minor Deviation'
        ELSE 'Normal'
    END AS anomalyClassification,
    -- Anomaly flag
    CASE
        WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 2 THEN 1
        ELSE 0
    END AS isAnomaly
INTO
    SqlOutput
FROM
    StatisticalBaseline
WHERE
    stdDevTemp > 0
    AND ABS((temperature - avgTemp) / stdDevTemp) > 2;
```
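You can prototype the same Z-score logic offline against recorded telemetry. This sketch makes three assumptions:
- Events are sorted by timestamp.
- The query's OVER window includes the current event.
- `statistics.stdev` (the sample standard deviation) matches STDEV.

`rolling_zscores` is a hypothetical name.

```python
from datetime import datetime, timedelta
from statistics import fmean, stdev


def rolling_zscores(events, window=timedelta(hours=1)):
    """Per-device Z-score of each temperature against a trailing time window.

    Assumptions: events are sorted by timestamp, the window includes the
    current event, and the sample standard deviation (n - 1) is used.
    """
    buffers = {}  # deviceId -> list of (timestamp, temperature)
    scores = []
    for ev in events:
        buf = buffers.setdefault(ev["deviceId"], [])
        buf.append((ev["timestamp"], ev["temperature"]))
        cutoff = ev["timestamp"] - window
        buf[:] = [(t, v) for t, v in buf if t > cutoff]  # drop expired readings
        temps = [v for _, v in buf]
        sd = stdev(temps) if len(temps) >= 2 else 0.0
        scores.append((ev["temperature"] - fmean(temps)) / sd if sd > 0 else 0.0)
    return scores


t0 = datetime(2025, 1, 1, 12, 0)
temps = [20, 21, 19, 20, 21, 19, 20, 30]
events = [{"deviceId": "a", "timestamp": t0 + timedelta(minutes=i), "temperature": float(v)}
          for i, v in enumerate(temps)]
print(round(rolling_zscores(events)[-1], 2))  # 2.42
```

The example prints 2.42, even though the last reading sits about 10 degrees above the earlier ones. The current event is part of its own baseline, so a sample Z-score can never exceed (n-1)/√n, which is about 2.47 for n = 8. With short histories, even large spikes therefore land near the 2-sigma threshold.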
3.2 Percentile-Based Detection
Use percentiles to identify outliers:
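```sql
-- Percentile-based anomaly detection
-- In Stream Analytics, PERCENTILE_CONT is an aggregate written as
-- PERCENTILE_CONT(p) OVER (ORDER BY col) (not WITHIN GROUP) and needs a GROUP BY
-- window. So compute per-device bounds over a 1-hour hopping window that
-- advances every 5 minutes, then join each reading to the latest bounds.
WITH Readings AS (
    SELECT deviceId, location, timestamp, temperature
    FROM EventHubInput TIMESTAMP BY timestamp
),
PercentileBounds AS (
    SELECT
        deviceId,
        PERCENTILE_CONT(0.05) OVER (ORDER BY temperature) AS p05,
        PERCENTILE_CONT(0.95) OVER (ORDER BY temperature) AS p95,
        PERCENTILE_CONT(0.50) OVER (ORDER BY temperature) AS median
    FROM
        Readings
    GROUP BY
        deviceId, HoppingWindow(minute, 60, 5)
)
SELECT
    r.deviceId,
    r.location,
    r.timestamp,
    r.temperature,
    b.median AS medianTemperature,
    b.p05 AS lowerBound,
    b.p95 AS upperBound,
    CASE
        WHEN r.temperature > b.p95 THEN 'High Outlier'
        WHEN r.temperature < b.p05 THEN 'Low Outlier'
        ELSE 'Within Range'
    END AS outlierStatus,
    CASE
        WHEN r.temperature > b.p95 THEN r.temperature - b.p95
        WHEN r.temperature < b.p05 THEN b.p05 - r.temperature
        ELSE 0
    END AS deviationFromRange
INTO
    BlobOutput
FROM
    Readings r
JOIN
    PercentileBounds b
ON
    r.deviceId = b.deviceId
    -- Bounds are stamped with their window end; a new window ends every 300 s,
    -- so matching readings from the following 299 s pairs each with one window
    AND DATEDIFF(second, b, r) BETWEEN 0 AND 299
WHERE
    r.temperature > b.p95 OR r.temperature < b.p05;
```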
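For offline checks, you can reproduce the bounds with the standard continuous-percentile definition: linear interpolation between neighbouring ranks, which is the definition PERCENTILE_CONT generally follows in SQL dialects. `percentile_cont` and `outlier_status` are hypothetical helpers. They mirror the query's `lowerBound`, `upperBound`, and `deviationFromRange` columns.

```python
def percentile_cont(values, p):
    """Continuous percentile with linear interpolation between neighbours."""
    if not values or not 0.0 <= p <= 1.0:
        raise ValueError("need at least one value and 0 <= p <= 1")
    data = sorted(values)
    pos = (len(data) - 1) * p          # fractional rank, 0-based
    lower = int(pos)
    upper = min(lower + 1, len(data) - 1)
    return data[lower] + (data[upper] - data[lower]) * (pos - lower)


def outlier_status(value, window_values):
    """Classify `value` against the 5th-95th percentile band of a window."""
    p05 = percentile_cont(window_values, 0.05)
    p95 = percentile_cont(window_values, 0.95)
    if value > p95:
        return "High Outlier", value - p95
    if value < p05:
        return "Low Outlier", p05 - value
    return "Within Range", 0.0


window = [float(v) for v in range(1, 22)]  # 1.0 .. 21.0
print(percentile_cont(window, 0.5))        # 11.0
print(outlier_status(25.0, window))        # ('High Outlier', 5.0)
```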
3.3 Rate of Change Detection
Detect rapid changes regardless of absolute values:
```sql
-- Detect rapid rate of change
WITH RateOfChange AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        LAG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(minute, 5)) AS prevTemp,
        LAG(timestamp) OVER(PARTITION BY deviceId LIMIT DURATION(minute, 5)) AS prevTimestamp
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    prevTemp,
    temperature - prevTemp AS absoluteChange,
    DATEDIFF(second, prevTimestamp, timestamp) AS timeDiffSeconds,
    -- Calculate rate (degrees per minute)
    CASE
        WHEN prevTemp IS NOT NULL AND DATEDIFF(second, prevTimestamp, timestamp) > 0
            THEN ((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0))
        ELSE 0
    END AS changeRatePerMinute,
    -- Classify change rate
    CASE
        WHEN prevTemp IS NOT NULL AND DATEDIFF(second, prevTimestamp, timestamp) > 0
            AND ABS((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0)) > 10
            THEN 'Extreme Rate Change'
        WHEN prevTemp IS NOT NULL AND DATEDIFF(second, prevTimestamp, timestamp) > 0
            AND ABS((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0)) > 5
            THEN 'High Rate Change'
        WHEN prevTemp IS NOT NULL AND DATEDIFF(second, prevTimestamp, timestamp) > 0
            AND ABS((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0)) > 2
            THEN 'Moderate Rate Change'
        ELSE 'Normal Rate'
    END AS rateCategory
INTO
    SqlOutput
FROM
    RateOfChange
WHERE
    prevTemp IS NOT NULL
    AND DATEDIFF(second, prevTimestamp, timestamp) > 0
    AND ABS((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0)) > 5;
```
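The rate calculation is easy to sanity-check by hand. The sketch below applies the same arithmetic and thresholds; `rate_category` is a hypothetical name. One difference to keep in mind: DATEDIFF(second, ...) returns a whole number of seconds, whereas `total_seconds()` keeps fractions, so results can differ slightly for sub-second timestamps.

```python
from datetime import datetime, timedelta


def rate_category(prev_temp, prev_ts, temp, ts):
    """Degrees per minute between consecutive readings, classified with the
    same thresholds as the query (>10 extreme, >5 high, >2 moderate)."""
    if prev_temp is None or prev_ts is None:
        return 0.0, "Normal Rate"
    seconds = (ts - prev_ts).total_seconds()
    if seconds <= 0:
        return 0.0, "Normal Rate"
    rate = (temp - prev_temp) / (seconds / 60.0)
    magnitude = abs(rate)  # direction does not matter for the category
    if magnitude > 10:
        return rate, "Extreme Rate Change"
    if magnitude > 5:
        return rate, "High Rate Change"
    if magnitude > 2:
        return rate, "Moderate Rate Change"
    return rate, "Normal Rate"


t0 = datetime(2025, 1, 1, 12, 0)
# A 3-degree rise in 30 seconds is 6 degrees per minute
print(rate_category(20.0, t0, 23.0, t0 + timedelta(seconds=30)))  # (6.0, 'High Rate Change')
```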
🎯 Step 4: Multi-Layer Anomaly Detection
4.1 Composite Anomaly Detection
Combine multiple detection methods:
```sql
-- Multi-layer anomaly detection combining ML and custom logic.
-- Both layers read the same events, so compute them in one pass instead of
-- joining two streams (a stream-to-stream JOIN would also need a DATEDIFF bound).
WITH LayeredDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        vibration,
        -- ML layer
        CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS mlTempAnomaly,
        CAST(AnomalyDetection_SpikeAndDip(vibration, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS mlVibAnomaly,
        -- Statistical baseline for the custom layer
        AVG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS avgTemp,
        STDEV(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS stdDevTemp
    FROM
        EventHubInput TIMESTAMP BY timestamp
),
Flags AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        vibration,
        mlTempAnomaly.IsAnomaly AS mlTempDetection,
        mlTempAnomaly.Score AS mlTempScore,
        mlVibAnomaly.IsAnomaly AS mlVibDetection,
        mlVibAnomaly.Score AS mlVibScore,
        -- Custom layer: Z-score above 2 for temperature, fixed threshold for vibration
        CASE
            WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 2 THEN 1
            ELSE 0
        END AS customTempAnomaly,
        CASE WHEN vibration > 1.5 THEN 1 ELSE 0 END AS customVibAnomaly
    FROM
        LayeredDetection
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    vibration,
    -- ML detection results
    mlTempDetection,
    mlTempScore,
    mlVibDetection,
    mlVibScore,
    -- Custom detection results
    customTempAnomaly,
    customVibAnomaly,
    -- Composite anomaly score (0-4 scale)
    (mlTempDetection + mlVibDetection + customTempAnomaly + customVibAnomaly) AS compositeScore,
    -- Final classification
    CASE
        WHEN (mlTempDetection + mlVibDetection + customTempAnomaly + customVibAnomaly) >= 3
            THEN 'Critical - Multiple Detection Methods'
        WHEN (mlTempDetection + customTempAnomaly) >= 2
            THEN 'High - Temperature Anomaly Confirmed'
        WHEN (mlVibDetection + customVibAnomaly) >= 2
            THEN 'High - Vibration Anomaly Confirmed'
        WHEN (mlTempDetection + mlVibDetection + customTempAnomaly + customVibAnomaly) >= 2
            THEN 'Medium - Multiple Indicators'
        WHEN (mlTempDetection + mlVibDetection + customTempAnomaly + customVibAnomaly) = 1
            THEN 'Low - Single Detection Method'
        ELSE 'Normal'
    END AS severityLevel,
    System.Timestamp() AS detectionTime
INTO
    SqlOutput
FROM
    Flags
WHERE
    mlTempDetection = 1 OR mlVibDetection = 1 OR customTempAnomaly = 1 OR customVibAnomaly = 1;
```
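The composite scoring is also compact outside SQL, which is handy for reviewing how each combination of flags is labelled. Below is a Python mirror of `compositeScore` and `severityLevel`; `composite_severity` is a hypothetical name. Note how the labels differ:
- One ML flag plus one custom flag on the *same* metric is labelled 'High'.
- Two flags on different metrics only reach 'Medium'.

```python
def composite_severity(ml_temp, ml_vib, custom_temp, custom_vib):
    """Mirror of compositeScore and severityLevel; inputs are 0/1 flags.

    Rules are evaluated in the same order as the SQL CASE.
    """
    score = ml_temp + ml_vib + custom_temp + custom_vib
    if score >= 3:
        return score, "Critical - Multiple Detection Methods"
    if ml_temp + custom_temp >= 2:
        return score, "High - Temperature Anomaly Confirmed"
    if ml_vib + custom_vib >= 2:
        return score, "High - Vibration Anomaly Confirmed"
    if score >= 2:
        return score, "Medium - Multiple Indicators"
    if score == 1:
        return score, "Low - Single Detection Method"
    return score, "Normal"


print(composite_severity(1, 0, 1, 0))  # (2, 'High - Temperature Anomaly Confirmed')
```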
4.2 Context-Aware Anomaly Detection
Adjust thresholds based on context:
```sql
-- Context-aware anomaly detection: per-device baselines, with an
-- hour-of-day baseline during business hours (08:00-17:59)
WITH ContextualBaseline AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        DATEPART(hour, timestamp) AS hourOfDay,
        DATEPART(weekday, timestamp) AS dayOfWeek,
        -- Business hours: compare with readings from the same hour over the last 7 days;
        -- other hours: compare with all readings from the last 7 days
        CASE
            WHEN DATEPART(hour, timestamp) BETWEEN 8 AND 17 THEN AVG(temperature)
                OVER(PARTITION BY deviceId, DATEPART(hour, timestamp) LIMIT DURATION(day, 7))
            ELSE AVG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(day, 7))
        END AS contextualAvg,
        CASE
            WHEN DATEPART(hour, timestamp) BETWEEN 8 AND 17 THEN STDEV(temperature)
                OVER(PARTITION BY deviceId, DATEPART(hour, timestamp) LIMIT DURATION(day, 7))
            ELSE STDEV(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(day, 7))
        END AS contextualStdDev
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    hourOfDay,
    contextualAvg AS expectedTemperature,
    contextualStdDev AS normalVariance,
    ABS(temperature - contextualAvg) AS deviation,
    CASE
        WHEN contextualStdDev > 0 AND ABS(temperature - contextualAvg) / contextualStdDev > 3
            THEN 'Critical Anomaly'
        WHEN contextualStdDev > 0 AND ABS(temperature - contextualAvg) / contextualStdDev > 2
            THEN 'Moderate Anomaly'
        ELSE 'Normal for Time Period'
    END AS contextualStatus
INTO
    BlobOutput
FROM
    ContextualBaseline
WHERE
    contextualStdDev > 0
    AND ABS(temperature - contextualAvg) / contextualStdDev > 2;
```
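The per-device Python mirror below shows how the two baselines behave. `contextual_status` is hypothetical. Returning 'Normal for Time Period' when there is too little history is a choice made for this sketch.

```python
from datetime import datetime
from statistics import fmean, stdev


def contextual_status(history, ts, temperature, business_hours=range(8, 18)):
    """Mirror of the 4.2 logic for one device.

    `history` holds (datetime, temperature) pairs from the baseline window
    (for example the last 7 days). From 08:00 to 17:59 the reading is compared
    with readings from the same hour of day; otherwise with all readings.
    """
    if ts.hour in business_hours:
        sample = [v for t, v in history if t.hour == ts.hour]
    else:
        sample = [v for _, v in history]
    if len(sample) < 2:
        return "Normal for Time Period"  # not enough data for a deviation
    mu, sd = fmean(sample), stdev(sample)
    if sd <= 0:
        return "Normal for Time Period"
    ratio = abs(temperature - mu) / sd
    if ratio > 3:
        return "Critical Anomaly"
    if ratio > 2:
        return "Moderate Anomaly"
    return "Normal for Time Period"


day_temps = [30, 31, 29, 30, 31, 29, 30]    # readings at 10:00 on 7 days
night_temps = [18, 19, 17, 18, 19, 17, 18]  # readings at 02:00 on 7 days
history = [(datetime(2025, 1, d + 1, 10), float(v)) for d, v in enumerate(day_temps)]
history += [(datetime(2025, 1, d + 1, 2), float(v)) for d, v in enumerate(night_temps)]
print(contextual_status(history, datetime(2025, 1, 8, 10), 35.0))  # Critical Anomaly
print(contextual_status(history, datetime(2025, 1, 8, 2), 30.0))   # Normal for Time Period
```

In the example, a 02:00 reading of 30 counts as normal even though night readings sit around 18. Off-hours readings are compared against all hours, daytime included. If that matters for your devices, one option is to partition by hour outside business hours as well.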
✅ Testing and Validation
Deploy Anomaly Detection Query¶
```powershell
$anomalyQuery = @"
-- Comprehensive anomaly detection
WITH AnomalyDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        vibration,
        CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS tempAnomaly,
        CAST(AnomalyDetection_SpikeAndDip(vibration, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS vibAnomaly
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    vibration,
    tempAnomaly.IsAnomaly AS isTempAnomaly,
    tempAnomaly.Score AS tempAnomalyScore,
    vibAnomaly.IsAnomaly AS isVibAnomaly,
    vibAnomaly.Score AS vibAnomalyScore,
    CASE
        WHEN tempAnomaly.IsAnomaly = 1 AND vibAnomaly.IsAnomaly = 1 THEN 'Critical'
        WHEN tempAnomaly.IsAnomaly = 1 OR vibAnomaly.IsAnomaly = 1 THEN 'Warning'
        ELSE 'Normal'
    END AS alertLevel
INTO
    SqlOutput
FROM
    AnomalyDetection
WHERE
    tempAnomaly.IsAnomaly = 1 OR vibAnomaly.IsAnomaly = 1;
"@
$anomalyQuery | Out-File -FilePath "anomaly-detection-query.sql" -Encoding UTF8

# Update the job: stop it, replace the query, then restart it
az stream-analytics job stop --job-name $env:STREAM_JOB --resource-group $env:STREAM_RG
# Quote the @file argument: an unquoted @ is PowerShell's splatting operator
az stream-analytics transformation update --job-name $env:STREAM_JOB --resource-group $env:STREAM_RG --name "Transformation" --saql "@anomaly-detection-query.sql"
az stream-analytics job start --job-name $env:STREAM_JOB --resource-group $env:STREAM_RG --output-start-mode JobStartTime
```
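To see the queries fire, the input needs events that actually contain anomalies. Below is a hypothetical generator that produces records with the field names the queries use: `deviceId`, `location`, `timestamp`, `temperature`, `humidity`, and `vibration`. The schema your existing data generator emits may differ. Sending the events to Event Hubs is left to that generator.

```python
import json
import random
from datetime import datetime, timedelta, timezone


def synthetic_events(device_id, start, count, interval_s=5,
                     spike_at=None, shift_at=None, seed=42):
    """Hypothetical test-data generator for the anomaly queries.

    Produces steady readings with Gaussian noise, an optional one-off
    temperature spike (for SpikeAndDip) and an optional persistent +5
    degree level shift (for ChangePoint).
    """
    rng = random.Random(seed)  # seeded so test runs are reproducible
    events = []
    for i in range(count):
        temperature = 22.0 + rng.gauss(0, 0.3)
        if shift_at is not None and i >= shift_at:
            temperature += 5.0  # persistent level shift
        if spike_at is not None and i == spike_at:
            temperature += 15.0  # one-off spike
        events.append({
            "deviceId": device_id,
            "location": "test-lab",
            "timestamp": (start + timedelta(seconds=i * interval_s)).isoformat(),
            "temperature": round(temperature, 2),
            "humidity": round(45.0 + rng.gauss(0, 1.0), 2),
            "vibration": round(0.5 + abs(rng.gauss(0, 0.05)), 3),
        })
    return events


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
batch = synthetic_events("device-001", start, count=300, spike_at=150, shift_at=200)
for event in batch[:2]:
    print(json.dumps(event))
```

At one event every 5 seconds, a device produces 12 events per minute. That is enough for a 120-event history to fit in the 10- and 15-minute windows used above. Placing the spike at event 150 and the shift at event 200 gives the models more than 120 events of history first.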
🎓 Key Concepts Learned
Anomaly Detection Methods
| Method | Best For | Sensitivity |
|---|---|---|
| SpikeAndDip | Sudden value changes | High |
| ChangePoint | Trend shifts | Medium |
| Z-Score | Statistical outliers | Configurable |
| Percentile | Distribution-based detection | Medium |
| Rate of Change | Rapid transitions | High |
Best Practices
- Combine multiple detection methods for accuracy
- Tune confidence levels based on false positive tolerance
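- Size the history to your event rate (this tutorial uses 120 events; 100-200 is a reasonable starting range), and make sure each LIMIT DURATION window can hold that many events per device. The built-in models also assume evenly spaced events, so aggregate irregular streams into a tumbling window first.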
- Partition by deviceId for device-specific baselines
- Filter output to anomalies only to reduce noise
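To apply the window-sizing rule of thumb, divide the history size by the window length. The result is the minimum per-partition event rate. The helper names are hypothetical.

```python
def required_events_per_minute(history_size, window_minutes):
    """Minimum per-partition event rate for the LIMIT DURATION window to be
    able to hold `history_size` events (rule-of-thumb check)."""
    return history_size / window_minutes


def window_holds_history(history_size, window_minutes, events_per_minute):
    """True when the window can contain at least `history_size` events."""
    return events_per_minute * window_minutes >= history_size


print(required_events_per_minute(120, 10))  # 12.0 (Step 1.1 settings)
print(required_events_per_minute(120, 60))  # 2.0 (Step 2.1 settings)
```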
🚀 Next Steps
You've mastered anomaly detection! Continue to:
Tutorial 08: Power BI Integration →
📚 Additional Resources
Tutorial Progress: 7 of 11 complete | Next: Power BI Integration
Last Updated: January 2025