
⚡🌊 Lambda-Kappa Hybrid Architecture


This pattern combines the strengths of both Lambda and Kappa architectures, providing flexible data processing for mixed batch and streaming workloads.


🎯 Overview

The Lambda-Kappa Hybrid architecture combines batch and stream processing paradigms, allowing organizations to process data using the most appropriate method for each use case while maintaining a unified data platform.

Key Principles

  • Flexible Processing: Choose batch or stream based on workload characteristics
  • Unified Storage: Single source of truth in a data lake
  • Multiple Compute Engines: Optimized for different processing patterns
  • Incremental Adoption: Migrate from batch to stream incrementally
  • Cost Optimization: Use appropriate processing tier for each workload

Architecture Benefits

| Benefit | Description | Business Impact |
|---|---|---|
| Flexibility | Support both batch and streaming workloads | Adapt to changing requirements |
| Cost Efficiency | Choose appropriate processing model | Optimize compute costs |
| Gradual Migration | Move from batch to stream incrementally | Reduce migration risk |
| Unified Platform | Single data platform for all processing | Simplified operations |
| Performance | Optimize each workload independently | Better SLAs |

🏗️ Architecture Components

graph TB
    subgraph "Data Ingestion Layer"
        Sources[Data Sources]
        Batch[Batch Ingestion<br/>Data Factory]
        Stream[Stream Ingestion<br/>Event Hubs]
    end

    subgraph "Storage Layer"
        DataLake[Azure Data Lake Gen2<br/>Delta Lake Format]
        subgraph "Lake Zones"
            Raw[Raw Zone]
            Refined[Refined Zone]
            Curated[Curated Zone]
        end
    end

    subgraph "Processing Layer - Batch"
        BatchProc[Batch Processing<br/>Synapse Spark Pools]
        ScheduledJobs[Scheduled Jobs<br/>Databricks]
    end

    subgraph "Processing Layer - Stream"
        StreamProc[Stream Processing<br/>Stream Analytics]
        RealTime[Real-time Processing<br/>Spark Streaming]
    end

    subgraph "Serving Layer"
        DWSQL[Data Warehouse<br/>Synapse Dedicated SQL]
        Cache[Speed Cache<br/>Cosmos DB]
        API[Unified Query API<br/>Synapse Serverless]
    end

    subgraph "Consumption Layer"
        PowerBI[Power BI]
        Apps[Applications]
        ML[ML Models]
    end

    Sources --> Batch
    Sources --> Stream
    Batch --> Raw
    Stream --> Raw
    Raw --> BatchProc
    Raw --> StreamProc
    BatchProc --> Refined
    StreamProc --> Refined
    Refined --> ScheduledJobs
    Refined --> RealTime
    ScheduledJobs --> Curated
    RealTime --> Curated
    Curated --> DWSQL
    Curated --> Cache
    Curated --> API
    DWSQL --> PowerBI
    Cache --> Apps
    API --> ML

    classDef ingestion fill:#e3f2fd
    classDef storage fill:#f3e5f5
    classDef batch fill:#fff3e0
    classDef stream fill:#e8f5e9
    classDef serving fill:#fce4ec
    classDef consumption fill:#f1f8e9

    class Batch,Stream ingestion
    class DataLake,Raw,Refined,Curated storage
    class BatchProc,ScheduledJobs batch
    class StreamProc,RealTime stream
    class DWSQL,Cache,API serving
    class PowerBI,Apps,ML consumption

Core Components

1. Ingestion Layer

Batch Ingestion:

  • Azure Data Factory for scheduled data loads
  • Bulk import from enterprise systems
  • Historical data backfills
  • Large file processing

Stream Ingestion:

  • Azure Event Hubs for real-time events
  • IoT Hub for device telemetry
  • Kafka connectors for existing streams
  • Change Data Capture (CDC) streams
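
As an illustration of the stream-ingestion path, the sketch below reads an Event Hubs stream through its Kafka-compatible endpoint with Spark Structured Streaming and lands the raw payload in the raw zone. The namespace, hub name, connection string, and paths are placeholders; the native Event Hubs Spark connector (shown later in the migration section) is an equally valid choice.

Stream Ingestion Example (illustrative sketch):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamIngestion").getOrCreate()

# Placeholders: Event Hubs namespace, hub name, and connection string
eh_namespace = "<namespace>"
eh_name = "transactions"
eh_connection = "<event-hubs-connection-string>"

# Event Hubs exposes a Kafka-compatible endpoint, so the built-in Spark
# Kafka source can consume it without a separate connector
jaas_config = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    f'username="$ConnectionString" password="{eh_connection}";'
)

raw_events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f"{eh_namespace}.servicebus.windows.net:9093") \
    .option("subscribe", eh_name) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", jaas_config) \
    .option("startingOffsets", "latest") \
    .load()

# Land the payload unchanged in the raw zone; parsing happens downstream
ingest_query = raw_events \
    .selectExpr("CAST(value AS STRING) AS body", "timestamp AS ingest_timestamp") \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/checkpoints/raw_ingest") \
    .start("/mnt/datalake/raw/stream/transactions")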

2. Storage Layer

Unified Data Lake:

  • Azure Data Lake Gen2 as foundation
  • Delta Lake format for ACID transactions
  • Time travel capabilities
  • Schema evolution support

Zone Architecture:

  • Raw Zone: Immutable source data
  • Refined Zone: Validated and cleaned data
  • Curated Zone: Business-ready aggregates
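
The sketch below illustrates zone promotion and Delta time travel. The paths follow the lake layout used throughout this page; the validation filter and the version/timestamp values are illustrative.

Zone Promotion and Time Travel Example (illustrative sketch):

from pyspark.sql.functions import col, current_timestamp

# Promote validated records from the raw zone to the refined zone
raw_df = spark.read.format("delta").load("/mnt/datalake/raw/batch/transactions")

refined_df = raw_df \
    .filter(col("amount").isNotNull()) \
    .withColumn("refined_timestamp", current_timestamp())

refined_df.write \
    .format("delta") \
    .mode("append") \
    .save("/mnt/datalake/refined/batch/transactions")

# Time travel: read the refined table as of an earlier version or point in time
# (useful for reproducing a report or debugging a bad load)
previous_version = spark.read \
    .format("delta") \
    .option("versionAsOf", 0) \
    .load("/mnt/datalake/refined/batch/transactions")

as_of_date = spark.read \
    .format("delta") \
    .option("timestampAsOf", "2025-01-27") \
    .load("/mnt/datalake/refined/batch/transactions")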

3. Processing Layer

Batch Processing:

  • Synapse Spark pools for large-scale transformations
  • Scheduled processing windows
  • Complex aggregations and joins
  • Historical analysis

Stream Processing:

  • Azure Stream Analytics for SQL-based streaming
  • Spark Structured Streaming for complex logic
  • Real-time aggregations
  • Event-time processing
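
The sketch below shows event-time processing with Spark Structured Streaming: a watermark bounds how late events may arrive, and a tumbling window aggregates on the timestamp carried by the event rather than its arrival time. Paths and column names mirror other examples on this page and are illustrative.

Event-Time Aggregation Example (illustrative sketch):

from pyspark.sql.functions import col, window, count

events = spark.readStream \
    .format("delta") \
    .load("/mnt/datalake/refined/stream/user_events")

# Group by the event's own timestamp and tolerate events up to 10 minutes late
windowed_counts = events \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(
        window(col("event_timestamp"), "5 minutes"),
        col("event_type")
    ) \
    .agg(count("*").alias("event_count"))

agg_query = windowed_counts.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/checkpoints/event_counts") \
    .start("/mnt/datalake/curated/event_counts")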

4. Serving Layer

Multiple Query Patterns:

  • Synapse Dedicated SQL for data warehouse queries
  • Cosmos DB for low-latency lookups
  • Synapse Serverless SQL for ad-hoc queries
  • Power BI for business intelligence
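
For the speed-cache pattern, the sketch below performs a point read against Cosmos DB with the azure-cosmos Python SDK. The account URL, key, database, container, and item id are placeholders, and the document shape assumes aggregates like those produced by the streaming examples on this page.

Speed Cache Lookup Example (illustrative sketch):

from azure.cosmos import CosmosClient

# Placeholders: substitute your Cosmos DB account, key, and container names
client = CosmosClient(
    url="https://<account>.documents.azure.com:443/",
    credential="<account-key>"
)
container = client.get_database_client("serving").get_container_client("customer_metrics")

# Point read by id and partition key: low-latency lookups that would be
# too slow to serve from the warehouse
item = container.read_item(item="customer-12345", partition_key="customer-12345")
print(item["event_count"], item["window_end"])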


☁️ Azure Service Mapping

Primary Services

| Layer | Service | Purpose | When to Use |
|---|---|---|---|
| Ingestion | Azure Data Factory | Batch ETL/ELT pipelines | Scheduled data movement |
| Ingestion | Azure Event Hubs | Stream ingestion | Real-time events |
| Storage | Data Lake Gen2 | Unified data storage | All data persistence |
| Storage | Delta Lake | ACID transactions | Data quality requirements |
| Processing | Synapse Spark Pools | Batch processing | Large-scale transformations |
| Processing | Stream Analytics | Stream processing | SQL-based streaming |
| Serving | Synapse Dedicated SQL | Data warehouse | BI and reporting |
| Serving | Cosmos DB | Speed cache | Low-latency queries |
| Serving | Synapse Serverless SQL | Ad-hoc queries | Data exploration |

Supporting Services

  • Azure Purview: Data governance and lineage
  • Azure Monitor: Performance monitoring
  • Key Vault: Secrets management (see the sketch after this list)
  • Azure DevOps: CI/CD pipelines
  • Power BI: Business intelligence
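
Pipeline code should retrieve credentials from Key Vault at runtime rather than embedding them in notebooks or pipeline definitions. The sketch below uses the azure-identity and azure-keyvault-secrets packages; the vault URL and secret name are placeholders.

Key Vault Lookup Example (illustrative sketch):

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Placeholders: vault URL and secret name depend on your deployment
credential = DefaultAzureCredential()
secret_client = SecretClient(
    vault_url="https://<vault-name>.vault.azure.net",
    credential=credential
)

# Fetch the Event Hubs connection string at job start
eventhub_connection = secret_client.get_secret("eventhub-connection-string").value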

🔧 Implementation Patterns

Pattern 1: Batch-First with Stream Augmentation

graph LR
    subgraph "Batch Path (Primary)"
        B1[Historical Data] --> B2[Batch Processing]
        B2 --> B3[Data Warehouse]
    end

    subgraph "Stream Path (Augmentation)"
        S1[Real-time Events] --> S2[Stream Processing]
        S2 --> S3[Speed Cache]
    end

    B3 --> Query[Unified Query Layer]
    S3 --> Query
    Query --> Results[Query Results]

    classDef batch fill:#fff3e0
    classDef stream fill:#e8f5e9

    class B1,B2,B3 batch
    class S1,S2,S3 stream

Use Case: Traditional enterprise with emerging real-time needs

Implementation Steps:

  1. Establish batch data warehouse as foundation
  2. Add stream ingestion for time-sensitive data
  3. Create materialized views combining batch and stream
  4. Implement query routing logic

PySpark Example - Batch Processing:

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

spark = SparkSession.builder \
    .appName("BatchProcessing") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Read from raw zone
raw_data = spark.read.format("delta").load("/mnt/datalake/raw/transactions")

# Apply transformations
refined_data = raw_data \
    .filter(col("status") == "completed") \
    .withColumn("processed_timestamp", current_timestamp()) \
    .select("transaction_id", "customer_id", "amount", "transaction_date", "processed_timestamp")

# Write to refined zone with merge
delta_table = DeltaTable.forPath(spark, "/mnt/datalake/refined/transactions")

delta_table.alias("target") \
    .merge(
        refined_data.alias("source"),
        "target.transaction_id = source.transaction_id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

Stream Processing Example:

-- Azure Stream Analytics Query
WITH EnrichedEvents AS (
    SELECT
        event_id,
        customer_id,
        event_type,
        event_timestamp,
        System.Timestamp() AS processing_timestamp
    FROM
        EventHubInput TIMESTAMP BY event_timestamp
)

-- Real-time aggregation
SELECT
    customer_id,
    COUNT(*) as event_count,
    System.Timestamp() AS window_end
INTO
    CosmosDBOutput
FROM
    EnrichedEvents
GROUP BY
    customer_id,
    TumblingWindow(minute, 5)

Pattern 2: Stream-First with Batch Backfill

graph LR
    subgraph "Stream Path (Primary)"
        S1[Live Events] --> S2[Stream Processing]
        S2 --> S3[Live Tables]
    end

    subgraph "Batch Path (Backfill)"
        B1[Historical Load] --> B2[Batch Processing]
        B2 --> B3[Historical Tables]
    end

    S3 --> Merge[Table Merge]
    B3 --> Merge
    Merge --> Unified[Unified View]

    classDef stream fill:#e8f5e9
    classDef batch fill:#fff3e0

    class S1,S2,S3 stream
    class B1,B2,B3 batch

Use Case: Modern applications with historical data requirements

Implementation Steps:

  1. Build stream processing pipeline first
  2. Design tables to support both stream and batch writes
  3. Run batch backfill for historical data
  4. Implement merge logic to unify views

Delta Lake Merge Example:

from delta.tables import DeltaTable
from pyspark.sql.functions import col, when

# Stream processing output
stream_path = "/mnt/datalake/stream/user_events"
# Batch processing output
batch_path = "/mnt/datalake/batch/user_events"
# Unified output
unified_path = "/mnt/datalake/unified/user_events"

# Read both sources
stream_df = spark.read.format("delta").load(stream_path)
batch_df = spark.read.format("delta").load(batch_path)

# Create or get unified table
if DeltaTable.isDeltaTable(spark, unified_path):
    unified_table = DeltaTable.forPath(spark, unified_path)
else:
    stream_df.write.format("delta").save(unified_path)
    unified_table = DeltaTable.forPath(spark, unified_path)

# Merge stream data
unified_table.alias("target") \
    .merge(
        stream_df.alias("source"),
        "target.event_id = source.event_id"
    ) \
    .whenMatchedUpdate(
        condition = "source.event_timestamp > target.event_timestamp",
        set = {
            "event_type": "source.event_type",
            "user_id": "source.user_id",
            "event_timestamp": "source.event_timestamp",
            "source_system": "lit('stream')"
        }
    ) \
    .whenNotMatchedInsert(
        values = {
            "event_id": "source.event_id",
            "event_type": "source.event_type",
            "user_id": "source.user_id",
            "event_timestamp": "source.event_timestamp",
            "source_system": "lit('stream')"
        }
    ) \
    .execute()

# Merge batch data
unified_table.alias("target") \
    .merge(
        batch_df.alias("source"),
        "target.event_id = source.event_id"
    ) \
    .whenNotMatchedInsert(
        values = {
            "event_id": "source.event_id",
            "event_type": "source.event_type",
            "user_id": "source.user_id",
            "event_timestamp": "source.event_timestamp",
            "source_system": "lit('batch')"
        }
    ) \
    .execute()

Pattern 3: Hybrid Processing with Workload Routing

graph TB
    Input[Data Sources]
    Router{Workload Router}

    Input --> Router

    Router -->|Large Volume<br/>Not Time-Sensitive| Batch[Batch Processing]
    Router -->|Low Latency<br/>Time-Sensitive| Stream[Stream Processing]
    Router -->|Exploratory| AdHoc[Ad-hoc Processing]

    Batch --> Lake[Data Lake]
    Stream --> Lake
    AdHoc --> Lake

    Lake --> Serve[Serving Layer]

    classDef route fill:#fff3e0
    classDef proc fill:#e8f5e9

    class Router route
    class Batch,Stream,AdHoc proc

Use Case: Mixed workload enterprise platform

Routing Logic Example:

from pyspark.sql import DataFrame
from typing import Dict, Any

class WorkloadRouter:
    """Route data processing based on workload characteristics."""

    def __init__(self, spark):
        self.spark = spark
        # Reference thresholds per processing tier (informational; routing below
        # uses the simplified checks in route_workload)
        self.routing_rules = {
            "batch": {
                "min_volume": 1000000,
                "max_latency_minutes": 60,
                "compute_type": "spark_batch"
            },
            "stream": {
                "max_latency_seconds": 30,
                "compute_type": "stream_analytics"
            },
            "adhoc": {
                "max_volume": 100000,
                "compute_type": "serverless_sql"
            }
        }

    def route_workload(self, df: DataFrame, metadata: Dict[str, Any]) -> str:
        """Determine the appropriate processing path."""
        latency_requirement = metadata.get("latency_requirement_seconds", 3600)

        # Streaming sources and low-latency requirements go to the stream path.
        # count() is not supported on streaming DataFrames, so check this first.
        if df.isStreaming or latency_requirement < 60:
            return "stream"

        # Ad-hoc processing for small datasets
        if df.count() < 100000:
            return "adhoc"

        # Batch processing for large datasets
        return "batch"

    def process(self, df: DataFrame, metadata: Dict[str, Any]):
        """Process data using appropriate engine."""
        route = self.route_workload(df, metadata)

        if route == "batch":
            return self._process_batch(df, metadata)
        elif route == "stream":
            return self._process_stream(df, metadata)
        else:
            return self._process_adhoc(df, metadata)

    def _process_batch(self, df: DataFrame, metadata: Dict[str, Any]):
        """Batch processing with Spark."""
        output_path = metadata.get("output_path")

        df.write \
            .format("delta") \
            .mode("append") \
            .partitionBy("date") \
            .option("mergeSchema", "true") \
            .save(output_path)

        return {"status": "success", "route": "batch", "records": df.count()}

    def _process_stream(self, df: DataFrame, metadata: Dict[str, Any]):
        """Stream processing with Structured Streaming."""
        output_path = metadata.get("output_path")
        checkpoint_path = metadata.get("checkpoint_path")

        query = df.writeStream \
            .format("delta") \
            .outputMode("append") \
            .option("checkpointLocation", checkpoint_path) \
            .start(output_path)

        return {"status": "streaming", "route": "stream", "query_id": query.id}

    def _process_adhoc(self, df: DataFrame, metadata: Dict[str, Any]):
        """Ad-hoc processing for exploration."""
        df.createOrReplaceTempView("adhoc_view")
        return {"status": "success", "route": "adhoc", "view": "adhoc_view"}

# Usage
router = WorkloadRouter(spark)

# Example: Large batch workload
large_df = spark.read.parquet("/mnt/data/large_dataset")
result = router.process(large_df, {
    "latency_requirement_seconds": 3600,
    "output_path": "/mnt/datalake/batch/output"
})

# Example: Real-time stream
stream_df = spark.readStream.format("delta").load("/mnt/datalake/stream/input")
result = router.process(stream_df, {
    "latency_requirement_seconds": 5,
    "output_path": "/mnt/datalake/stream/output",
    "checkpoint_path": "/mnt/checkpoints/stream1"
})

💼 Use Cases

1. E-commerce Analytics

Scenario: Combine batch order processing with real-time inventory updates

Architecture:

  • Batch Path: Daily order analytics, customer segmentation
  • Stream Path: Real-time inventory tracking, fraud detection
  • Hybrid: Unified customer 360 view

2. Financial Services Risk Management

Scenario: Historical risk analysis with real-time monitoring

Architecture:

  • Batch Path: Monthly risk reports, regulatory compliance
  • Stream Path: Real-time transaction monitoring, fraud alerts
  • Hybrid: Combined risk dashboard

3. Manufacturing IoT Analytics

Scenario: Equipment maintenance with production optimization

Architecture:

  • Batch Path: Historical equipment performance analysis
  • Stream Path: Real-time sensor monitoring, predictive maintenance
  • Hybrid: Unified equipment health dashboard


✅ Best Practices

1. Data Lake Organization

/mnt/datalake/
├── raw/
│   ├── batch/           # Batch ingested data
│   ├── stream/          # Stream ingested data
│   └── archive/         # Historical archives
├── refined/
│   ├── batch/           # Batch transformations
│   ├── stream/          # Stream transformations
│   └── merged/          # Combined views
└── curated/
    ├── warehouse/       # DW tables
    ├── cache/           # Speed layer cache
    └── ml/              # ML feature stores

2. Processing Guidelines

| Characteristic | Batch Processing | Stream Processing |
|---|---|---|
| Data Volume | > 1 million records | < 1 million records |
| Latency SLA | > 1 hour | < 1 minute |
| Complexity | Complex joins, aggregations | Simple transformations |
| Cost | Lower per record | Higher per record |
| Schedule | Fixed windows | Continuous |

3. Schema Management

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Define unified schema for batch and stream
unified_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("event_timestamp", TimestampType(), False),
    StructField("amount", DoubleType(), True),
    StructField("source_system", StringType(), False),
    StructField("processing_timestamp", TimestampType(), False)
])

# Use schema evolution with Delta Lake
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
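
Because batch and stream share one schema, both paths can reuse a single parsing routine and produce structurally identical Delta tables. The sketch below assumes raw events arrive as JSON strings in a column named body; the function name and paths are illustrative.

Shared Parsing Example (illustrative sketch):

from pyspark.sql import DataFrame
from pyspark.sql.functions import from_json, col, current_timestamp

def parse_events(raw_df: DataFrame) -> DataFrame:
    """Parse raw JSON payloads into the unified schema (shared by batch and stream)."""
    return raw_df \
        .withColumn("event", from_json(col("body"), unified_schema)) \
        .select("event.*") \
        .withColumn("processing_timestamp", current_timestamp())

# Same parsing function, two engines
batch_events = parse_events(spark.read.format("delta").load("/mnt/datalake/raw/batch/user_events"))
stream_events = parse_events(spark.readStream.format("delta").load("/mnt/datalake/raw/stream/user_events"))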

4. Monitoring and Alerting

# Monitor batch and stream processing health

def monitor_hybrid_pipeline():
    """Monitor both batch and stream processing."""
    metrics = {
        "batch": {
            "last_run": None,
            "records_processed": 0,
            "duration_minutes": 0,
            "status": "unknown"
        },
        "stream": {
            "current_lag": 0,
            "records_per_second": 0,
            "error_rate": 0.0,
            "status": "unknown"
        }
    }

    # Check batch processing
    batch_df = spark.sql("""
        SELECT MAX(processing_timestamp) as last_run,
               COUNT(*) as records_processed
        FROM delta.`/mnt/datalake/refined/batch/transactions`
        WHERE date = current_date()
    """)
    metrics["batch"].update(batch_df.first().asDict())

    # Check stream processing
    stream_df = spark.sql("""
        SELECT COUNT(*) as current_lag
        FROM delta.`/mnt/datalake/stream/input`
        WHERE processing_timestamp IS NULL
    """)
    metrics["stream"]["current_lag"] = stream_df.first()["current_lag"]

    return metrics
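
Alerting on these metrics can start as a simple threshold check. The sketch below is illustrative: the thresholds are arbitrary, and forwarding the alerts to Azure Monitor action groups or a webhook is left out.

Alert Evaluation Example (illustrative sketch):

from datetime import datetime, timedelta

def evaluate_alerts(metrics, max_stream_lag=10000, max_batch_staleness_hours=24):
    """Turn pipeline metrics into a list of alert messages."""
    alerts = []

    # Stream path: unprocessed backlog above threshold
    if metrics["stream"]["current_lag"] > max_stream_lag:
        alerts.append(f"Stream lag {metrics['stream']['current_lag']} exceeds {max_stream_lag} records")

    # Batch path: last successful run is too old or missing
    last_run = metrics["batch"].get("last_run")
    if last_run is None or datetime.utcnow() - last_run > timedelta(hours=max_batch_staleness_hours):
        alerts.append("Batch pipeline has not completed within the staleness window")

    return alerts

# Usage
for alert in evaluate_alerts(monitor_hybrid_pipeline()):
    print(f"ALERT: {alert}")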

⚡ Performance Optimization

1. Batch Processing Optimization

# Optimize batch processing with partitioning and Z-ordering
from delta.tables import DeltaTable

# Configure Spark for batch performance
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

# Write with optimal partitioning
df.write \
    .format("delta") \
    .mode("append") \
    .partitionBy("date", "region") \
    .save("/mnt/datalake/refined/transactions")

# Optimize table layout
delta_table = DeltaTable.forPath(spark, "/mnt/datalake/refined/transactions")
delta_table.optimize().executeZOrderBy("customer_id")

2. Stream Processing Optimization

# Configure streaming for optimal performance

stream_query = spark.readStream \
    .format("delta") \
    .option("maxFilesPerTrigger", 1000) \
    .option("ignoreDeletes", "true") \
    .load("/mnt/datalake/stream/input") \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/checkpoints/stream1") \
    .trigger(processingTime='30 seconds') \
    .start("/mnt/datalake/stream/output")

3. Query Performance

-- Optimize queries across batch and stream tables
-- Use table statistics
ANALYZE TABLE delta.`/mnt/datalake/refined/batch/transactions` COMPUTE STATISTICS;

-- Create materialized views combining batch and stream
CREATE OR REPLACE VIEW unified_transactions AS
SELECT
    t.*,
    'batch' as source_layer
FROM delta.`/mnt/datalake/refined/batch/transactions` t
WHERE processing_timestamp < date_sub(current_timestamp(), 1)

UNION ALL

SELECT
    t.*,
    'stream' as source_layer
FROM delta.`/mnt/datalake/refined/stream/transactions` t
WHERE processing_timestamp >= date_sub(current_timestamp(), 1);

📊 Monitoring and Operations

Key Metrics

| Metric Category | Batch Metrics | Stream Metrics |
|---|---|---|
| Throughput | Records/hour | Records/second |
| Latency | Job duration | End-to-end latency |
| Quality | Error rate | Late event rate |
| Cost | Compute hours | Streaming units |

Operational Dashboard

# Create operational metrics dashboard
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def create_hybrid_dashboard(batch_metrics, stream_metrics):
    """Create operational dashboard for hybrid architecture."""

    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Batch Throughput', 'Stream Latency',
                       'Processing Costs', 'Error Rates')
    )

    # Batch throughput
    fig.add_trace(
        go.Scatter(x=batch_metrics['timestamps'],
                  y=batch_metrics['records_per_hour'],
                  name='Batch Throughput'),
        row=1, col=1
    )

    # Stream latency
    fig.add_trace(
        go.Scatter(x=stream_metrics['timestamps'],
                  y=stream_metrics['latency_ms'],
                  name='Stream Latency'),
        row=1, col=2
    )

    # Processing costs
    fig.add_trace(
        go.Bar(x=['Batch', 'Stream'],
              y=[batch_metrics['cost'], stream_metrics['cost']],
              name='Costs'),
        row=2, col=1
    )

    # Error rates
    fig.add_trace(
        go.Scatter(x=batch_metrics['timestamps'],
                  y=batch_metrics['error_rate'],
                  name='Batch Errors'),
        row=2, col=2
    )

    fig.update_layout(height=800, showlegend=True)
    return fig

🔄 Migration Strategy

Phase 1: Assessment (Weeks 1-2)

  1. Inventory current data pipelines
  2. Classify workloads (batch vs stream candidates)
  3. Identify dependencies
  4. Define success criteria

Phase 2: Foundation (Weeks 3-6)

  1. Set up Data Lake Gen2 with zone architecture (see the sketch after this list)
  2. Implement Delta Lake format
  3. Configure Synapse workspace
  4. Set up monitoring and governance
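
The zone layout described in step 1 can be provisioned programmatically. The sketch below uses the azure-storage-file-datalake and azure-identity packages; the storage account and container (file system) names are placeholders.

Zone Provisioning Example (illustrative sketch):

from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient

# Placeholders: storage account and container (file system) names
service = DataLakeServiceClient(
    account_url="https://<storageaccount>.dfs.core.windows.net",
    credential=DefaultAzureCredential()
)

# Raises ResourceExistsError if the container already exists
file_system = service.create_file_system(file_system="datalake")

# Create the zone folders used throughout this pattern
for zone in [
    "raw/batch", "raw/stream", "raw/archive",
    "refined/batch", "refined/stream", "refined/merged",
    "curated/warehouse", "curated/cache", "curated/ml",
]:
    file_system.create_directory(zone)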

Phase 3: Batch Migration (Weeks 7-10)

# Migration script for batch workloads
def migrate_batch_pipeline(source_config, target_config):
    """Migrate existing batch pipeline to hybrid architecture."""

    # 1. Extract from legacy system
    legacy_df = spark.read.jdbc(
        url=source_config['jdbc_url'],
        table=source_config['table_name'],
        properties=source_config['properties']
    )

    # 2. Transform to Delta format
    legacy_df.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("date") \
        .save(target_config['raw_path'])

    # 3. Apply transformations
    transformed_df = spark.read.format("delta").load(target_config['raw_path'])
    transformed_df = apply_business_rules(transformed_df)

    # 4. Write to refined zone
    transformed_df.write \
        .format("delta") \
        .mode("append") \
        .save(target_config['refined_path'])

    return {"status": "migrated", "record_count": transformed_df.count()}

Phase 4: Stream Addition (Weeks 11-14)

# Add streaming capabilities
def add_stream_processing(stream_config):
    """Add stream processing to existing batch pipeline."""

    # 1. Set up stream ingestion
    stream_df = spark.readStream \
        .format("eventhubs") \
        .options(**stream_config['eventhub_config']) \
        .load()

    # 2. Apply same transformations as batch
    stream_transformed = apply_business_rules(stream_df)

    # 3. Write to stream output
    query = stream_transformed.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", stream_config['checkpoint']) \
        .start(stream_config['output_path'])

    return query

Phase 5: Optimization (Weeks 15-16)

  1. Tune batch and stream parameters
  2. Implement query optimization
  3. Set up automated monitoring
  4. Document operational procedures
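
Ongoing tuning should also include routine Delta table maintenance. The sketch below compacts small files and vacuums unreferenced data files for tables produced by both paths; the table paths and retention period are illustrative.

Table Maintenance Example (illustrative sketch):

from delta.tables import DeltaTable

def run_table_maintenance(spark, table_paths, retention_hours=168):
    """Compact small files and remove unreferenced data files for each Delta table."""
    for path in table_paths:
        table = DeltaTable.forPath(spark, path)

        # Compact small files into larger ones
        table.optimize().executeCompaction()

        # Remove data files no longer referenced by the table (168 hours = 7 days)
        table.vacuum(retention_hours)

# Usage: run on a daily schedule after the batch window closes
run_table_maintenance(spark, [
    "/mnt/datalake/refined/batch/transactions",
    "/mnt/datalake/refined/stream/transactions",
])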

Complementary Patterns

When to Choose

| Choose Hybrid When | Choose Lambda When | Choose Kappa When |
|---|---|---|
| Mixed workload requirements | Need separate batch/stream logic | All workloads are streaming |
| Gradual migration needed | Mature batch processes exist | Starting fresh |
| Cost optimization important | Accuracy over speed | Speed over complexity |
| Organizational flexibility | Separate teams for batch/stream | Unified streaming team |

📚 Additional Resources

Implementation Guides

Reference Architectures

  • IoT Analytics - Real-time device data processing
  • Retail Analytics - Customer behavior and sales analytics
  • Enterprise Data Warehouse - Traditional DW with modern streaming

Last Updated: 2025-01-28 | Pattern Status: Active | Complexity: Advanced