🚨 Tutorial 7: Anomaly Detection

Implement machine learning-powered anomaly detection using Stream Analytics built-in functions. Detect spikes, dips, change points, and custom anomalies in real-time streaming data.

🎯 Learning Objectives

After completing this tutorial, you will be able to:

  • Use AnomalyDetection_SpikeAndDip for sudden value changes
  • Implement AnomalyDetection_ChangePoint for trend shifts
  • Configure confidence levels and sensitivity parameters
  • Build custom anomaly detection logic with statistical methods
  • Create multi-layered detection strategies
  • Generate actionable alerts from anomalies

⏱️ Time Estimate: 35 minutes

  • Spike and Dip Detection: 12 minutes
  • Change Point Detection: 12 minutes
  • Custom Anomaly Logic: 11 minutes

📋 Prerequisites

📈 Step 1: Spike and Dip Detection

1.1 Basic Spike Detection

Detect sudden increases in metric values:

-- Detect temperature spikes using built-in ML function
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikes') OVER(LIMIT DURATION(minute, 10)) AS RECORD) AS spikeScore
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;

Parameters explained:

  • 95: Confidence level (95% confidence)
  • 120: History window size (120 events)
  • 'spikes': Detect only spikes (can also be 'dips' or 'spikesanddips')

1.2 Extract Anomaly Details

Parse the anomaly detection output:

-- Extract spike detection details
WITH SpikeDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikes')
            OVER(LIMIT DURATION(minute, 10)) AS RECORD) AS spikeResult
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    spikeResult.IsAnomaly AS isSpike,
    -- The built-in function returns only IsAnomaly and Score.
    -- Score is a p-value: lower values are more anomalous.
    spikeResult.Score AS anomalyScore,
    CASE
        -- Illustrative cut-offs; tune them against your own data
        WHEN spikeResult.IsAnomaly = 1 AND spikeResult.Score < 0.01 THEN 'Critical Spike'
        WHEN spikeResult.IsAnomaly = 1 AND spikeResult.Score < 0.03 THEN 'Warning Spike'
        WHEN spikeResult.IsAnomaly = 1 THEN 'Minor Spike'
        ELSE 'Normal'
    END AS severityLevel
INTO
    SqlOutput
FROM
    SpikeDetection
WHERE
    spikeResult.IsAnomaly = 1;  -- Only output anomalies

1.3 Detect Both Spikes and Dips

Monitor for bidirectional anomalies:

-- Detect temperature spikes AND dips
WITH BidirectionalDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        humidity,
        vibration,
        CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS tempAnomaly,
        CAST(AnomalyDetection_SpikeAndDip(humidity, 90, 100, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS humidityAnomaly,
        CAST(AnomalyDetection_SpikeAndDip(vibration, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS vibrationAnomaly
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    humidity,
    vibration,
    -- Temperature anomaly
    tempAnomaly.IsAnomaly AS isTempAnomaly,
    tempAnomaly.Score AS tempAnomalyScore,
    -- Humidity anomaly
    humidityAnomaly.IsAnomaly AS isHumidityAnomaly,
    humidityAnomaly.Score AS humidityAnomalyScore,
    -- Vibration anomaly
    vibrationAnomaly.IsAnomaly AS isVibrationAnomaly,
    vibrationAnomaly.Score AS vibrationAnomalyScore,
    -- Composite severity
    CASE
        WHEN (tempAnomaly.IsAnomaly + humidityAnomaly.IsAnomaly + vibrationAnomaly.IsAnomaly) >= 2 THEN 'Critical - Multiple Metrics'
        WHEN tempAnomaly.IsAnomaly = 1 OR vibrationAnomaly.IsAnomaly = 1 THEN 'High - Safety Concern'
        WHEN humidityAnomaly.IsAnomaly = 1 THEN 'Medium - Environmental'
        ELSE 'Normal'
    END AS overallSeverity,
    -- Alert message
    CONCAT(
        CASE WHEN tempAnomaly.IsAnomaly = 1 THEN 'Temperature ' ELSE '' END,
        CASE WHEN humidityAnomaly.IsAnomaly = 1 THEN 'Humidity ' ELSE '' END,
        CASE WHEN vibrationAnomaly.IsAnomaly = 1 THEN 'Vibration ' ELSE '' END,
        'Anomaly Detected'
    ) AS alertMessage
INTO
    BlobOutput
FROM
    BidirectionalDetection
WHERE
    tempAnomaly.IsAnomaly = 1
    OR humidityAnomaly.IsAnomaly = 1
    OR vibrationAnomaly.IsAnomaly = 1;

📉 Step 2: Change Point Detection

2.1 Basic Change Point Detection

Identify when data patterns fundamentally shift:

-- Detect change points in temperature trends
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    CAST(AnomalyDetection_ChangePoint(temperature, 80, 120)
        OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS RECORD) AS changePoint
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;

Parameters:

  • 80: Confidence level (80%)
  • 120: History window size

2.2 Extract Change Point Details

Get detailed change point information:

-- Detailed change point analysis
WITH ChangePointAnalysis AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        CAST(AnomalyDetection_ChangePoint(temperature, 80, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 2)) AS RECORD) AS changePointResult
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    changePointResult.IsAnomaly AS isChangePoint,
    -- The built-in function returns only IsAnomaly and Score.
    -- Score is a martingale score that grows as the event becomes more anomalous.
    changePointResult.Score AS confidenceScore,
    CASE
        WHEN changePointResult.IsAnomaly = 1 AND changePointResult.Score > 0.9 THEN 'Strong Trend Change'
        WHEN changePointResult.IsAnomaly = 1 AND changePointResult.Score > 0.7 THEN 'Moderate Trend Change'
        WHEN changePointResult.IsAnomaly = 1 THEN 'Weak Trend Change'
        ELSE 'Stable Trend'
    END AS trendStatus,
    -- Calculate temperature trend
    temperature - LAG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(minute, 30)) AS recentTempChange,
    -- Alert type
    CASE
        WHEN changePointResult.IsAnomaly = 1 THEN 'Trend Change Alert'
        ELSE NULL
    END AS alertType
INTO
    SqlOutput
FROM
    ChangePointAnalysis
WHERE
    changePointResult.IsAnomaly = 1;

2.3 Multi-Metric Change Point Detection

Monitor trend changes across multiple metrics:

-- Monitor change points in temperature, humidity, and vibration
WITH MultiMetricChangePoints AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        humidity,
        vibration,
        CAST(AnomalyDetection_ChangePoint(temperature, 80, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS RECORD) AS tempCP,
        CAST(AnomalyDetection_ChangePoint(humidity, 75, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS RECORD) AS humidityCP,
        CAST(AnomalyDetection_ChangePoint(vibration, 85, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS RECORD) AS vibrationCP
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    -- Change point indicators
    tempCP.IsAnomaly AS tempTrendChange,
    humidityCP.IsAnomaly AS humidityTrendChange,
    vibrationCP.IsAnomaly AS vibrationTrendChange,
    -- Confidence scores
    tempCP.Score AS tempConfidence,
    humidityCP.Score AS humidityConfidence,
    vibrationCP.Score AS vibrationConfidence,
    -- Combined analysis
    CASE
        WHEN tempCP.IsAnomaly = 1 AND vibrationCP.IsAnomaly = 1 THEN 'Equipment State Change'
        WHEN tempCP.IsAnomaly = 1 AND humidityCP.IsAnomaly = 1 THEN 'Environmental Shift'
        WHEN vibrationCP.IsAnomaly = 1 THEN 'Mechanical Change'
        WHEN tempCP.IsAnomaly = 1 THEN 'Thermal Trend Shift'
        WHEN humidityCP.IsAnomaly = 1 THEN 'Humidity Pattern Change'
        ELSE 'Unknown Change'
    END AS changeCategory,
    System.Timestamp() AS detectionTime
INTO
    BlobOutput
FROM
    MultiMetricChangePoints
WHERE
    tempCP.IsAnomaly = 1
    OR humidityCP.IsAnomaly = 1
    OR vibrationCP.IsAnomaly = 1;

🔍 Step 3: Custom Anomaly Detection

3.1 Statistical Threshold Detection

Implement Z-score based anomaly detection:

-- Custom Z-score anomaly detection
WITH StatisticalBaseline AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        AVG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS avgTemp,
        STDEV(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS stdDevTemp
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    avgTemp AS baselineAverage,
    stdDevTemp AS baselineStdDev,
    -- Calculate Z-score
    CASE
        WHEN stdDevTemp > 0 THEN (temperature - avgTemp) / stdDevTemp
        ELSE 0
    END AS zScore,
    -- Classify anomaly
    CASE
        WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 3 THEN 'Extreme Anomaly'
        WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 2 THEN 'Moderate Anomaly'
        WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 1.5 THEN 'Minor Deviation'
        ELSE 'Normal'
    END AS anomalyClassification,
    -- Anomaly flag
    CASE
        WHEN stdDevTemp > 0 AND ABS((temperature - avgTemp) / stdDevTemp) > 2 THEN 1
        ELSE 0
    END AS isAnomaly
INTO
    SqlOutput
FROM
    StatisticalBaseline
WHERE
    stdDevTemp > 0
    AND ABS((temperature - avgTemp) / stdDevTemp) > 2;
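
To see what the query computes, the Z-score logic can be replayed offline. The following is a minimal Python sketch, not part of the Stream Analytics job. The function name zscore_flags is hypothetical, the window is counted in events rather than the one-hour duration the query uses, and it assumes the window includes the current reading.

```python
from collections import deque
from statistics import mean, stdev


def zscore_flags(readings, window=20, threshold=2.0):
    """Return (value, z_score, is_anomaly) for each reading.

    Assumption: the trailing window includes the current reading and
    uses the sample standard deviation.
    """
    recent = deque(maxlen=window)  # keeps only the last `window` readings
    results = []
    for value in readings:
        recent.append(value)
        if len(recent) < 2:
            # A standard deviation needs at least two points
            results.append((value, 0.0, 0))
            continue
        sd = stdev(recent)
        # Same guard as the query: no Z-score when the deviation is zero
        z = (value - mean(recent)) / sd if sd > 0 else 0.0
        results.append((value, z, 1 if abs(z) > threshold else 0))
    return results
```

Under this assumption the current reading is part of its own baseline, so a window of n readings can never produce a Z-score larger than (n - 1) / sqrt(n). A threshold of 2 therefore needs at least 6 readings in the window, and a threshold of 3 needs at least 11.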

3.2 Percentile-Based Detection

Use percentiles to identify outliers:

-- Percentile-based anomaly detection
WITH PercentileAnalysis AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        PERCENTILE_CONT(0.05) WITHIN GROUP (ORDER BY temperature)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS p05,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY temperature)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS p95,
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY temperature)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS median
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    median AS medianTemperature,
    p05 AS lowerBound,
    p95 AS upperBound,
    CASE
        WHEN temperature > p95 THEN 'High Outlier'
        WHEN temperature < p05 THEN 'Low Outlier'
        ELSE 'Within Range'
    END AS outlierStatus,
    CASE
        WHEN temperature > p95 THEN temperature - p95
        WHEN temperature < p05 THEN p05 - temperature
        ELSE 0
    END AS deviationFromRange
INTO
    BlobOutput
FROM
    PercentileAnalysis
WHERE
    temperature > p95 OR temperature < p05;
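
The percentile bounds can also be checked by hand. This Python sketch assumes the usual continuous-percentile definition (linear interpolation between the two nearest ordered values); the helper names percentile_cont and classify_outlier are hypothetical, and the window is passed in as a plain list rather than a one-hour stream window.

```python
import math


def percentile_cont(values, p):
    """Continuous percentile with linear interpolation, for 0 <= p <= 1."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("values must not be empty")
    pos = p * (len(ordered) - 1)  # fractional index into the sorted list
    lo = math.floor(pos)
    hi = math.ceil(pos)
    frac = pos - lo
    return ordered[lo] + frac * (ordered[hi] - ordered[lo])


def classify_outlier(temperature, window):
    """Mirror the outlierStatus and deviationFromRange columns of the query."""
    p05 = percentile_cont(window, 0.05)
    p95 = percentile_cont(window, 0.95)
    if temperature > p95:
        return "High Outlier", temperature - p95
    if temperature < p05:
        return "Low Outlier", p05 - temperature
    return "Within Range", 0.0
```

By construction, roughly 10% of the readings in any window fall outside the 5th to 95th percentile band, so this method flags a steady share of events even when the data is well behaved.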

3.3 Rate of Change Detection

Detect rapid changes regardless of absolute values:

-- Detect rapid rate of change
WITH RateOfChange AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        LAG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(minute, 5)) AS prevTemp,
        LAG(timestamp) OVER(PARTITION BY deviceId LIMIT DURATION(minute, 5)) AS prevTimestamp
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    prevTemp,
    temperature - prevTemp AS absoluteChange,
    DATEDIFF(second, prevTimestamp, timestamp) AS timeDiffSeconds,
    -- Calculate rate (degrees per minute)
    CASE
        WHEN prevTemp IS NOT NULL AND DATEDIFF(second, prevTimestamp, timestamp) > 0
        THEN ((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0))
        ELSE 0
    END AS changeRatePerMinute,
    -- Classify change rate
    CASE
        WHEN prevTemp IS NOT NULL AND DATEDIFF(second, prevTimestamp, timestamp) > 0
             AND ABS((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0)) > 10
        THEN 'Extreme Rate Change'
        WHEN prevTemp IS NOT NULL AND DATEDIFF(second, prevTimestamp, timestamp) > 0
             AND ABS((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0)) > 5
        THEN 'High Rate Change'
        WHEN prevTemp IS NOT NULL AND DATEDIFF(second, prevTimestamp, timestamp) > 0
             AND ABS((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0)) > 2
        THEN 'Moderate Rate Change'
        ELSE 'Normal Rate'
    END AS rateCategory
INTO
    SqlOutput
FROM
    RateOfChange
WHERE
    prevTemp IS NOT NULL
    AND DATEDIFF(second, prevTimestamp, timestamp) > 0
    AND ABS((temperature - prevTemp) / (DATEDIFF(second, prevTimestamp, timestamp) / 60.0)) > 5;
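
The rate arithmetic is easy to get wrong by a factor of 60, so here is a small Python sketch of the same calculation. The function names are hypothetical, and unlike the query it returns None instead of 0 when no rate can be computed.

```python
from datetime import datetime


def change_rate_per_minute(prev, curr):
    """Return degrees per minute between two (timestamp, temperature) pairs.

    Returns None when there is no previous reading or no elapsed time.
    """
    if prev is None:
        return None
    seconds = (curr[0] - prev[0]).total_seconds()
    if seconds <= 0:
        return None
    # Convert seconds to minutes before dividing, as the query does
    return (curr[1] - prev[1]) / (seconds / 60.0)


def rate_category(rate):
    """Apply the same thresholds as the rateCategory column."""
    if rate is None:
        return "Normal Rate"
    magnitude = abs(rate)
    if magnitude > 10:
        return "Extreme Rate Change"
    if magnitude > 5:
        return "High Rate Change"
    if magnitude > 2:
        return "Moderate Rate Change"
    return "Normal Rate"


previous = (datetime(2025, 1, 1, 12, 0, 0), 20.0)
current = (datetime(2025, 1, 1, 12, 0, 30), 23.0)
rate = change_rate_per_minute(previous, current)  # 3 degrees in 30 seconds
```

A rise of 3 degrees in 30 seconds is 6 degrees per minute, which lands in the 'High Rate Change' band and passes the query's WHERE filter of more than 5.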

🎯 Step 4: Multi-Layer Anomaly Detection

4.1 Composite Anomaly Detection

Combine multiple detection methods:

-- Multi-layer anomaly detection combining ML and custom logic
WITH MLDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        vibration,
        CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS mlTempAnomaly,
        CAST(AnomalyDetection_SpikeAndDip(vibration, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS mlVibAnomaly
    FROM
        EventHubInput TIMESTAMP BY timestamp
),
CustomDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        vibration,
        AVG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS avgTemp,
        STDEV(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS stdDevTemp,
        CASE
            WHEN STDEV(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) > 0
                 AND ABS((temperature - AVG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1))) /
                         STDEV(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1))) > 2
            THEN 1 ELSE 0
        END AS customTempAnomaly,
        CASE
            WHEN vibration > 1.5 THEN 1 ELSE 0
        END AS customVibAnomaly
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    m.deviceId,
    m.location,
    m.timestamp,
    m.temperature,
    m.vibration,
    -- ML detection results
    m.mlTempAnomaly.IsAnomaly AS mlTempDetection,
    m.mlTempAnomaly.Score AS mlTempScore,
    m.mlVibAnomaly.IsAnomaly AS mlVibDetection,
    m.mlVibAnomaly.Score AS mlVibScore,
    -- Custom detection results
    c.customTempAnomaly,
    c.customVibAnomaly,
    -- Composite anomaly score (0-4 scale)
    (m.mlTempAnomaly.IsAnomaly + m.mlVibAnomaly.IsAnomaly + c.customTempAnomaly + c.customVibAnomaly) AS compositeScore,
    -- Final classification
    CASE
        WHEN (m.mlTempAnomaly.IsAnomaly + m.mlVibAnomaly.IsAnomaly + c.customTempAnomaly + c.customVibAnomaly) >= 3
            THEN 'Critical - Multiple Detection Methods'
        WHEN (m.mlTempAnomaly.IsAnomaly + c.customTempAnomaly) >= 2
            THEN 'High - Temperature Anomaly Confirmed'
        WHEN (m.mlVibAnomaly.IsAnomaly + c.customVibAnomaly) >= 2
            THEN 'High - Vibration Anomaly Confirmed'
        WHEN (m.mlTempAnomaly.IsAnomaly + m.mlVibAnomaly.IsAnomaly + c.customTempAnomaly + c.customVibAnomaly) >= 2
            THEN 'Medium - Multiple Indicators'
        WHEN (m.mlTempAnomaly.IsAnomaly + m.mlVibAnomaly.IsAnomaly + c.customTempAnomaly + c.customVibAnomaly) = 1
            THEN 'Low - Single Detection Method'
        ELSE 'Normal'
    END AS severityLevel,
    System.Timestamp() AS detectionTime
INTO
    SqlOutput
FROM
    MLDetection m
JOIN
    CustomDetection c
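ON
    m.deviceId = c.deviceId
    AND m.timestamp = c.timestamp
    -- Stream Analytics requires a time bound on stream-to-stream joins
    AND DATEDIFF(second, m, c) BETWEEN 0 AND 0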
WHERE
    (m.mlTempAnomaly.IsAnomaly = 1 OR m.mlVibAnomaly.IsAnomaly = 1 OR c.customTempAnomaly = 1 OR c.customVibAnomaly = 1);
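
The severity CASE expression is evaluated top to bottom, so the order of its branches decides the label. This Python sketch restates that logic with a hypothetical function, severity_level, that takes the four 0/1 flags as arguments.

```python
def severity_level(ml_temp, ml_vib, custom_temp, custom_vib):
    """Map four 0/1 detection flags to a severity label.

    Branches are checked in the same order as the CASE expression,
    and the first match wins.
    """
    total = ml_temp + ml_vib + custom_temp + custom_vib
    if total >= 3:
        return "Critical - Multiple Detection Methods"
    if ml_temp + custom_temp >= 2:
        return "High - Temperature Anomaly Confirmed"
    if ml_vib + custom_vib >= 2:
        return "High - Vibration Anomaly Confirmed"
    if total >= 2:
        return "Medium - Multiple Indicators"
    if total == 1:
        return "Low - Single Detection Method"
    return "Normal"
```

With two flags set, the label depends on which two: both temperature detectors agreeing yields 'High', while one temperature flag plus one vibration flag falls through to 'Medium'.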

4.2 Context-Aware Anomaly Detection

Adjust the baseline to the context of each reading, here the hour of day:

-- Context-aware anomaly detection (baseline varies by hour of day)
WITH ContextualBaseline AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        DATEPART(hour, timestamp) AS hourOfDay,
        DATEPART(weekday, timestamp) AS dayOfWeek,
        -- Different baselines for different times
        CASE
            WHEN DATEPART(hour, timestamp) BETWEEN 8 AND 17 THEN AVG(temperature)
                OVER(PARTITION BY deviceId, DATEPART(hour, timestamp) LIMIT DURATION(day, 7))
            ELSE AVG(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(day, 7))
        END AS contextualAvg,
        CASE
            WHEN DATEPART(hour, timestamp) BETWEEN 8 AND 17 THEN STDEV(temperature)
                OVER(PARTITION BY deviceId, DATEPART(hour, timestamp) LIMIT DURATION(day, 7))
            ELSE STDEV(temperature) OVER(PARTITION BY deviceId LIMIT DURATION(day, 7))
        END AS contextualStdDev
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    hourOfDay,
    contextualAvg AS expectedTemperature,
    contextualStdDev AS normalVariance,
    ABS(temperature - contextualAvg) AS deviation,
    CASE
        WHEN contextualStdDev > 0 AND ABS(temperature - contextualAvg) / contextualStdDev > 3
            THEN 'Critical Anomaly'
        WHEN contextualStdDev > 0 AND ABS(temperature - contextualAvg) / contextualStdDev > 2
            THEN 'Moderate Anomaly'
        ELSE 'Normal for Time Period'
    END AS contextualStatus
INTO
    BlobOutput
FROM
    ContextualBaseline
WHERE
    contextualStdDev > 0
    AND ABS(temperature - contextualAvg) / contextualStdDev > 2;
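
The effect of a contextual baseline is easiest to see with numbers. The following Python sketch is a simplified offline model under stated assumptions: the history is a list of (hour_of_day, temperature) pairs for one device, and the function names are hypothetical. As in the query, hours 8 to 17 are compared against readings from the same hour, and all other hours against every reading.

```python
from statistics import mean, stdev


def contextual_baseline(history, hour):
    """Return (average, standard deviation) for the given hour, or None."""
    if 8 <= hour <= 17:
        # Business hours: compare only against readings from the same hour
        sample = [temp for h, temp in history if h == hour]
    else:
        # Off hours: fall back to the baseline over all readings
        sample = [temp for _, temp in history]
    if len(sample) < 2:
        return None
    return mean(sample), stdev(sample)


def contextual_status(history, hour, temperature):
    """Classify a reading against the baseline for its hour."""
    baseline = contextual_baseline(history, hour)
    if baseline is None or baseline[1] == 0:
        return "Normal for Time Period"
    avg, sd = baseline
    z = abs(temperature - avg) / sd
    if z > 3:
        return "Critical Anomaly"
    if z > 2:
        return "Moderate Anomaly"
    return "Normal for Time Period"


# Illustrative history: warm afternoons (hour 14) and cool nights (hour 2)
history = [(14, t) for t in (30.0, 31.0, 29.0, 30.0, 30.0)]
history += [(2, t) for t in (18.0, 19.0, 17.0, 18.0, 18.0)]
```

With this made-up history, a reading of 24 degrees at hour 14 is far below the afternoon baseline and is classified as critical, while the same 24 degrees at hour 2 sits exactly on the all-day average and is classified as normal.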

✅ Testing and Validation

Deploy Anomaly Detection Query

$anomalyQuery = @"
-- Comprehensive anomaly detection
WITH AnomalyDetection AS (
    SELECT
        deviceId,
        location,
        timestamp,
        temperature,
        vibration,
        CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS tempAnomaly,
        CAST(AnomalyDetection_SpikeAndDip(vibration, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS vibAnomaly
    FROM
        EventHubInput TIMESTAMP BY timestamp
)
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    vibration,
    tempAnomaly.IsAnomaly AS isTempAnomaly,
    tempAnomaly.Score AS tempAnomalyScore,
    vibAnomaly.IsAnomaly AS isVibAnomaly,
    vibAnomaly.Score AS vibAnomalyScore,
    CASE
        WHEN tempAnomaly.IsAnomaly = 1 AND vibAnomaly.IsAnomaly = 1 THEN 'Critical'
        WHEN tempAnomaly.IsAnomaly = 1 OR vibAnomaly.IsAnomaly = 1 THEN 'Warning'
        ELSE 'Normal'
    END AS alertLevel
INTO
    SqlOutput
FROM
    AnomalyDetection
WHERE
    tempAnomaly.IsAnomaly = 1 OR vibAnomaly.IsAnomaly = 1;
"@

$anomalyQuery | Out-File -FilePath "anomaly-detection-query.sql" -Encoding UTF8

# Stop the job, update its query, then restart it
az stream-analytics job stop --name $env:STREAM_JOB --resource-group $env:STREAM_RG
# Quote the @file argument so PowerShell does not treat @ as the splatting operator
az stream-analytics transformation update --job-name $env:STREAM_JOB --resource-group $env:STREAM_RG --name "Transformation" --saql "@anomaly-detection-query.sql"
az stream-analytics job start --job-name $env:STREAM_JOB --resource-group $env:STREAM_RG --output-start-mode JobStartTime

🎓 Key Concepts Learned

Anomaly Detection Methods

Method           Best For                       Sensitivity
SpikeAndDip      Sudden value changes           High
ChangePoint      Trend shifts                   Medium
Z-Score          Statistical outliers           Configurable
Percentile       Distribution-based detection   Medium
Rate of Change   Rapid transitions              High

Best Practices

  • Combine multiple detection methods for accuracy
  • Tune confidence levels based on false positive tolerance
  • Use appropriate history window sizes (100-200 events)
  • Partition by deviceId for device-specific baselines
  • Filter output to anomalies only to reduce noise

🚀 Next Steps

You've mastered anomaly detection! Continue to:

Tutorial 08: Power BI Integration →

📚 Additional Resources


Tutorial Progress: 7 of 11 complete | Next: Power BI Integration

Last Updated: January 2025