Anomaly Detection

Built-in machine learning capabilities for detecting anomalies in streaming data.


Overview

Azure Stream Analytics includes built-in machine learning functions for anomaly detection, so you can flag unusual patterns in streaming data in real time without building or training your own models.

Supported Anomaly Types

  • Spikes: Sudden increase in values
  • Dips: Sudden decrease in values
  • Slow Positive Trends: Gradual upward drift
  • Slow Negative Trends: Gradual downward drift
  • Bidirectional Level Changes: Step changes up or down

graph TB
    subgraph "Normal Pattern"
        Normal[Steady<br/>Baseline]
    end

    subgraph "Anomalies"
        Spike[Spike]
        Dip[Dip]
        TrendUp[Slow Positive<br/>Trend]
        TrendDown[Slow Negative<br/>Trend]
        LevelChange[Level<br/>Change]
    end

    Normal --> Spike
    Normal --> Dip
    Normal --> TrendUp
    Normal --> TrendDown
    Normal --> LevelChange
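
When experimenting with these anomaly types, it can help to have labelled test data for each one. The sketch below is a hypothetical generator: the name `make_series`, the baseline of 20, the noise level, and the anomaly magnitudes are all illustrative assumptions, not part of Stream Analytics.

```python
import random
from typing import List

ANOMALY_KINDS = (
    "spike", "dip", "slow_positive_trend", "slow_negative_trend", "level_change"
)

def make_series(kind: str, n: int = 100, baseline: float = 20.0, seed: int = 0) -> List[float]:
    """Return n noisy points around baseline with one anomaly starting at the midpoint."""
    if kind not in ANOMALY_KINDS:
        raise ValueError(f"unknown anomaly kind: {kind}")
    rng = random.Random(seed)  # seeded so runs are repeatable
    series = [baseline + rng.gauss(0, 0.5) for _ in range(n)]
    mid = n // 2
    if kind == "spike":
        series[mid] += 10.0                      # single sudden increase
    elif kind == "dip":
        series[mid] -= 10.0                      # single sudden decrease
    elif kind == "slow_positive_trend":
        for i in range(mid, n):
            series[i] += 0.1 * (i - mid)         # gradual upward drift
    elif kind == "slow_negative_trend":
        for i in range(mid, n):
            series[i] -= 0.1 * (i - mid)         # gradual downward drift
    else:  # level_change
        for i in range(mid, n):
            series[i] += 5.0                     # persistent step up
    return series

for kind in ANOMALY_KINDS:
    data = make_series(kind)
    print(f"{kind}: min={min(data):.1f} max={max(data):.1f}")
```

Series like these could be replayed into a test input to check which function flags which pattern.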

Anomaly Detection Functions

AnomalyDetection_SpikeAndDip

Detects spikes and dips in time-series data.

-- Basic spike and dip detection
SELECT
    deviceId,
    temperature,
    -- The function returns a record containing Score and IsAnomaly
    AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
        OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS spikeAndDipResult
INTO
    [anomaly-output]
FROM
    [sensor-input]

Parameters:

  • Column: The numeric column to analyze (e.g., temperature)
  • Confidence: Confidence level as a percentage from 1 to 100, typically 95
  • History Size: Number of events in the sliding window that the model learns from (e.g., 120)
  • Mode: 'spikes', 'dips', or 'spikesanddips'

Output Fields (returned together as a record):

  • Score: A p-value between 0 and 1 (lower = more anomalous)
  • IsAnomaly: Integer flag (1 = anomaly detected, 0 = normal)
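
To build intuition for how the parameters and output fields fit together, here is a deliberately simplified stand-in written in Python. It is not the model the service uses: it computes an empirical rank-based p-value over a sliding history, on the assumption that Score behaves like a p-value where lower means more anomalous. The class name `EmpiricalSpikeAndDip` and the 20-event warm-up are hypothetical choices for illustration.

```python
from collections import deque
from typing import Deque, Dict

class EmpiricalSpikeAndDip:
    """Toy detector mirroring the (confidence, history size, mode) parameters."""

    def __init__(self, confidence: float = 95.0, history_size: int = 120,
                 mode: str = "spikesanddips", warmup: int = 20) -> None:
        if mode not in ("spikes", "dips", "spikesanddips"):
            raise ValueError(f"unsupported mode: {mode}")
        self.alpha = 1.0 - confidence / 100.0   # flag when the p-value drops below this
        self.mode = mode
        self.warmup = warmup                    # assumption: no flags until this many events
        self.history: Deque[float] = deque(maxlen=history_size)

    def score(self, value: float) -> Dict[str, float]:
        past = list(self.history)
        self.history.append(value)              # the new value joins the history afterwards
        if len(past) < self.warmup:
            return {"Score": 1.0, "IsAnomaly": 0}
        n = len(past)
        # Add-one smoothing keeps the p-value strictly above zero.
        p_spike = (sum(1 for h in past if h >= value) + 1) / (n + 1)
        p_dip = (sum(1 for h in past if h <= value) + 1) / (n + 1)
        if self.mode == "spikes":
            p = p_spike
        elif self.mode == "dips":
            p = p_dip
        else:
            p = min(1.0, 2 * min(p_spike, p_dip))  # two-sided
        return {"Score": p, "IsAnomaly": int(p < self.alpha)}

detector = EmpiricalSpikeAndDip(confidence=95, history_size=120)
for i in range(100):
    detector.score(20.0 + 0.1 * (i % 2))        # steady baseline
print(detector.score(50.0))                     # a sudden spike
```

In this sketch, raising the confidence lowers `alpha`, so fewer events are flagged, and a larger history size makes the empirical distribution slower to adapt.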

AnomalyDetection_ChangePoint

Detects persistent changes in the level or trend of time-series data.

-- Detect level changes in metrics
SELECT
    metricName,
    value,
    AnomalyDetection_ChangePoint(value, 95, 120)
        OVER(PARTITION BY metricName LIMIT DURATION(hour, 1)) AS changePoint
INTO
    [changepoint-output]
FROM
    [metrics-input]

Use Cases:

  • Detecting system configuration changes
  • Identifying service degradation
  • Monitoring baseline shifts
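
The key difference from spike and dip detection is persistence: a change point is a shift that stays. The sketch below illustrates that idea with a simple run-length rule in Python. It is a swapped-in technique for illustration only, not the model behind AnomalyDetection_ChangePoint, and the function name, the 3-sigma band, and the 5-point persistence requirement are all assumptions.

```python
from statistics import mean, stdev
from typing import List, Optional

def first_persistent_shift(values: List[float], warmup: int = 20,
                           k: float = 3.0, persist: int = 5) -> Optional[int]:
    """Return the index where a persistent shift starts, or None.

    A shift is reported only when `persist` consecutive points sit more than
    k standard deviations from the warm-up baseline on the same side.
    """
    baseline = values[:warmup]
    mu = mean(baseline)
    sigma = stdev(baseline) or 1.0    # guard against a perfectly flat baseline
    run_sign, run_len = 0, 0
    for i in range(warmup, len(values)):
        z = (values[i] - mu) / sigma
        sign = 1 if z > k else -1 if z < -k else 0
        if sign != 0 and sign == run_sign:
            run_len += 1              # same-side excursion continues
        else:
            run_sign, run_len = sign, (1 if sign != 0 else 0)
        if run_len >= persist:
            return i - persist + 1    # first point of the run
    return None

steady = [10.0, 10.2] * 25
stepped = steady + [15.0, 15.2] * 25  # level change at index 50
spiky = [10.0, 10.2] * 50
spiky[30] = 15.0                      # one-off spike, not a level change

print(first_persistent_shift(stepped))
print(first_persistent_shift(spiky))
```

A single outlier resets the run, so it is ignored here, while a configuration change or sustained degradation keeps the run going and is reported.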


Practical Examples

Example 1: Temperature Anomaly Detection

-- Comprehensive temperature monitoring with anomaly detection
WITH TemperatureData AS (
    SELECT
        deviceId,
        location,
        temperature,
        System.Timestamp() AS eventTime
    FROM
        [sensor-input]
),
AnomalyDetection AS (
    SELECT
        deviceId,
        location,
        temperature,
        eventTime,
        AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 2)) AS spikeAndDip,
        AnomalyDetection_ChangePoint(temperature, 95, 120)
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 2)) AS changePoint
    FROM
        TemperatureData
)

-- Output anomalies with context
SELECT
    deviceId,
    location,
    temperature,
    eventTime,
    spikeAndDip.Score AS spikeScore,
    spikeAndDip.IsAnomaly AS isSpikeAnomaly,
    changePoint.Score AS changePointScore,
    changePoint.IsAnomaly AS isChangePoint,
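    CASE
        -- Score is a p-value: lower means more anomalous (0.01 is an illustrative cut-off)
        WHEN spikeAndDip.IsAnomaly = 1 AND spikeAndDip.Score < 0.01 THEN 'Critical Spike/Dip'
        WHEN spikeAndDip.IsAnomaly = 1 THEN 'Spike/Dip'
        WHEN changePoint.IsAnomaly = 1 THEN 'Level Change Detected'
        ELSE 'Normal'
    END AS anomalyType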
INTO
    [temperature-anomalies]
FROM
    AnomalyDetection
WHERE
    spikeAndDip.IsAnomaly = 1 OR changePoint.IsAnomaly = 1

// C# consumer to process anomalies
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text.Json;
using System.Threading.Tasks;

public class AnomalyProcessor
{
    // The query emits camelCase names (deviceId, spikeScore, ...), so match
    // them to the PascalCase record properties case-insensitively.
    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        PropertyNameCaseInsensitive = true
    };

    public async Task ProcessAnomaliesAsync(string connectionString, string eventHubName)
    {
        await using var consumer = new EventHubConsumerClient(
            EventHubConsumerClient.DefaultConsumerGroupName,
            connectionString,
            eventHubName);

        // Reads from all partitions; assumes one JSON object per event
        await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync())
        {
            var anomaly = JsonSerializer.Deserialize<TemperatureAnomaly>(
                partitionEvent.Data.EventBody.ToString(), JsonOptions);

            if (anomaly is null)
            {
                continue; // Skip payloads that deserialize to null
            }

            Console.WriteLine($"Anomaly detected for device {anomaly.DeviceId}");
            Console.WriteLine($"  Type: {anomaly.AnomalyType}");
            Console.WriteLine($"  Temperature: {anomaly.Temperature}C");
            Console.WriteLine($"  Spike Score: {anomaly.SpikeScore:F2}");

            if (anomaly.AnomalyType == "Critical Spike/Dip")
            {
                await SendAlertAsync(anomaly);
            }
        }
    }

    private Task SendAlertAsync(TemperatureAnomaly anomaly)
    {
        // Send alert via email, SMS, or notification service
        Console.WriteLine($"ALERT: Critical anomaly for device {anomaly.DeviceId}");
        return Task.CompletedTask;
    }
}

// IsAnomaly is emitted as an integer (0 or 1), so the flags are long, not bool
public record TemperatureAnomaly(
    string DeviceId,
    string Location,
    double Temperature,
    DateTime EventTime,
    double SpikeScore,
    long IsSpikeAnomaly,
    double ChangePointScore,
    long IsChangePoint,
    string AnomalyType
);

Example 2: Network Traffic Anomaly Detection

-- Detect unusual network traffic patterns
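WITH NetworkMetrics AS (
    -- Aggregate raw events into one row per server every 30 seconds.
    -- SUM for the byte counters and MAX for connections are assumptions about
    -- how the source reports these fields; adjust them to match your telemetry.
    SELECT
        serverId,
        SUM(bytesReceived) AS bytesReceived,
        SUM(bytesSent) AS bytesSent,
        MAX(connectionCount) AS connectionCount,
        System.Timestamp() AS timestamp
    FROM
        [network-input]
    GROUP BY
        serverId,
        TumblingWindow(second, 30)
),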
TrafficAnomalies AS (
    SELECT
        serverId,
        bytesReceived,
        bytesSent,
        connectionCount,
        timestamp,
        -- 240 windows x 30 seconds = 120 minutes, so the duration spans the full history size
        AnomalyDetection_SpikeAndDip(bytesReceived, 95, 240, 'spikes')
            OVER(PARTITION BY serverId LIMIT DURATION(minute, 120)) AS receivedAnomaly,
        AnomalyDetection_SpikeAndDip(bytesSent, 95, 240, 'spikes')
            OVER(PARTITION BY serverId LIMIT DURATION(minute, 120)) AS sentAnomaly,
        AnomalyDetection_SpikeAndDip(connectionCount, 95, 240, 'spikes')
            OVER(PARTITION BY serverId LIMIT DURATION(minute, 120)) AS connAnomaly
    FROM
        NetworkMetrics
)

-- Flag potential DDoS or security incidents
SELECT
    serverId,
    bytesReceived / 1024.0 / 1024.0 AS mbReceived,  -- float division keeps fractional MB
    bytesSent / 1024.0 / 1024.0 AS mbSent,
    connectionCount,
    timestamp,
    receivedAnomaly.Score AS receivedAnomalyScore,
    sentAnomaly.Score AS sentAnomalyScore,
    connAnomaly.Score AS connAnomalyScore,
    CASE
        WHEN receivedAnomaly.IsAnomaly = 1 AND connAnomaly.IsAnomaly = 1
            THEN 'Potential DDoS Attack'
        -- Score is a p-value: lower means more anomalous (0.01 is an illustrative cut-off)
        WHEN sentAnomaly.IsAnomaly = 1 AND sentAnomaly.Score < 0.01
            THEN 'Potential Data Exfiltration'
        WHEN receivedAnomaly.IsAnomaly = 1
            THEN 'Unusual Inbound Traffic'
        ELSE 'Traffic Anomaly'
    END AS securityAlert
INTO
    [security-alerts]
FROM
    TrafficAnomalies
WHERE
    receivedAnomaly.IsAnomaly = 1
    OR sentAnomaly.IsAnomaly = 1
    OR connAnomaly.IsAnomaly = 1

Example 3: Application Performance Monitoring

-- Monitor application response times for anomalies
WITH ResponseTimeMetrics AS (
    SELECT
        applicationName,
        endpoint,
        AVG(responseTime) AS avgResponseTime,
        MAX(responseTime) AS maxResponseTime,
        COUNT(*) AS requestCount,
        System.Timestamp() AS windowEnd
    FROM
        [app-logs]
    GROUP BY
        applicationName,
        endpoint,
        TumblingWindow(minute, 1)
),
PerformanceAnomalies AS (
    SELECT
        applicationName,
        endpoint,
        avgResponseTime,
        maxResponseTime,
        requestCount,
        windowEnd,
        -- 120 one-minute windows span 2 hours, so the duration covers the full history size
        AnomalyDetection_SpikeAndDip(avgResponseTime, 95, 120, 'spikes')
            OVER(PARTITION BY applicationName, endpoint LIMIT DURATION(hour, 2)) AS responseAnomaly,
        AnomalyDetection_ChangePoint(avgResponseTime, 95, 120)
            OVER(PARTITION BY applicationName, endpoint LIMIT DURATION(hour, 2)) AS performanceChange
    FROM
        ResponseTimeMetrics
)

-- Alert on performance degradation
SELECT
    applicationName,
    endpoint,
    avgResponseTime,
    maxResponseTime,
    requestCount,
    windowEnd,
    responseAnomaly.Score AS anomalyScore,
    performanceChange.Score AS changeScore,
    CASE
        WHEN performanceChange.IsAnomaly = 1 THEN 'Performance Degradation Detected'
        -- Score is a p-value: lower means more anomalous (0.01 is an illustrative cut-off)
        WHEN responseAnomaly.IsAnomaly = 1 AND responseAnomaly.Score < 0.01 THEN 'Critical Response Time Spike'
        WHEN responseAnomaly.IsAnomaly = 1 THEN 'Response Time Anomaly'
        ELSE 'Normal'
    END AS alertType
INTO
    [performance-alerts]
FROM
    PerformanceAnomalies
WHERE
    responseAnomaly.IsAnomaly = 1 OR performanceChange.IsAnomaly = 1

Tuning Anomaly Detection

Confidence Level Selection

# Python script to evaluate different confidence levels
import numpy as np
from typing import List, Tuple

def evaluate_confidence_levels(
    normal_data: List[float],
    anomaly_data: List[float]
) -> List[Tuple[int, float, float]]:
    """
    Evaluate detection performance at different confidence levels.

    Returns: List of (confidence_level, false_positive_rate, true_positive_rate)
    """
    confidence_levels = [80, 85, 90, 95, 99]
    results = []

    for confidence in confidence_levels:
        # Simulate anomaly detection threshold
        threshold = np.percentile(normal_data, confidence)

        # Calculate metrics
        false_positives = sum(1 for x in normal_data if x > threshold)
        true_positives = sum(1 for x in anomaly_data if x > threshold)

        fpr = false_positives / len(normal_data)
        tpr = true_positives / len(anomaly_data)

        results.append((confidence, fpr, tpr))

        print(f"Confidence {confidence}%:")
        print(f"  False Positive Rate: {fpr:.2%}")
        print(f"  True Positive Rate: {tpr:.2%}")
        print()

    return results

# Example usage
normal_temps = [20 + np.random.normal(0, 2) for _ in range(1000)]
anomaly_temps = [20 + np.random.normal(10, 3) for _ in range(100)]

evaluate_confidence_levels(normal_temps, anomaly_temps)

Confidence Level Guidelines:

  • 90%: More sensitive, catches more anomalies but produces more false positives
  • 95%: Good balance, recommended for most use cases
  • 99%: More conservative, fewer false positives
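
One way to make these guidelines concrete is to estimate alert volume. The sketch below is a rough rule of thumb, assuming the confidence level behaves like a significance threshold so that about (100 - confidence)% of normal events are flagged. Real rates depend on the model and the data, and the function name `expected_false_alarms` is hypothetical.

```python
def expected_false_alarms(confidence: float, events_per_hour: float,
                          hours: float = 24.0) -> float:
    """Estimate how many normal events would be flagged over a period.

    Assumption: roughly (100 - confidence)% of normal events exceed the
    threshold. Treat the result as an order-of-magnitude guide only.
    """
    if not 0 < confidence <= 100:
        raise ValueError("confidence must be in (0, 100]")
    return (100.0 - confidence) / 100.0 * events_per_hour * hours

# One device reporting every 30 seconds (120 events per hour)
for confidence in (90, 95, 99):
    alarms = expected_false_alarms(confidence, events_per_hour=120)
    print(f"{confidence}%: about {alarms:.0f} false alarms per day")
```

If the estimated volume is more than an on-call team can review, that is a signal to raise the confidence level or add a secondary filter downstream.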

History Size Selection

-- Test different history sizes
-- Small history (60): Fast adaptation, more sensitive to recent changes
SELECT
    deviceId,
    temperature,
    AnomalyDetection_SpikeAndDip(temperature, 95, 60, 'spikesanddips')
        OVER(PARTITION BY deviceId LIMIT DURATION(minute, 30)) AS anomaly60
FROM [input]

-- Medium history (120): Balanced approach (recommended)
SELECT
    deviceId,
    temperature,
    AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
        OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS anomaly120
FROM [input]

-- Large history (240): More stable, less reactive to short-term fluctuations
SELECT
    deviceId,
    temperature,
    AnomalyDetection_SpikeAndDip(temperature, 95, 240, 'spikesanddips')
        OVER(PARTITION BY deviceId LIMIT DURATION(hour, 2)) AS anomaly240
FROM [input]
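
The three queries above pair history sizes of 60, 120, and 240 with windows of 30 minutes, 1 hour, and 2 hours, which is consistent with one event per device every 30 seconds. The helper below sketches that relationship, assuming a steady arrival rate; the function name `limit_duration` is hypothetical and the 30-second interval is inferred from the examples rather than stated by them.

```python
import math

def limit_duration(history_size: int, seconds_per_event: float) -> str:
    """Build a LIMIT DURATION clause long enough to hold history_size events."""
    if history_size <= 0 or seconds_per_event <= 0:
        raise ValueError("history_size and seconds_per_event must be positive")
    total_seconds = math.ceil(history_size * seconds_per_event)
    # Prefer the largest unit that divides the window evenly.
    if total_seconds % 3600 == 0:
        return f"LIMIT DURATION(hour, {total_seconds // 3600})"
    if total_seconds % 60 == 0:
        return f"LIMIT DURATION(minute, {total_seconds // 60})"
    return f"LIMIT DURATION(second, {total_seconds})"

for size in (60, 120, 240):
    print(size, limit_duration(size, seconds_per_event=30))
```

With a 30-second interval this reproduces the three windows used above. If the window is shorter than the time needed to collect the history, the model sees fewer events than the history size asks for.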

Integration with Alerting

Event Grid Integration

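-- Send anomalies to Event Grid for action
-- Score each event in a first step, then filter on the result: the anomaly
-- function is an analytic function used in SELECT, not in CROSS APPLY.
WITH ScoredReadings AS (
    SELECT
        deviceId,
        temperature,
        System.Timestamp() AS detectionTime,
        AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS anomaly
    FROM
        [sensor-input]
)
SELECT
    deviceId,
    temperature,
    anomaly.Score AS score,
    'TemperatureAnomaly' AS eventType,
    detectionTime
INTO
    [eventgrid-output]
FROM
    ScoredReadings
WHERE
    anomaly.IsAnomaly = 1

# Python Azure Function triggered by Event Grid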
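import logging
import os
import azure.functions as func
from azure.communication.email import EmailClient

def main(event: func.EventGridEvent):
    """Process anomaly alert from Stream Analytics."""
    result = event.get_json()

    logging.info(f"Anomaly detected for device {result['deviceId']}")
    logging.info(f"  Temperature: {result['temperature']}")
    logging.info(f"  Anomaly Score: {result['score']}")

    # Send email alert for the strongest anomalies.
    # Score is a p-value, so lower means more anomalous (0.01 is illustrative).
    if result['score'] < 0.01:
        send_critical_alert(result)

    # Update monitoring dashboard
    update_dashboard(result)

    # Log to monitoring system
    log_to_monitoring(result)

def update_dashboard(anomaly_data):
    """Placeholder: push the anomaly to your dashboard of choice."""
    logging.info("Dashboard update for device %s", anomaly_data['deviceId'])

def log_to_monitoring(anomaly_data):
    """Placeholder: forward the anomaly to your monitoring system."""
    logging.info("Monitoring record: %s", anomaly_data)
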
def send_critical_alert(anomaly_data):
    """Send critical alert via email."""
    email_client = EmailClient.from_connection_string(
        os.environ["COMMUNICATION_SERVICES_CONNECTION_STRING"]
    )

    message = {
        "senderAddress": "noreply@contoso.com",
        "recipients": {
            "to": [{"address": "alerts@contoso.com"}]
        },
        "content": {
            "subject": f"Critical Anomaly: Device {anomaly_data['deviceId']}",
            "plainText": f"""
            Critical anomaly detected:

            Device: {anomaly_data['deviceId']}
            Temperature: {anomaly_data['temperature']}C
            Anomaly Score: {anomaly_data['score']:.2f}
            Time: {anomaly_data['detectionTime']}
            """
        }
    }

    email_client.begin_send(message)

Advanced Patterns

Multi-Metric Anomaly Detection

-- Combine multiple metrics for comprehensive anomaly detection
WITH MultiMetricAnomalies AS (
    SELECT
        deviceId,
        temperature,
        humidity,
        pressure,
        AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS tempAnomaly,
        AnomalyDetection_SpikeAndDip(humidity, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS humidityAnomaly,
        AnomalyDetection_SpikeAndDip(pressure, 95, 120, 'spikesanddips')
            OVER(PARTITION BY deviceId LIMIT DURATION(hour, 1)) AS pressureAnomaly,
        System.Timestamp() AS timestamp
    FROM
        [sensor-input]
)

-- Flag devices with multiple concurrent anomalies
SELECT
    deviceId,
    temperature,
    humidity,
    pressure,
    timestamp,
    (CASE WHEN tempAnomaly.IsAnomaly = 1 THEN 1 ELSE 0 END +
     CASE WHEN humidityAnomaly.IsAnomaly = 1 THEN 1 ELSE 0 END +
     CASE WHEN pressureAnomaly.IsAnomaly = 1 THEN 1 ELSE 0 END) AS anomalyCount,
    CASE
        WHEN tempAnomaly.IsAnomaly = 1 AND humidityAnomaly.IsAnomaly = 1
            AND pressureAnomaly.IsAnomaly = 1
            THEN 'System Failure Likely'
        WHEN (tempAnomaly.IsAnomaly = 1 AND humidityAnomaly.IsAnomaly = 1)
            OR (tempAnomaly.IsAnomaly = 1 AND pressureAnomaly.IsAnomaly = 1)
            OR (humidityAnomaly.IsAnomaly = 1 AND pressureAnomaly.IsAnomaly = 1)
            THEN 'Multiple Anomalies Detected'
        ELSE 'Single Anomaly'
    END AS severity
INTO
    [multi-metric-alerts]
FROM
    MultiMetricAnomalies
WHERE
    tempAnomaly.IsAnomaly = 1
    OR humidityAnomaly.IsAnomaly = 1
    OR pressureAnomaly.IsAnomaly = 1
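
Severity rules like this are easy to get subtly wrong, so it can be worth mirroring them in a small function that can be unit tested before the query is deployed. The sketch below is one hypothetical way to do that in Python. It assumes any two concurrent anomalies count as "Multiple Anomalies Detected", and it adds a "Normal" label for the no-anomaly case, which the query never emits because of its WHERE filter.

```python
from itertools import product
from typing import Tuple

def classify_severity(temp: bool, humidity: bool, pressure: bool) -> Tuple[int, str]:
    """Return (anomaly_count, severity) for one event's three anomaly flags."""
    count = int(temp) + int(humidity) + int(pressure)
    if count == 3:
        severity = "System Failure Likely"
    elif count == 2:
        severity = "Multiple Anomalies Detected"  # any pair, by assumption
    elif count == 1:
        severity = "Single Anomaly"
    else:
        severity = "Normal"                       # filtered out by the query
    return count, severity

# Enumerate every flag combination to review the full decision table
for flags in product((False, True), repeat=3):
    print(flags, classify_severity(*flags))
```

Counting flags rather than listing pairs keeps the rule correct if more metrics are added later.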

Last Updated: 2025-01-28 | Complexity: Advanced | Estimated Reading Time: 20 minutes