
🔄 HTAP (Hybrid Transactional/Analytical Processing) Patterns


Enable real-time analytics on operational data without the complexity and latency of traditional ETL processes by combining transactional and analytical workloads on a unified platform.


📋 Table of Contents

  • 🎯 Overview
  • 🏗️ Architecture Components
  • ☁️ Azure Service Mapping
  • 🔧 Implementation Patterns
  • 💼 Use Cases
  • ⚡ Performance Optimization
  • ✅ Best Practices
  • 💰 Cost Optimization
  • 📚 Additional Resources

🎯 Overview

HTAP (Hybrid Transactional/Analytical Processing) enables organizations to run analytics directly on operational data in real-time, eliminating the need for traditional ETL processes and reducing data latency from hours to seconds.

Core Principles

  • Unified Platform: Single system for both OLTP and OLAP workloads
  • Real-time Analytics: No ETL delay for analytical insights
  • Operational Intelligence: Live business metrics and KPIs
  • Zero Data Duplication: Analyze operational data in place
  • Workload Isolation: Prevent analytics from impacting transactions

Architecture Benefits

| Benefit | Description | Business Impact |
|---|---|---|
| Real-time Insights | Analytics on live data | Faster decision making |
| Simplified Architecture | No separate data warehouse | Reduced complexity |
| Cost Efficiency | No data duplication | Lower storage costs |
| Operational Intelligence | Live business metrics | Improved operations |
| Reduced Latency | Eliminate ETL delays | Fresh data always |

Traditional vs HTAP Architecture

graph TB
    subgraph "Traditional Architecture"
        OLTP1[OLTP Database]
        ETL[ETL Process<br/>Batch Nightly]
        DW[Data Warehouse]
        BI1[BI Tools]

        OLTP1 -->|6-24 hrs delay| ETL
        ETL --> DW
        DW --> BI1
    end

    subgraph "HTAP Architecture"
        HTAP[HTAP Database<br/>Cosmos DB + Synapse Link]
        BI2[BI Tools]
        Apps[Applications]

        Apps -->|Real-time| HTAP
        HTAP -->|No delay| BI2
    end

    classDef traditional fill:#ffebee
    classDef htap fill:#e8f5e9

    class OLTP1,ETL,DW,BI1 traditional
    class HTAP,BI2,Apps htap

🏗️ Architecture Components

graph TB
    subgraph "Application Layer"
        WebApp[Web Applications]
        MobileApp[Mobile Apps]
        API[REST APIs]
    end

    subgraph "HTAP Platform"
        subgraph "Transactional Store"
            CosmosOLTP[Cosmos DB<br/>Transactional Storage<br/>Row-based]
        end

        subgraph "Analytical Store"
            CosmosOLAP[Cosmos DB<br/>Analytical Storage<br/>Column-based]
        end

        SynapseLink[Azure Synapse Link<br/>Auto-Sync]
    end

    subgraph "Analytics Layer"
        SparkPools[Synapse Spark Pools<br/>Complex Analytics]
        ServerlessSQL[Synapse Serverless SQL<br/>Ad-hoc Queries]
        PowerBI[Power BI<br/>Dashboards]
    end

    subgraph "ML & AI Layer"
        AzureML[Azure Machine Learning]
        CogServices[Cognitive Services]
    end

    WebApp --> CosmosOLTP
    MobileApp --> CosmosOLTP
    API --> CosmosOLTP

    CosmosOLTP <-->|Auto-Sync<br/>No ETL| SynapseLink
    SynapseLink <--> CosmosOLAP

    CosmosOLAP --> SparkPools
    CosmosOLAP --> ServerlessSQL
    SparkPools --> PowerBI
    ServerlessSQL --> PowerBI

    CosmosOLAP --> AzureML
    SparkPools --> AzureML

    classDef app fill:#e3f2fd
    classDef transactional fill:#f3e5f5
    classDef analytical fill:#fff3e0
    classDef analytics fill:#e8f5e9
    classDef ml fill:#fce4ec

    class WebApp,MobileApp,API app
    class CosmosOLTP transactional
    class CosmosOLAP,SynapseLink analytical
    class SparkPools,ServerlessSQL,PowerBI analytics
    class AzureML,CogServices ml

Key Components

1. Transactional Store (Row-based)

  • Optimized for OLTP operations
  • High-throughput writes
  • Low-latency reads
  • ACID transactions
  • Global distribution

2. Analytical Store (Column-based)

  • Optimized for OLAP queries
  • Columnar storage format
  • Automatic schema inference
  • Cost-effective storage
  • No impact on transactional workload

3. Auto-Synchronization

  • Near real-time sync (typically < 2 minutes)
  • No ETL coding required
  • No impact on transactional RUs
  • Schema evolution support
  • TTL support for data lifecycle
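A quick way to confirm that a container is participating in auto-sync is to read its properties and check the analytical TTL. The sketch below uses the azure-cosmos Python SDK and assumes the setting is surfaced under the analyticalStorageTtl property; the account, database, and container names are placeholders.

from azure.cosmos import CosmosClient

# Placeholder endpoint and key for illustration
client = CosmosClient(
    url="https://your-account.documents.azure.com:443/",
    credential="your-cosmos-key"
)
container = client.get_database_client("retail-db").get_container_client("transactions")

# container.read() returns the container's properties as a dict;
# the analytical TTL is expected under 'analyticalStorageTtl' when enabled
props = container.read()
analytical_ttl = props.get("analyticalStorageTtl")

if not analytical_ttl:
    print("Analytical store is not enabled on this container")
elif analytical_ttl == -1:
    print("Analytical store enabled with infinite retention")
else:
    print(f"Analytical store enabled, retention of {analytical_ttl} seconds")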

☁️ Azure Service Mapping

Primary HTAP Services

| Service | Component | Purpose | Key Features |
|---|---|---|---|
| Cosmos DB | Transactional Store | OLTP operations | Global distribution, multi-model |
| Synapse Link | Integration Layer | Auto-sync | No ETL, near real-time |
| Synapse Spark | Analytics Engine | Complex analytics | Distributed processing |
| Synapse Serverless SQL | Query Engine | Ad-hoc queries | Pay-per-query |
| Power BI | Visualization | Dashboards | Real-time reports |
| Azure ML | ML Platform | Predictive analytics | Integrated ML |

Supporting Services

  • Azure Monitor: Performance monitoring
  • Log Analytics: Diagnostics and troubleshooting
  • Key Vault: Secrets management
  • Azure Purview: Data governance
  • Azure DevOps: CI/CD pipelines
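The snippets on this page pass raw account keys for brevity; in practice, secrets such as the Cosmos DB key would be pulled from Key Vault at runtime. A minimal sketch with the azure-identity and azure-keyvault-secrets SDKs, assuming a secret named cosmos-account-key exists in the vault (vault URL, secret name, and account URL are placeholders):

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from azure.cosmos import CosmosClient

# Placeholder vault URL; authentication uses the ambient Azure identity
vault_url = "https://your-keyvault.vault.azure.net/"
credential = DefaultAzureCredential()

# Retrieve the Cosmos DB account key from Key Vault instead of hard-coding it
secret_client = SecretClient(vault_url=vault_url, credential=credential)
cosmos_key = secret_client.get_secret("cosmos-account-key").value

cosmos_client = CosmosClient(
    url="https://your-account.documents.azure.com:443/",
    credential=cosmos_key
)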

🔧 Implementation Patterns

Pattern 1: Real-time Operational Analytics

graph LR
    subgraph "Operational Layer"
        Apps[Applications]
        Cosmos[(Cosmos DB<br/>Transactional)]
    end

    subgraph "Analytics Layer"
        CosmosAnalytical[(Cosmos DB<br/>Analytical Store)]
        Synapse[Synapse Analytics]
        Dashboard[Real-time Dashboard]
    end

    Apps -->|Write| Cosmos
    Cosmos -.->|Auto-sync| CosmosAnalytical
    CosmosAnalytical --> Synapse
    Synapse --> Dashboard

    classDef operational fill:#e3f2fd
    classDef analytical fill:#e8f5e9

    class Apps,Cosmos operational
    class CosmosAnalytical,Synapse,Dashboard analytical

Implementation Steps:

  1. Enable Synapse Link on Cosmos DB account
  2. Enable analytical store on containers
  3. Create Synapse workspace
  4. Create linked service to Cosmos DB
  5. Query analytical store with Synapse

Step 1: Enable Synapse Link on Cosmos DB:

from azure.cosmos import CosmosClient
from azure.mgmt.cosmosdb import CosmosDBManagementClient
from azure.identity import DefaultAzureCredential

# Create Cosmos DB account with Synapse Link enabled
credential = DefaultAzureCredential()
cosmos_management_client = CosmosDBManagementClient(
    credential=credential,
    subscription_id="your-subscription-id"
)

# Enable Synapse Link on account
cosmos_account_params = {
    "location": "East US",
    "database_account_offer_type": "Standard",  # required when creating the account
    "enable_analytical_storage": True,
    "locations": [{
        "location_name": "East US",
        "failover_priority": 0
    }]
}

cosmos_management_client.database_accounts.begin_create_or_update(
    resource_group_name="your-rg",
    account_name="your-cosmos-account",
    create_update_parameters=cosmos_account_params
).result()

Step 2: Enable Analytical Store on Container:

from azure.cosmos import CosmosClient, PartitionKey

cosmos_client = CosmosClient(
    url="https://your-account.documents.azure.com:443/",
    credential="your-cosmos-key"
)

database = cosmos_client.create_database_if_not_exists(id="retail-db")

# Create container with analytical store enabled
container = database.create_container_if_not_exists(
    id="transactions",
    partition_key=PartitionKey(path="/customerId"),
    analytical_storage_ttl=-1  # -1 for infinite retention, or a retention period in seconds
)

Step 3: Query Analytical Store with Synapse Spark:

# PySpark code to query Cosmos DB analytical store
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CosmosDBAnalytics") \
    .config("spark.synapse.linkedService", "CosmosDBLinkedService") \
    .getOrCreate()

# Read from analytical store
df = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "CosmosDBLinkedService") \
    .option("spark.cosmos.database", "retail-db") \
    .option("spark.cosmos.container", "transactions") \
    .load()

# Real-time analytics query
result = df.filter(df.transactionDate >= "2025-01-01") \
    .groupBy("customerId", "productCategory") \
    .agg(
        {"transactionAmount": "sum", "transactionId": "count"}
    ) \
    .withColumnRenamed("sum(transactionAmount)", "totalSpent") \
    .withColumnRenamed("count(transactionId)", "transactionCount")

# Display results
result.show()

# Write to Delta Lake for further analysis
result.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/mnt/analytics/customer-spending")

Step 4: Query with Synapse Serverless SQL:

-- Query Cosmos DB analytical store with T-SQL
USE master;
GO

-- Create database
CREATE DATABASE CosmosDBAnalytics;
GO

USE CosmosDBAnalytics;
GO

-- Create a server-scoped credential holding the Cosmos DB account key
-- (run in master; referenced below via SERVER_CREDENTIAL)
CREATE CREDENTIAL CosmosDBCredential
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = '<your-cosmos-account-key>';

-- Query analytical store directly
SELECT
    customerId,
    productCategory,
    SUM(transactionAmount) AS totalSpent,
    COUNT(*) AS transactionCount,
    AVG(transactionAmount) AS avgTransactionAmount
FROM
    OPENROWSET(
        PROVIDER = 'CosmosDB',
        CONNECTION = 'Account=your-account;Database=retail-db',
        OBJECT = 'transactions',
        SERVER_CREDENTIAL = 'CosmosDBCredential'
    )
WITH (
    customerId VARCHAR(100),
    productCategory VARCHAR(50),
    transactionAmount DECIMAL(10,2),
    transactionDate DATE
) AS transactions
WHERE
    transactionDate >= '2025-01-01'
GROUP BY
    customerId,
    productCategory
ORDER BY
    totalSpent DESC;

Pattern 2: Real-time Customer 360

graph TB
    subgraph "Data Sources"
        Web[Web Interactions]
        Mobile[Mobile App]
        Store[In-Store POS]
        Support[Support Tickets]
    end

    subgraph "Transactional Layer"
        Cosmos[Cosmos DB<br/>Customer Data]
    end

    subgraph "Analytics Layer"
        Analytical[Analytical Store]
        Spark[Synapse Spark<br/>360 View Builder]
    end

    subgraph "Consumption"
        Dashboard[Customer 360<br/>Dashboard]
        ML[ML Models<br/>Churn Prediction]
        Personalization[Real-time<br/>Personalization]
    end

    Web --> Cosmos
    Mobile --> Cosmos
    Store --> Cosmos
    Support --> Cosmos

    Cosmos -.->|Auto-sync| Analytical
    Analytical --> Spark
    Spark --> Dashboard
    Spark --> ML
    ML --> Personalization

    classDef source fill:#e3f2fd
    classDef transactional fill:#f3e5f5
    classDef analytical fill:#fff3e0
    classDef consumption fill:#e8f5e9

    class Web,Mobile,Store,Support source
    class Cosmos transactional
    class Analytical,Spark analytical
    class Dashboard,ML,Personalization consumption

Customer 360 Implementation:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, avg, max, min, datediff, current_date, when
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Customer360").getOrCreate()

# Read customer interactions from analytical store
interactions_df = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "CosmosDBLinkedService") \
    .option("spark.cosmos.database", "crm-db") \
    .option("spark.cosmos.container", "customer-interactions") \
    .load()

# Build Customer 360 view
customer_360 = interactions_df.groupBy("customerId").agg(
    # Purchase metrics
    sum(col("transactionAmount")).alias("lifetimeValue"),
    count(col("transactionId")).alias("totalTransactions"),
    avg(col("transactionAmount")).alias("avgTransactionAmount"),
    max(col("transactionDate")).alias("lastPurchaseDate"),
    min(col("transactionDate")).alias("firstPurchaseDate"),

    # Engagement metrics
    count(col("webVisitId")).alias("totalWebVisits"),
    count(col("supportTicketId")).alias("totalSupportTickets"),
    count(col("mobileAppSession")).alias("totalMobileAppSessions"),

    # Channel preferences
    sum(col("webPurchases")).alias("webPurchases"),
    sum(col("mobilePurchases")).alias("mobilePurchases"),
    sum(col("storePurchases")).alias("storePurchases")
)

# Calculate derived metrics
customer_360 = customer_360.withColumn(
    "daysSinceLastPurchase",
    datediff(current_date(), col("lastPurchaseDate"))
).withColumn(
    "customerTenureDays",
    datediff(current_date(), col("firstPurchaseDate"))
).withColumn(
    "purchaseFrequency",
    col("totalTransactions") / (col("customerTenureDays") / 30.0)
)

# Segment customers
customer_360 = customer_360.withColumn(
    "customerSegment",
    when(col("lifetimeValue") > 10000, "VIP")
    .when(col("lifetimeValue") > 5000, "High Value")
    .when(col("lifetimeValue") > 1000, "Regular")
    .otherwise("New")
)

# Write to Delta Lake for consumption
customer_360.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/mnt/analytics/customer-360")

# Register as table for SQL queries
customer_360.createOrReplaceTempView("customer_360_view")

# Enable real-time Power BI dashboard
spark.sql("""
    CREATE OR REPLACE TABLE customer_360_bi
    AS SELECT * FROM customer_360_view
""")

Pattern 3: Real-time Fraud Detection

from pyspark.sql.functions import col, when, window, sum, count, avg, stddev
from pyspark.ml.feature import VectorAssembler

# Read streaming transactions via the Cosmos DB change feed
# (streaming reads come from the transactional store change feed;
# the analytical store serves the batch queries shown earlier)
streaming_transactions = spark.readStream \
    .format("cosmos.oltp") \
    .option("spark.synapse.linkedService", "CosmosDBLinkedService") \
    .option("spark.cosmos.database", "payments-db") \
    .option("spark.cosmos.container", "transactions") \
    .option("spark.cosmos.changeFeed.readEnabled", "true") \
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "false") \
    .load()

# Feature engineering for fraud detection
fraud_features = streaming_transactions \
    .withWatermark("transactionTimestamp", "10 minutes") \
    .groupBy(
        window("transactionTimestamp", "5 minutes"),
        "customerId"
    ) \
    .agg(
        count("*").alias("transactionCount"),
        sum("amount").alias("totalAmount"),
        avg("amount").alias("avgAmount"),
        stddev("amount").alias("stdDevAmount"),
        count(when(col("merchantCategory") == "foreign", 1)).alias("foreignTransactions")
    )

# Feature vector expected by the fraud model (shown for reference;
# the pre-trained pipeline loaded below performs its own assembly)
assembler = VectorAssembler(
    inputCols=["transactionCount", "totalAmount", "avgAmount", "stdDevAmount", "foreignTransactions"],
    outputCol="features"
)

# Load pre-trained model
from pyspark.ml import PipelineModel
fraud_model = PipelineModel.load("/mnt/models/fraud-detection")

# Apply model to streaming data
fraud_predictions = fraud_model.transform(fraud_features)

# Alert on high-risk transactions
high_risk_transactions = fraud_predictions.filter(col("prediction") == 1)

# Write alerts to Cosmos DB for immediate action
query = high_risk_transactions.writeStream \
    .format("cosmos.oltp") \
    .option("spark.synapse.linkedService", "CosmosDBLinkedService") \
    .option("spark.cosmos.database", "alerts-db") \
    .option("spark.cosmos.container", "fraud-alerts") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/checkpoints/fraud-detection") \
    .start()

query.awaitTermination()

💼 Use Cases

1. Real-time Inventory Management

Scenario: E-commerce platform needs real-time inventory visibility across channels

Implementation (a query sketch follows this use case):

  • Write inventory updates to the Cosmos DB transactional store
  • Query the analytical store for real-time inventory analytics
  • Power BI dashboard shows live inventory levels
  • ML models predict stockouts

Benefits:

  • No inventory data lag
  • Prevent stockouts and overselling
  • Optimize inventory levels
  • Real-time reorder triggers
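As a rough illustration of the analytics side, the PySpark sketch below aggregates stock by SKU from a hypothetical inventory container synced to the analytical store and flags items below a reorder threshold; the container name, field names, and threshold are assumptions, not part of the pattern above.

from pyspark.sql.functions import col, sum

# Hypothetical inventory container synced to the analytical store
inventory_df = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "CosmosDBLinkedService") \
    .option("spark.cosmos.database", "retail-db") \
    .option("spark.cosmos.container", "inventory") \
    .load()

# Current stock per SKU and warehouse, flagging items below a reorder threshold
stock_levels = inventory_df.groupBy("sku", "warehouse") \
    .agg(sum(col("quantityOnHand")).alias("currentStock"))

low_stock = stock_levels.filter(col("currentStock") < 50)  # illustrative threshold
low_stock.show()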

2. Live Financial Dashboards

Scenario: Banking application requires real-time financial reporting

Implementation (a query sketch follows this use case):

  • Transactions written to Cosmos DB
  • Analytical store enables instant aggregations
  • Serverless SQL provides ad-hoc queries
  • Power BI shows live financial metrics

Benefits:

  • Instant financial visibility
  • Real-time risk assessment
  • Regulatory compliance
  • No batch processing delays
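A sketch of the kind of aggregation a live financial dashboard could sit on, assuming a transactions container with amount, accountType, and transactionTimestamp fields (database, container, and field names are illustrative):

from pyspark.sql.functions import col, sum, count, date_trunc

# Hypothetical banking transactions container synced to the analytical store
txns = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "CosmosDBLinkedService") \
    .option("spark.cosmos.database", "banking-db") \
    .option("spark.cosmos.container", "transactions") \
    .load()

# Hourly transaction volume and value feeding a live dashboard
hourly_metrics = txns \
    .withColumn("hour", date_trunc("hour", col("transactionTimestamp"))) \
    .groupBy("hour", "accountType") \
    .agg(
        sum(col("amount")).alias("totalAmount"),
        count("*").alias("transactionCount")
    ) \
    .orderBy(col("hour").desc())

hourly_metrics.show()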

3. Real-time Customer Personalization

Scenario: Retail application personalizes experience based on live behavior

Implementation (a sketch follows this use case):

  • Customer interactions stored in Cosmos DB
  • Analytical store powers ML models
  • Real-time recommendations
  • Live customer segmentation

Benefits:

  • Immediate personalization
  • Better customer experience
  • Increased conversion rates
  • Real-time insights
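The segments computed in the Customer 360 pattern above can feed personalization directly. A minimal sketch that looks up a customer's segment from the Delta output and maps it to an offer; the segment names follow the earlier example, while the offer mapping and customer id are hypothetical:

from pyspark.sql.functions import col

# Reuse the Customer 360 output written earlier on this page
customer_360_df = spark.read.format("delta").load("/mnt/analytics/customer-360")

# Illustrative mapping from segment to a personalization decision
SEGMENT_OFFERS = {
    "VIP": "early-access-sale",
    "High Value": "loyalty-bonus",
    "Regular": "category-recommendations",
    "New": "welcome-discount",
}

def offer_for_customer(customer_id: str) -> str:
    """Return the offer for a customer based on their current segment."""
    row = customer_360_df.filter(col("customerId") == customer_id) \
        .select("customerSegment") \
        .first()
    segment = row["customerSegment"] if row else "New"
    return SEGMENT_OFFERS.get(segment, "welcome-discount")

print(offer_for_customer("CUST001"))  # hypothetical customer id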


⚡ Performance Optimization

1. Partition Key Design

# Optimize partition key for both OLTP and OLAP workloads
from azure.cosmos import PartitionKey

# Good partition key: High cardinality, even distribution
container = database.create_container_if_not_exists(
    id="transactions",
    partition_key=PartitionKey(path="/customerId"),
    analytical_storage_ttl=-1
)

# Avoid: Low cardinality partition keys
# Bad example: /country (only ~200 values)
# Bad example: /date (creates hot partitions)

2. Analytical Store TTL Configuration

# Configure TTL for cost optimization
# Option 1: Infinite retention
analytical_storage_ttl = -1

# Option 2: Time-based retention (e.g., 365 days)
analytical_storage_ttl = 365 * 24 * 60 * 60  # in seconds

# Option 3: Disable analytical store
analytical_storage_ttl = 0

container = database.create_container_if_not_exists(
    id="transactions",
    partition_key=PartitionKey(path="/customerId"),
    analytical_storage_ttl=analytical_storage_ttl
)

3. Query Optimization

-- Use partition key in WHERE clause for better performance
SELECT
    customerId,
    SUM(amount) AS totalSpent
FROM
    OPENROWSET(
        PROVIDER = 'CosmosDB',
        CONNECTION = 'Account=your-account;Database=retail-db',
        OBJECT = 'transactions'
    )
WITH (
    customerId VARCHAR(100),
    amount DECIMAL(10,2),
    transactionDate DATE
) AS t
WHERE
    customerId IN ('CUST001', 'CUST002', 'CUST003')  -- Partition key filter
    AND transactionDate >= '2025-01-01'
GROUP BY
    customerId;

4. Spark Performance Tuning

# Configure Spark for optimal analytical store queries
spark.conf.set("spark.cosmos.read.inferSchema.enabled", "false")
spark.conf.set("spark.cosmos.read.customSchema",
               "customerId STRING, amount DECIMAL(10,2), transactionDate DATE")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Use partition pruning
df = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "CosmosDBLinkedService") \
    .option("spark.cosmos.database", "retail-db") \
    .option("spark.cosmos.container", "transactions") \
    .option("spark.cosmos.read.partitioning.strategy", "Restrictive") \
    .load()

✅ Best Practices

1. Data Modeling

| Practice | Description | Impact |
|---|---|---|
| Denormalization | Embed related data in documents | Fewer joins, better performance |
| Partition Key | High cardinality, even distribution | Balanced workload |
| Schema Design | Consistent schema for analytics | Easier querying |
| TTL Management | Lifecycle policy for old data | Cost optimization |
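For example, a denormalized order document can embed line items and a customer snapshot so that both point reads and analytical queries avoid joins. A sketch of such a document, upserted with the Python SDK; the field values are illustrative and container refers to the transactions container created earlier:

# Denormalized order document: line items and a customer snapshot are embedded,
# so OLTP reads and analytical queries both avoid joins
order_doc = {
    "id": "order-1001",
    "customerId": "CUST001",           # partition key: high cardinality
    "orderDate": "2025-01-15T10:30:00Z",
    "customer": {                       # embedded snapshot of the customer at order time
        "name": "Jane Doe",
        "segment": "VIP"
    },
    "items": [                          # embedded line items
        {"sku": "SKU-42", "quantity": 2, "unitPrice": 19.99},
        {"sku": "SKU-77", "quantity": 1, "unitPrice": 149.00}
    ],
    "orderTotal": 188.98
}

# Upsert into the transactional store; the analytical store picks it up automatically
container.upsert_item(order_doc)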

2. Workload Isolation

# Separate Spark pools for different workload types (illustrative sketch:
# in Synapse, a job is attached to a pool when it is submitted, so the
# "spark.synapse.pool" setting below is a placeholder rather than a real config)
class HTAPWorkloadManager:
    """Manage HTAP workloads with isolation."""

    def __init__(self):
        self.spark_pools = {
            "interactive": "small-pool",  # Ad-hoc queries
            "batch": "large-pool",        # Large analytics jobs
            "ml": "gpu-pool"              # ML training
        }

    def submit_query(self, query_type: str, query: str):
        """Route query to appropriate Spark pool."""
        pool = self.spark_pools.get(query_type, "small-pool")

        spark = SparkSession.builder \
            .appName(f"{query_type}-query") \
            .config("spark.synapse.pool", pool) \
            .getOrCreate()

        return spark.sql(query)

3. Monitoring and Alerting

from azure.monitor.query import LogsQueryClient, MetricsQueryClient
from datetime import timedelta

class HTAPMonitor:
    """Monitor HTAP performance and health."""

    def __init__(self, workspace_id: str, credentials):
        self.logs_client = LogsQueryClient(credentials)
        self.metrics_client = MetricsQueryClient(credentials)
        self.workspace_id = workspace_id

    def check_sync_lag(self):
        """Monitor Synapse Link sync lag."""
        query = """
        AzureDiagnostics
        | where ResourceProvider == "MICROSOFT.DOCUMENTDB"
        | where Category == "DataPlaneRequests"
        | where OperationName == "AnalyticalStorageReplication"
        | summarize avg(DurationMs) by bin(TimeGenerated, 5m)
        """

        response = self.logs_client.query_workspace(
            workspace_id=self.workspace_id,
            query=query,
            timespan=timedelta(hours=1)
        )

        return response.tables[0].rows

    def check_query_performance(self):
        """Monitor analytical query performance."""
        query = """
        SynapseSqlPoolExecRequests
        | where Status == "Completed"
        | summarize
            avg(TotalElapsedTime),
            max(TotalElapsedTime),
            count()
        by bin(SubmitTime, 15m)
        """

        response = self.logs_client.query_workspace(
            workspace_id=self.workspace_id,
            query=query,
            timespan=timedelta(hours=1)
        )

        return response.tables[0].rows

💰 Cost Optimization

1. Analytical Store Pricing Model

| Component | Pricing | Optimization Strategy |
|---|---|---|
| Storage | $0.02/GB/month | Use TTL to limit retention |
| Sync | No additional cost | Included once analytical store is enabled |
| Queries | Based on Synapse pricing | Use Serverless SQL for ad-hoc |
| Transactional Store | RU-based | No impact from analytics |
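As a back-of-the-envelope estimate using the rate above, the sketch below approximates the monthly analytical storage cost for a given data volume and shows how a shorter TTL reduces it; the volumes and the even-spread assumption are illustrative, not a quote.

# Rough analytical-store cost estimate (assumed rate from the table above)
STORAGE_RATE_PER_GB_MONTH = 0.02  # USD, assumed

def monthly_analytical_storage_cost(total_gb: float, ttl_days: int, data_age_days: int) -> float:
    """Estimate monthly cost, assuming data is evenly spread over its age
    and only records younger than the TTL are retained in the analytical store."""
    retained_fraction = min(ttl_days, data_age_days) / data_age_days
    return total_gb * retained_fraction * STORAGE_RATE_PER_GB_MONTH

# 2 TB of history accumulated over two years
print(monthly_analytical_storage_cost(2048, ttl_days=730, data_age_days=730))  # full retention: ~$40.96
print(monthly_analytical_storage_cost(2048, ttl_days=90, data_age_days=730))   # 90-day TTL: ~$5.05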

2. Cost Optimization Strategies

# Strategy 1: Implement data lifecycle management
def optimize_analytical_storage(database, container_id):
    """Configure TTL for cost-effective analytical storage."""

    # Keep only recent data in the analytical store
    recent_data_ttl = 90 * 24 * 60 * 60  # 90 days, in seconds

    # Analytical TTL is updated via the database client; the partition key itself cannot change
    database.replace_container(
        container_id,
        partition_key=PartitionKey(path="/customerId"),
        analytical_storage_ttl=recent_data_ttl
    )

# Strategy 2: Use Serverless SQL for ad-hoc queries
# Pays only for data processed, not for idle time

# Strategy 3: Schedule large analytics jobs during off-peak
from datetime import datetime

def should_run_analytics_job():
    """Run expensive jobs during off-peak hours."""
    current_hour = datetime.now().hour
    return 2 <= current_hour <= 6  # 2 AM - 6 AM

# Strategy 4: Cache frequently accessed data in Spark memory

# Cache frequently accessed data
customer_360_df = spark.read.format("delta").load("/mnt/analytics/customer-360")
customer_360_df.cache()

Complementary Patterns

When to Choose HTAP

| Choose HTAP When | Choose Traditional ETL When |
|---|---|
| Need real-time analytics | Batch analytics sufficient |
| Operational intelligence required | Complex transformations needed |
| Simple transformations | Multiple source systems |
| Cost-sensitive | Mature ETL processes exist |
| Unified platform preferred | Specialized tools required |

📚 Additional Resources

Documentation

Reference Architectures


Last Updated: 2025-01-28 · Pattern Status: Active · Complexity: Advanced