Skip to content

📊 Stream Analytics to Synapse - Lambda Architecture

Complexity: Advanced · Duration: 60 minutes · Services: Event Hubs, Stream Analytics, Data Factory, Synapse Analytics, Power BI

Integrate Stream Analytics with Synapse Analytics for real-time to batch analytics workflows implementing Lambda Architecture.


🎯 Overview

Build a comprehensive Lambda Architecture combining real-time (speed layer) and batch processing (batch layer) for enterprise data warehousing and analytics.

Lambda Architecture

%% Lambda Architecture: one ingestion path (Event Hubs) fans out into a
%% low-latency speed layer (Stream Analytics) and a high-throughput batch
%% layer (Data Factory); both land in a shared serving layer (Synapse SQL)
%% that feeds Power BI and applications.
graph TB
    Sources[Data Sources] --> EventHubs[Event Hubs]

    EventHubs --> Speed[Stream Analytics<br/>Speed Layer]
    EventHubs --> Batch[Data Factory<br/>Batch Layer]

    Speed --> RealTime[Synapse SQL<br/>Real-time Views]
    Batch --> Historical[Delta Lake<br/>Historical Data]

    RealTime --> Serving[Synapse SQL Pool<br/>Serving Layer]
    Historical --> Serving

    Serving --> PowerBI[Power BI<br/>Analytics]
    Serving --> Apps[Applications]

🚀 Implementation

Step 1: Create Core Infrastructure

# Step 1: core infrastructure — resource group, ADLS Gen2 account, file systems.
# Random hex suffixes keep the globally-unique names (storage account names
# must be 3-24 lowercase alphanumeric characters) from colliding.
RESOURCE_GROUP="rg-lambda-architecture"
LOCATION="eastus"
STORAGE_ACCOUNT="adlslambda$(openssl rand -hex 4)"
SYNAPSE_WORKSPACE="synapse-lambda-$(openssl rand -hex 4)"

# Create resource group
az group create --name "$RESOURCE_GROUP" --location "$LOCATION"

# Create Data Lake Storage Gen2 (the hierarchical namespace is what makes
# this ADLS Gen2 rather than plain blob storage; required for Delta Lake)
az storage account create \
  --name "$STORAGE_ACCOUNT" \
  --resource-group "$RESOURCE_GROUP" \
  --location "$LOCATION" \
  --sku Standard_LRS \
  --kind StorageV2 \
  --enable-hierarchical-namespace true

# Create file systems (containers): one for Delta Lake data, one for the
# Synapse workspace default storage. --auth-mode login uses the caller's
# Azure AD token instead of an account key.
az storage fs create \
  --name "delta-lake" \
  --account-name "$STORAGE_ACCOUNT" \
  --auth-mode login

az storage fs create \
  --name "synapse" \
  --account-name "$STORAGE_ACCOUNT" \
  --auth-mode login

Step 2: Create Synapse Workspace

# Step 2: Synapse workspace, firewall rule, and dedicated SQL pool.

# Generate the SQL admin password once and keep it in a variable. The original
# inlined $(openssl rand ...) into the create command, so the password was
# generated and immediately lost — nobody could ever log in with it.
SQL_ADMIN_PASSWORD="P@ssw0rd$(openssl rand -hex 4)!"
printf 'Synapse SQL admin password: %s (store in Key Vault)\n' "$SQL_ADMIN_PASSWORD" >&2

# Create Synapse workspace
az synapse workspace create \
  --name "$SYNAPSE_WORKSPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --location "$LOCATION" \
  --storage-account "$STORAGE_ACCOUNT" \
  --file-system synapse \
  --sql-admin-login-user sqladmin \
  --sql-admin-login-password "$SQL_ADMIN_PASSWORD"

# Create firewall rule
# WARNING: 0.0.0.0-255.255.255.255 opens the workspace to the entire
# internet. Acceptable only for a throwaway lab; restrict to your client
# IP range for anything real.
az synapse workspace firewall-rule create \
  --name AllowAll \
  --workspace-name "$SYNAPSE_WORKSPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --start-ip-address 0.0.0.0 \
  --end-ip-address 255.255.255.255

# Create dedicated SQL pool (DW100c is the smallest, cheapest service level)
az synapse sql pool create \
  --name "DWPool" \
  --workspace-name "$SYNAPSE_WORKSPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --performance-level DW100c

Step 3: Create Event Hubs

# Step 3: Event Hubs namespace plus a hub with Capture enabled.
EVENTHUBS_NAMESPACE="evhns-lambda-$(openssl rand -hex 4)"
EVENTHUB_NAME="analytics-events"

az eventhubs namespace create \
  --name "$EVENTHUBS_NAMESPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --location "$LOCATION" \
  --sku Standard \
  --capacity 2

# Look up the storage account resource ID once instead of inlining the
# (previously unquoted) command substitution in the create call.
STORAGE_ACCOUNT_ID=$(az storage account show \
  --name "$STORAGE_ACCOUNT" \
  --resource-group "$RESOURCE_GROUP" \
  --query id --output tsv)

# Capture archives events to the delta-lake container as Avro every 300 s
# or 300 MB (314572800 bytes), whichever comes first — this archive is what
# feeds the batch layer.
az eventhubs eventhub create \
  --name "$EVENTHUB_NAME" \
  --namespace-name "$EVENTHUBS_NAMESPACE" \
  --resource-group "$RESOURCE_GROUP" \
  --partition-count 8 \
  --message-retention 7 \
  --capture-enabled true \
  --capture-encoding Avro \
  --capture-interval-seconds 300 \
  --capture-size-limit 314572800 \
  --destination-name EventHubArchive.AzureBlockBlob \
  --storage-account-resource-id "$STORAGE_ACCOUNT_ID" \
  --blob-container delta-lake \
  --archive-name-format '{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}'

⚡ Speed Layer - Stream Analytics

Create Stream Analytics Job

# Speed layer: Stream Analytics job with 6 streaming units.
STREAM_ANALYTICS_JOB="asa-speed-layer"

az stream-analytics job create \
  --name "$STREAM_ANALYTICS_JOB" \
  --resource-group "$RESOURCE_GROUP" \
  --location "$LOCATION" \
  --sku Standard \
  --streaming-units 6

Stream Analytics Query

-- Speed Layer Query
-- Shared input step: reads the Event Hub input and windows on the payload's
-- own timestamp field (event time) rather than arrival time, via TIMESTAMP BY.
WITH Events AS (
    SELECT
        deviceId,
        sensorType,
        value,
        timestamp,
        location,
        System.Timestamp() AS processingTime
    FROM EventHubInput
    TIMESTAMP BY timestamp
)

-- Real-time aggregations (1-minute windows)
-- Feeds the SynapseOutput sink (the dbo.RealTimeMetrics table downstream).
SELECT
    sensorType,
    location,
    System.Timestamp() AS windowEnd,
    AVG(value) AS avgValue,
    MAX(value) AS maxValue,
    MIN(value) AS minValue,
    COUNT(*) AS eventCount,
    STDEV(value) AS stdDevValue
INTO SynapseOutput
FROM Events
GROUP BY
    sensorType,
    location,
    TumblingWindow(minute, 1);

-- Real-time anomaly detection
-- Built-in spike/dip detector over a 5-minute history per device:
-- 95 = confidence level, 120 = history size (number of events).
-- NOTE(review): PARTITION BY usually attaches to the input reference
-- (FROM Events PARTITION BY deviceId); confirm this trailing form parses.
SELECT
    deviceId,
    sensorType,
    value,
    timestamp,
    AnomalyDetection_SpikeAndDip(value, 95, 120, 'spikesanddips')
        OVER(LIMIT DURATION(minute, 5)) AS anomalyScore
INTO SynapseAnomalies
FROM Events
PARTITION BY deviceId;

-- Trending metrics (5-minute sliding window)
-- NOTE(review): verify PERCENTILE_CONT ... WITHIN GROUP in a GROUP BY
-- aggregate position against the ASA language reference; ASA documents it
-- as an analytic function with an OVER clause.
SELECT
    sensorType,
    System.Timestamp() AS windowEnd,
    AVG(value) AS avgValue,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) AS p95Value
INTO SynapseTrends
FROM Events
GROUP BY
    sensorType,
    SlidingWindow(minute, 5);

🏭 Batch Layer - Data Factory

Create Data Factory Pipeline (note: in Data Factory, triggers are deployed as separate resources that reference the pipeline — the inline `triggers` array below is illustrative only)

{
  "name": "BatchProcessingPipeline",
  "properties": {
    "activities": [
      {
        "name": "CopyFromEventHubCapture",
        "type": "Copy",
        "inputs": [
          {
            "referenceName": "EventHubCaptureDataset",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "DeltaLakeDataset",
            "type": "DatasetReference"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "AvroSource"
          },
          "sink": {
            "type": "ParquetSink",
            "formatSettings": {
              "type": "ParquetWriteSettings"
            }
          }
        }
      },
      {
        "name": "ProcessWithSynapseSpark",
        "type": "SynapseNotebook",
        "dependsOn": [
          {
            "activity": "CopyFromEventHubCapture",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "notebook": {
            "referenceName": "DeltaLakeProcessing",
            "type": "NotebookReference"
          }
        }
      },
      {
        "name": "LoadToDWPool",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          {
            "activity": "ProcessWithSynapseSpark",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "typeProperties": {
          "storedProcedureName": "sp_LoadAggregatedData"
        }
      }
    ],
    "triggers": [
      {
        "name": "HourlyTrigger",
        "properties": {
          "type": "ScheduleTrigger",
          "typeProperties": {
            "recurrence": {
              "frequency": "Hour",
              "interval": 1
            }
          }
        }
      }
    ]
  }
}

🗄️ Synapse SQL Pool Schema

Create Tables

-- Real-time aggregations table (Speed Layer)
-- Target of the Stream Analytics SynapseOutput sink (one row per 1-minute
-- window per sensorType/location).
CREATE TABLE dbo.RealTimeMetrics (
    MetricId BIGINT IDENTITY(1,1) NOT NULL,
    SensorType NVARCHAR(50) NOT NULL,
    Location NVARCHAR(100) NOT NULL,
    WindowEnd DATETIME2 NOT NULL,
    AvgValue FLOAT NOT NULL,
    MaxValue FLOAT NOT NULL,
    MinValue FLOAT NOT NULL,
    EventCount INT NOT NULL,
    StdDevValue FLOAT,
    ProcessedTime DATETIME2 DEFAULT GETUTCDATE(),
    -- Dedicated SQL pools only accept informational constraints: PRIMARY KEY
    -- must be NONCLUSTERED and NOT ENFORCED (metadata for the optimizer; the
    -- engine does not validate uniqueness). Without NOT ENFORCED this DDL fails.
    CONSTRAINT PK_RealTimeMetrics PRIMARY KEY NONCLUSTERED (MetricId) NOT ENFORCED
)
WITH (DISTRIBUTION = HASH(SensorType), CLUSTERED COLUMNSTORE INDEX);

-- Historical aggregations (Batch Layer)
-- Loaded hourly by the Data Factory pipeline via sp_LoadAggregatedData.
CREATE TABLE dbo.HistoricalMetrics (
    MetricId BIGINT IDENTITY(1,1) NOT NULL,
    SensorType NVARCHAR(50) NOT NULL,
    Location NVARCHAR(100) NOT NULL,
    Date DATE NOT NULL,
    Hour INT NOT NULL,
    AvgValue FLOAT NOT NULL,
    MaxValue FLOAT NOT NULL,
    MinValue FLOAT NOT NULL,
    EventCount BIGINT NOT NULL,
    ProcessedTime DATETIME2 DEFAULT GETUTCDATE(),
    -- Same rule as above: NOT ENFORCED is required in dedicated SQL pools.
    CONSTRAINT PK_HistoricalMetrics PRIMARY KEY NONCLUSTERED (MetricId) NOT ENFORCED
)
WITH (DISTRIBUTION = HASH(SensorType), CLUSTERED COLUMNSTORE INDEX);

-- Serving layer view (combines speed and batch)
-- The speed layer contributes the rolling last 24 hours of 1-minute windows;
-- the batch layer contributes everything before today. Because those two
-- predicates can both cover parts of yesterday, recent hours may appear in
-- both branches until the hourly batch load catches up — consumers that care
-- should dedupe on (SensorType, Location, Timestamp).
CREATE VIEW dbo.UnifiedMetrics AS
SELECT
    SensorType,
    Location,
    WindowEnd AS Timestamp,
    AvgValue,
    MaxValue,
    MinValue,
    EventCount,
    'RealTime' AS Source
FROM dbo.RealTimeMetrics
WHERE WindowEnd > DATEADD(hour, -24, GETUTCDATE())
UNION ALL
SELECT
    SensorType,
    Location,
    -- Reconstruct an hourly timestamp from the batch key (Date, Hour)
    DATEADD(hour, Hour, CAST(Date AS DATETIME2)) AS Timestamp,
    AvgValue,
    MaxValue,
    MinValue,
    EventCount,
    'Historical' AS Source
FROM dbo.HistoricalMetrics
WHERE Date < CAST(GETUTCDATE() AS DATE);

🔄 Delta Lake Processing

Synapse Spark Notebook

# Delta Lake batch processing notebook (Synapse Spark).
# Reads the Avro files written by Event Hubs Capture, computes hourly
# aggregates, and appends them to a date-partitioned Delta table.
from pyspark.sql.functions import *
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType,
)
from delta.tables import DeltaTable

# Schema of the JSON event body. The original referenced an undefined
# `schema` variable (NameError at runtime); fields mirror the event shape
# consumed by the Stream Analytics speed-layer query.
event_schema = StructType([
    StructField("deviceId", StringType()),
    StructField("sensorType", StringType()),
    StructField("value", DoubleType()),
    StructField("timestamp", TimestampType()),
    StructField("location", StringType()),
])

# Read captured Avro files (replace <storage> with the real account name)
captured_path = "abfss://delta-lake@<storage>.dfs.core.windows.net/raw-events"
df = spark.read.format("avro").load(captured_path)

# Parse event body: Capture stores the payload as a binary "Body" column
# alongside metadata such as EnqueuedTimeUtc.
parsed_df = (df
  .select(
    from_json(col("Body").cast("string"), event_schema).alias("event"),
    col("EnqueuedTimeUtc").alias("eventTime")
  )
  .select("event.*", "eventTime")
)

# Create hourly aggregations, keyed the same way as dbo.HistoricalMetrics
# (sensorType, location, date, hour).
hourly_agg = (parsed_df
  .withColumn("date", to_date("eventTime"))
  .withColumn("hour", hour("eventTime"))
  .groupBy("sensorType", "location", "date", "hour")
  .agg(
    avg("value").alias("avgValue"),
    max("value").alias("maxValue"),
    min("value").alias("minValue"),
    count("*").alias("eventCount")
  )
)

# Append to Delta Lake, partitioned by date for partition pruning;
# mergeSchema tolerates additive schema evolution.
delta_path = "abfss://delta-lake@<storage>.dfs.core.windows.net/aggregated-metrics"

(hourly_agg
  .write
  .format("delta")
  .mode("append")
  .partitionBy("date")
  .option("mergeSchema", "true")
  .save(delta_path)
)

# Compact small files, then drop unreferenced files older than the
# retention window so storage and time travel stay bounded.
deltaTable = DeltaTable.forPath(spark, delta_path)
deltaTable.optimize().executeCompaction()
deltaTable.vacuum(168)  # retention in hours = 7 days

# Register the Delta location as a Spark SQL table for downstream queries
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS default.aggregated_metrics
    USING DELTA
    LOCATION '{delta_path}'
""")

🔍 Querying the Serving Layer

Recent Data (Speed Layer)

-- Last hour metrics
-- Speed-layer only: reads dbo.RealTimeMetrics directly for the freshest
-- 1-minute windows, newest first.
SELECT
    SensorType,
    Location,
    WindowEnd,
    AvgValue,
    MaxValue,
    EventCount
FROM dbo.RealTimeMetrics
WHERE WindowEnd > DATEADD(hour, -1, GETUTCDATE())
ORDER BY WindowEnd DESC;

Historical Data (Batch Layer)

-- Last 30 days trends
-- Batch-layer only: rolls the hourly rows in dbo.HistoricalMetrics up to
-- daily figures per sensor type.
-- NOTE(review): Date (DATE) is compared against DATEADD(...) which returns a
-- datetime — relies on implicit conversion; CAST(... AS DATE) would be clearer.
SELECT
    Date,
    SensorType,
    AVG(AvgValue) AS DailyAvg,
    MAX(MaxValue) AS DailyMax,
    SUM(EventCount) AS DailyEvents
FROM dbo.HistoricalMetrics
WHERE Date > DATEADD(day, -30, GETUTCDATE())
GROUP BY Date, SensorType
ORDER BY Date DESC, SensorType;

Combined View

-- Unified metrics (last 7 days)
-- Queries the serving-layer view, so a day's rows may come from either the
-- speed or the batch branch; MAX(Source) merely reports one contributing
-- source label per group, not a guarantee the whole day came from it.
SELECT
    SensorType,
    Location,
    CAST(Timestamp AS DATE) AS Date,
    AVG(AvgValue) AS DailyAvg,
    MAX(MaxValue) AS DailyMax,
    SUM(EventCount) AS DailyEvents,
    MAX(Source) AS DataSource
FROM dbo.UnifiedMetrics
WHERE Timestamp > DATEADD(day, -7, GETUTCDATE())
GROUP BY
    SensorType,
    Location,
    CAST(Timestamp AS DATE)
ORDER BY Date DESC, SensorType;

📊 Complete Bicep Template

// Lambda architecture core: ADLS Gen2 + Event Hubs + Synapse workspace + SQL pool.
param location string = resourceGroup().location
param namePrefix string = 'lambda'

@secure() // without @secure(), the password is stored in deployment history in plain text
param sqlAdminPassword string

// Data Lake Storage Gen2 (hierarchical namespace enabled)
resource storageAccount 'Microsoft.Storage/storageAccounts@2023-01-01' = {
  name: '${namePrefix}adls${uniqueString(resourceGroup().id)}'
  location: location
  sku: {
    name: 'Standard_LRS'
  }
  kind: 'StorageV2'
  properties: {
    isHnsEnabled: true // ADLS Gen2 / Delta Lake requires HNS
    minimumTlsVersion: 'TLS1_2'
    allowBlobPublicAccess: false
  }
}

// Event Hubs namespace — the ingestion front door (Standard, 2 TUs)
resource eventHubsNamespace 'Microsoft.EventHub/namespaces@2023-01-01-preview' = {
  name: '${namePrefix}-evhns-${uniqueString(resourceGroup().id)}'
  location: location
  sku: {
    name: 'Standard'
    capacity: 2
  }
}

// Synapse workspace backed by the 'synapse' filesystem of the ADLS account
resource synapseWorkspace 'Microsoft.Synapse/workspaces@2021-06-01' = {
  name: '${namePrefix}-synapse-${uniqueString(resourceGroup().id)}'
  location: location
  identity: {
    type: 'SystemAssigned' // managed identity used for storage access
  }
  properties: {
    defaultDataLakeStorage: {
      accountUrl: storageAccount.properties.primaryEndpoints.dfs
      filesystem: 'synapse'
    }
    sqlAdministratorLogin: 'sqladmin'
    sqlAdministratorLoginPassword: sqlAdminPassword
  }
}

// Dedicated SQL pool (serving layer); DW100c is the smallest service level
resource sqlPool 'Microsoft.Synapse/workspaces/sqlPools@2021-06-01' = {
  parent: synapseWorkspace
  name: 'DWPool'
  location: location
  sku: {
    name: 'DW100c'
  }
  properties: {
    collation: 'SQL_Latin1_General_CP1_CI_AS'
  }
}

output synapseWorkspaceName string = synapseWorkspace.name
output sqlPoolName string = sqlPool.name
output storageAccountName string = storageAccount.name

💰 Cost Optimization

Synapse SQL Pool

# Pause the dedicated SQL pool when not in use (compute billing stops;
# storage is still billed). Pause/resume is a control-plane operation —
# it is not exposed as T-SQL ("ALTER DATABASE ... PAUSE" does not work
# against a dedicated SQL pool); use the Azure CLI, portal, or REST API.
az synapse sql pool pause \
  --name "DWPool" \
  --workspace-name "$SYNAPSE_WORKSPACE" \
  --resource-group "$RESOURCE_GROUP"

# Resume when needed
az synapse sql pool resume \
  --name "DWPool" \
  --workspace-name "$SYNAPSE_WORKSPACE" \
  --resource-group "$RESOURCE_GROUP"

# Scheduled pause/resume: wire these commands into Azure Automation or Logic Apps

Resource Optimization

  • Pause SQL pools during off-hours
  • Scale down Stream Analytics SUs during low traffic
  • Use serverless SQL for ad-hoc queries
  • Implement data retention policies
  • Optimize Delta Lake compaction

📚 Next Steps


Last Updated: 2025-01-28 Complexity: Advanced Duration: 60 minutes