Skip to content

title: "< Edge Deployments" description: "> < Home | = Overview | = Services | = Streaming Services | __ [Stream Analytics](REA..." tags: - services - stream-analytics


< Edge Deployments

< Home | = Overview | = Services | = Streaming Services | __ Stream Analytics__ | < Edge Deployments

Complexity Type

Deploy Azure Stream Analytics jobs to IoT Edge devices for low-latency, edge-based stream processing.


< Overview

Azure Stream Analytics on IoT Edge enables you to run the same Stream Analytics queries on edge devices that you run in the cloud. This provides ultra-low-latency analytics, reduced bandwidth costs, and offline operation capabilities.

Cloud vs Edge Deployment

graph TB
    subgraph "Cloud Deployment"
        EdgeDevices1[Edge Devices] -->|Raw Data| EventHub1[Event Hubs]
        EventHub1 --> CloudASA[Stream Analytics<br/>Cloud]
        CloudASA --> Storage1[Storage/DB]
    end

    subgraph "Edge Deployment"
        EdgeDevices2[Edge Devices] --> EdgeASA[Stream Analytics<br/>@ Edge]
        EdgeASA -->|Processed Data| EventHub2[Event Hubs]
        EdgeASA -->|Local Actions| LocalStorage[Local Storage]
        EventHub2 --> CloudServices[Cloud Services]
    end

Benefits of Edge Deployment

  • Low Latency: Process data locally without cloud round-trip
  • Reduced Bandwidth: Send only processed/filtered data to cloud
  • Offline Operation: Continue processing during connectivity loss
  • Data Privacy: Keep sensitive data on-premises
  • Cost Savings: Lower data transfer and cloud processing costs

< Architecture

Edge Processing Architecture

graph TB
    subgraph "IoT Edge Device"
        subgraph "Edge Modules"
            Sensors[Sensor Module] --> EdgeHub[IoT Edge Hub]
            Camera[Camera Module] --> EdgeHub
            EdgeHub --> ASAEdge[Stream Analytics<br/>Edge Module]
            ASAEdge --> Actions[Action Module]
            ASAEdge --> LocalDB[Local Storage<br/>Module]
        end
    end

    subgraph "Cloud Services"
        EdgeHub -->|Filtered Data| IoTHub[Azure IoT Hub]
        IoTHub --> CloudASA[Cloud Stream<br/>Analytics]
        IoTHub --> Storage[Azure Storage]
        CloudASA --> PowerBI[Power BI]
    end

    Actions --> Actuators[Local Actuators]

= Getting Started

Prerequisites

# Install Azure IoT Edge runtime
# For Ubuntu/Debian
curl https://packages.microsoft.com/config/ubuntu/20.04/packages-microsoft-prod.deb > ./packages-microsoft-prod.deb
sudo dpkg -i ./packages-microsoft-prod.deb
sudo apt-get update
sudo apt-get install aziot-edge

# For Windows
# Download and run installer from:
# https://aka.ms/iotedge-win

Step 1: Create Stream Analytics Edge Job

# Create resource group
az group create --name rg-asa-edge --location eastus

# Create Stream Analytics job for Edge
az stream-analytics job create \
  --resource-group rg-asa-edge \
  --name asa-edge-job \
  --location eastus \
  --compatibility-level 1.2 \
  --type Edge

# Show job details
az stream-analytics job show \
  --resource-group rg-asa-edge \
  --name asa-edge-job

Step 2: Define Edge Job Query

-- Example: Temperature filtering and alerting at the edge
WITH FilteredData AS (
    SELECT
        deviceId,
        temperature,
        humidity,
        System.Timestamp() AS timestamp
    FROM
        EdgeInput
    WHERE
        temperature > 70  -- Only process high temperatures
),
Alerts AS (
    SELECT
        deviceId,
        AVG(temperature) AS avgTemp,
        MAX(temperature) AS maxTemp,
        System.Timestamp() AS windowEnd
    FROM
        FilteredData
    GROUP BY
        deviceId,
        TumblingWindow(second, 30)
    HAVING
        AVG(temperature) > 80
)

-- Send all data to local storage
SELECT * INTO LocalOutput FROM FilteredData

-- Send only alerts to cloud
SELECT * INTO CloudOutput FROM Alerts

Step 3: Package and Deploy

# Package the job for Edge deployment
az stream-analytics job publish \
  --resource-group rg-asa-edge \
  --name asa-edge-job \
  --publish-path "./asa-edge-package"

# Create IoT Edge deployment manifest
cat > deployment.template.json <<EOF
{
  "modulesContent": {
    "$edgeAgent": {
      "properties.desired": {
        "modules": {
          "StreamAnalytics": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "mcr.microsoft.com/azure-stream-analytics/azureiotedge:latest"
            },
            "env": {
              "PlanId": {
                "value": "{YOUR_PLAN_ID}"
              }
            }
          }
        }
      }
    },
    "$edgeHub": {
      "properties.desired": {
        "routes": {
          "ASAToIoTHub": "FROM /messages/modules/StreamAnalytics/* INTO $upstream"
        }
      }
    },
    "StreamAnalytics": {
      "properties.desired": {
        "ASAJobInfo": "{YOUR_JOB_INFO}"
      }
    }
  }
}
EOF

# Deploy to IoT Edge device
az iot edge set-modules \
  --device-id my-edge-device \
  --hub-name my-iothub \
  --content deployment.template.json

= Code Examples

Example 1: Temperature Monitoring at Edge

-- Process sensor data at the edge with immediate alerts
WITH SensorReadings AS (
    SELECT
        deviceId,
        sensorId,
        temperature,
        pressure,
        humidity,
        System.Timestamp() AS readingTime
    FROM
        IotHubInput
),
ImmediateAlerts AS (
    -- Critical alerts processed immediately
    SELECT
        deviceId,
        sensorId,
        temperature,
        'CRITICAL' AS severity,
        readingTime
    FROM
        SensorReadings
    WHERE
        temperature > 100
),
AggregatedMetrics AS (
    -- Aggregate for cloud reporting
    SELECT
        deviceId,
        AVG(temperature) AS avgTemp,
        AVG(pressure) AS avgPressure,
        AVG(humidity) AS avgHumidity,
        COUNT(*) AS readingCount,
        System.Timestamp() AS windowEnd
    FROM
        SensorReadings
    GROUP BY
        deviceId,
        TumblingWindow(minute, 5)
)

-- Immediate alerts to local actuator
SELECT * INTO LocalActuator FROM ImmediateAlerts

-- Aggregated data to cloud for analytics
SELECT * INTO IotHubOutput FROM AggregatedMetrics

-- All raw data to local storage for audit
SELECT * INTO BlobOutput FROM SensorReadings

Example 2: Video Analytics at Edge

-- Process video analytics frames at the edge
WITH VideoFrames AS (
    SELECT
        cameraId,
        frameId,
        timestamp,
        detectedObjects,
        confidenceScore
    FROM
        VideoInput
),
ObjectDetections AS (
    SELECT
        cameraId,
        frameId,
        timestamp,
        GetArrayElement(detectedObjects, 0) AS primaryObject,
        confidenceScore
    FROM
        VideoFrames
    WHERE
        confidenceScore > 0.8  -- High confidence detections only
),
SecurityAlerts AS (
    SELECT
        cameraId,
        COUNT(*) AS detectionCount,
        COLLECT() AS detections,
        System.Timestamp() AS windowEnd
    FROM
        ObjectDetections
    WHERE
        GetRecordPropertyValue(primaryObject, 'class') = 'person'
        AND DATEPART(hour, System.Timestamp()) BETWEEN 22 AND 6  -- After hours
    GROUP BY
        cameraId,
        SlidingWindow(minute, 5)
    HAVING
        COUNT(*) > 3  -- Multiple detections
)

-- Send alerts to cloud
SELECT * INTO CloudAlerts FROM SecurityAlerts

-- Store detections locally
SELECT * INTO LocalStorage FROM ObjectDetections

Example 3: Predictive Maintenance at Edge

-- Edge-based predictive maintenance
WITH MachineMetrics AS (
    SELECT
        machineId,
        vibration,
        temperature,
        pressure,
        rotationalSpeed,
        System.Timestamp() AS timestamp
    FROM
        MachineInput
),
HealthScore AS (
    SELECT
        machineId,
        -- Calculate health score from multiple metrics
        CASE
            WHEN vibration > 10 THEN 0.4
            WHEN vibration > 8 THEN 0.7
            ELSE 1.0
        END AS vibrationScore,
        CASE
            WHEN temperature > 80 THEN 0.4
            WHEN temperature > 70 THEN 0.7
            ELSE 1.0
        END AS temperatureScore,
        (vibrationScore + temperatureScore) / 2 AS overallHealth,
        timestamp
    FROM
        MachineMetrics
),
MaintenanceAlerts AS (
    SELECT
        machineId,
        AVG(overallHealth) AS avgHealth,
        MIN(overallHealth) AS minHealth,
        System.Timestamp() AS windowEnd
    FROM
        HealthScore
    GROUP BY
        machineId,
        HoppingWindow(minute, 10, 5)
    HAVING
        AVG(overallHealth) < 0.6  -- Poor health score
)

-- Trigger local maintenance workflow
SELECT * INTO MaintenanceModule FROM MaintenanceAlerts

-- Report to cloud for trending
SELECT * INTO CloudTelemetry FROM HealthScore

=' Edge-Specific Considerations

Local Storage Options

// C# Edge Module: Store data locally
using Microsoft.Azure.Devices.Client;
using System.IO;
using Newtonsoft.Json;

public class LocalStorageModule
{
    private readonly string storagePathpath = "/mnt/data";

    public async Task StoreDataAsync(string data)
    {
        var filename = $"{storagePathpath}/data_{DateTime.UtcNow:yyyyMMddHHmmss}.json";

        // Ensure directory exists
        Directory.CreateDirectory(storagePathpath);

        // Write data to file
        await File.WriteAllTextAsync(filename, data);

        // Optional: Upload to cloud when connectivity available
        if (IsConnectedToCloud())
        {
            await UploadToCloudAsync(filename);
        }
    }

    public async Task UploadToCloudAsync(string filename)
    {
        // Upload to Azure Blob Storage
        var blobClient = GetBlobClient();
        await blobClient.UploadAsync(filename);

        // Delete local file after successful upload
        File.Delete(filename);
    }
}

Offline Operation

-- Query design for offline scenarios
WITH BufferedData AS (
    SELECT
        *,
        System.Timestamp() AS processingTime
    FROM
        EdgeInput
),
ProcessedData AS (
    SELECT
        deviceId,
        AVG(value) AS avgValue,
        COUNT(*) AS eventCount,
        MIN(processingTime) AS windowStart,
        MAX(processingTime) AS windowEnd
    FROM
        BufferedData
    GROUP BY
        deviceId,
        TumblingWindow(minute, 1)
)

-- Always write to local storage (works offline)
SELECT * INTO LocalBlobStorage FROM ProcessedData

-- Opportunistically send to cloud (when online)
SELECT * INTO CloudOutput FROM ProcessedData

Resource Constraints

# Python script to monitor edge resource usage
import psutil
import logging
from datetime import datetime

class EdgeResourceMonitor:
    def __init__(self, threshold_cpu=80, threshold_memory=80):
        self.threshold_cpu = threshold_cpu
        self.threshold_memory = threshold_memory

    def check_resources(self):
        """Monitor CPU and memory usage."""
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()

        metrics = {
            "timestamp": datetime.utcnow().isoformat(),
            "cpu_percent": cpu_percent,
            "memory_percent": memory.percent,
            "memory_available_mb": memory.available / (1024 * 1024)
        }

        # Log warnings if thresholds exceeded
        if cpu_percent > self.threshold_cpu:
            logging.warning(f"High CPU usage: {cpu_percent}%")

        if memory.percent > self.threshold_memory:
            logging.warning(f"High memory usage: {memory.percent}%")

        return metrics

    def optimize_asa_job(self, metrics):
        """Adjust ASA job based on resource availability."""
        if metrics["cpu_percent"] > self.threshold_cpu:
            # Reduce query complexity or increase windowing intervals
            logging.info("Optimizing ASA job for CPU constraints")

        if metrics["memory_percent"] > self.threshold_memory:
            # Reduce buffer sizes or checkpoint frequency
            logging.info("Optimizing ASA job for memory constraints")

# Usage
monitor = EdgeResourceMonitor()
metrics = monitor.check_resources()
print(f"CPU: {metrics['cpu_percent']}%, Memory: {metrics['memory_percent']}%")

= Monitoring Edge Deployments

Monitor Edge Job Status

# Check IoT Edge module status
az iot edge list-modules \
  --device-id my-edge-device \
  --hub-name my-iothub

# View Stream Analytics module logs
az iot edge logs \
  --device-id my-edge-device \
  --hub-name my-iothub \
  --module-id StreamAnalytics \
  --tail 100

# Monitor module health
az iot edge show-module \
  --device-id my-edge-device \
  --hub-name my-iothub \
  --module-id StreamAnalytics

Custom Metrics

# Python script to collect custom edge metrics
from azure.iot.device import IoTHubModuleClient
import json
import time

class EdgeMetricsCollector:
    def __init__(self):
        self.client = IoTHubModuleClient.create_from_edge_environment()

    async def send_metrics(self, metrics):
        """Send custom metrics to IoT Hub."""
        message = {
            "deviceId": self.client.device_id,
            "moduleId": "StreamAnalytics",
            "metrics": metrics,
            "timestamp": time.time()
        }

        await self.client.send_message_to_output(
            json.dumps(message),
            "metrics"
        )

    async def collect_asa_metrics(self):
        """Collect Stream Analytics specific metrics."""
        return {
            "eventsProcessed": get_events_processed(),
            "queryLatency": get_query_latency_ms(),
            "memoryUsage": get_memory_usage_mb(),
            "cpuUsage": get_cpu_usage_percent()
        }

# Usage
collector = EdgeMetricsCollector()
metrics = await collector.collect_asa_metrics()
await collector.send_metrics(metrics)

= Security Best Practices

Secure Edge Deployment

# Use IoT Edge security daemon
sudo iotedge config mp --connection-string 'HostName=...'

# Configure module identity
az iot edge set-modules \
  --device-id my-edge-device \
  --hub-name my-iothub \
  --content deployment.json \
  --auth-method x509

Data Encryption at Edge

// Encrypt sensitive data before storing locally
using System.Security.Cryptography;
using System.Text;

public class EdgeDataEncryption
{
    private readonly byte[] encryptionKey;

    public EdgeDataEncryption(string keyString)
    {
        encryptionKey = Encoding.UTF8.GetBytes(keyString);
    }

    public byte[] EncryptData(string plainText)
    {
        using (Aes aes = Aes.Create())
        {
            aes.Key = encryptionKey;
            aes.GenerateIV();

            var encryptor = aes.CreateEncryptor(aes.Key, aes.IV);
            byte[] encrypted;

            using (var ms = new MemoryStream())
            {
                ms.Write(aes.IV, 0, aes.IV.Length);

                using (var cs = new CryptoStream(ms, encryptor, CryptoStreamMode.Write))
                using (var sw = new StreamWriter(cs))
                {
                    sw.Write(plainText);
                }

                encrypted = ms.ToArray();
            }

            return encrypted;
        }
    }
}

Core Topics

Integration Guides

  • IoT Hub Integration
  • Edge Module Development

Best Practices


Last Updated: 2025-01-28 Complexity: Advanced Estimated Reading Time: 30 minutes