
🗄️ Polyglot Persistence Architecture


Use different database technologies optimized for specific data patterns and access requirements, allowing each component to leverage the best tool for its unique characteristics.


🎯 Overview

Polyglot Persistence is an architectural approach that uses multiple database technologies within a single application or system, selecting the optimal data store for each specific use case based on data characteristics and access patterns.

Core Principles

  • Right Tool for Right Job: Match database to data characteristics
  • Bounded Contexts: Each domain owns its data store choice
  • Data Synchronization: Keep related data consistent across stores
  • Query Federation: Unified query layer across databases
  • Operational Excellence: Manage multiple technologies effectively

Architecture Benefits

| Benefit | Description | Business Impact |
|---------|-------------|-----------------|
| Performance | Optimized storage for each data type | Better user experience |
| Scalability | Independent scaling per database | Cost-effective growth |
| Flexibility | Choose the best technology per domain | Rapid innovation |
| Resilience | Isolated failure domains | Higher availability |
| Specialization | Leverage database-specific features | Advanced capabilities |

📊 Database Selection Matrix

By Data Characteristics

graph TB
    Start{Data Characteristics}

    Start -->|Structured<br/>ACID Required| Relational[Relational Database<br/>Azure SQL Database]
    Start -->|Semi-Structured<br/>Flexible Schema| Document[Document Store<br/>Cosmos DB]
    Start -->|Relationships<br/>Connected Data| Graph[Graph Database<br/>Cosmos DB Gremlin API]
    Start -->|Time-Series<br/>High Frequency| TimeSeries[Time-Series Database<br/>Data Explorer]
    Start -->|Key-Value<br/>High Throughput| Cache[Cache/Key-Value<br/>Redis Cache]
    Start -->|Search<br/>Full-Text| Search[Search Engine<br/>Azure Cognitive Search]
    Start -->|Large Files<br/>Unstructured| Blob[Object Storage<br/>Blob Storage]
    Start -->|Analytics<br/>Large Scale| Warehouse[Data Warehouse<br/>Synapse Analytics]

    classDef relational fill:#e3f2fd
    classDef document fill:#f3e5f5
    classDef graph fill:#fff3e0
    classDef timeseries fill:#e8f5e9
    classDef cache fill:#fce4ec
    classDef search fill:#f1f8e9

    class Relational relational
    class Document document
    class Graph graph
    class TimeSeries timeseries
    class Cache cache
    class Search search

Decision Matrix

| Database Type | Best For | Not Suitable For | Azure Service |
|---------------|----------|------------------|---------------|
| Relational | ACID transactions, complex joins, structured data | Massive scale, unstructured data | Azure SQL Database |
| Document | Flexible schema, semi-structured data, global distribution | Complex joins, transactions | Cosmos DB (SQL API) |
| Graph | Connected data, relationships, network analysis | Simple lookups, time-series | Cosmos DB (Gremlin API) |
| Time-Series | IoT telemetry, metrics, events | Transactions, relationships | Azure Data Explorer |
| Key-Value | Session state, caching, simple lookups | Complex queries, relationships | Redis Cache |
| Search | Full-text search, faceted navigation | Transactions, source of truth | Cognitive Search |
| Blob | Files, media, backups | Structured queries, transactions | Blob Storage |
| Analytical | Large-scale analytics, BI, reporting | OLTP, low latency | Synapse Analytics |

🏗️ Architecture Components

graph TB
    subgraph "Application Layer"
        API[Application APIs]
        Gateway[API Gateway]
    end

    subgraph "Data Access Layer"
        Router[Data Router]
        Aggregator[Data Aggregator]
    end

    subgraph "Transactional Stores"
        SQL[Azure SQL Database<br/>OLTP Transactions]
        Cosmos[Cosmos DB<br/>Flexible Documents]
    end

    subgraph "Analytical Stores"
        Synapse[Synapse Analytics<br/>Data Warehouse]
        Explorer[Data Explorer<br/>Time-Series Analytics]
    end

    subgraph "Specialized Stores"
        Graph[Cosmos DB Gremlin<br/>Graph Relationships]
        Redis[Redis Cache<br/>Session State]
        Search[Cognitive Search<br/>Full-Text Search]
        Blob[Blob Storage<br/>Files & Media]
    end

    subgraph "Data Synchronization"
        CDC[Change Data Capture]
        EventHub[Event Hub<br/>Data Streaming]
        DataFactory[Data Factory<br/>Batch Sync]
    end

    Gateway --> Router
    Router --> SQL
    Router --> Cosmos
    Router --> Graph
    Router --> Redis

    Aggregator --> Synapse
    Aggregator --> Explorer
    Aggregator --> Search

    SQL --> CDC
    Cosmos --> EventHub
    CDC --> EventHub
    EventHub --> Synapse
    EventHub --> Explorer
    EventHub --> Search

    DataFactory --> Blob
    DataFactory --> Synapse

    classDef app fill:#e3f2fd
    classDef transactional fill:#f3e5f5
    classDef analytical fill:#fff3e0
    classDef specialized fill:#e8f5e9
    classDef sync fill:#fce4ec

    class API,Gateway app
    class SQL,Cosmos transactional
    class Synapse,Explorer analytical
    class Graph,Redis,Search,Blob specialized
    class CDC,EventHub,DataFactory sync
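
The Data Access Layer above (Router and Aggregator) is usually custom application code. Below is a minimal, hedged sketch of a router that dispatches reads to whichever store owns an entity type; the placeholder readers stand in for the service classes shown later in this document:

from typing import Any, Callable, Dict

class DataRouter:
    """Dispatch reads to the data store that owns each entity type."""

    def __init__(self):
        # entity type -> read callable backed by the owning store
        self._readers: Dict[str, Callable[[str], Any]] = {}

    def register(self, entity_type: str, reader: Callable[[str], Any]) -> None:
        self._readers[entity_type] = reader

    def get(self, entity_type: str, entity_id: str) -> Any:
        reader = self._readers.get(entity_type)
        if reader is None:
            raise KeyError(f"No data store registered for '{entity_type}'")
        return reader(entity_id)

# Wiring sketch with placeholder readers; in practice each wraps an
# Azure SQL, Cosmos DB, or Redis client respectively
router = DataRouter()
router.register('user', lambda uid: {'id': uid, 'source': 'azure-sql'})
router.register('product', lambda pid: {'id': pid, 'source': 'cosmos-db'})
router.register('session', lambda sid: {'id': sid, 'source': 'redis'})

print(router.get('product', 'p-42'))  # routed to the Cosmos DB reader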

☁️ Azure Service Mapping

Core Database Services

| Service | Type | Use Cases | Scaling Model |
|---------|------|-----------|---------------|
| Azure SQL Database | Relational | OLTP, transactions, relational data | Vertical + Elastic Pools |
| Cosmos DB | Multi-model | Global distribution, low latency | Horizontal (auto-scale) |
| Synapse Analytics | Data Warehouse | Analytics, BI, reporting | MPP (massively parallel) |
| Data Explorer | Time-Series | IoT, logs, metrics, telemetry | Auto-scale clusters |
| Redis Cache | Key-Value | Caching, session state | Vertical + Clustering |
| Cognitive Search | Search Engine | Full-text search, AI enrichment | Search units |
| Blob Storage | Object Store | Files, backups, data lake | Unlimited horizontal |

Integration Services

  • Azure Data Factory: Batch data movement and transformation
  • Event Hubs: Real-time event streaming
  • Stream Analytics: Stream processing and routing
  • Logic Apps: Workflow orchestration
  • Azure Functions: Event-driven compute (see the sketch after this list)
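
As one concrete, hedged example of how these services combine, the sketch below uses the Azure Functions Python v2 programming model to consume Event Hub events and write them into a Cosmos DB read model. The hub, database, container, and app-setting names are assumptions:

import json
import azure.functions as func

app = func.FunctionApp()

@app.event_hub_message_trigger(arg_name="event",
                               event_hub_name="data-changes",     # assumed hub name
                               connection="EVENTHUB_CONNECTION")  # app setting name
@app.cosmos_db_output(arg_name="doc",
                      database_name="sync-db",                    # assumed database
                      container_name="materialized-views",        # assumed container
                      connection="COSMOS_CONNECTION")             # app setting name
def sync_change(event: func.EventHubEvent, doc: func.Out[func.Document]) -> None:
    """Deserialize one change event and upsert it into the Cosmos DB read model."""
    payload = json.loads(event.get_body().decode('utf-8'))
    doc.set(func.Document.from_dict(payload))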

🔧 Implementation Patterns

Pattern 1: Microservices with Dedicated Data Stores

graph TB
    subgraph "User Service"
        UserAPI[User API]
        UserDB[(Azure SQL<br/>User Data)]
        UserCache[(Redis<br/>User Sessions)]
    end

    subgraph "Product Service"
        ProductAPI[Product API]
        ProductDB[(Cosmos DB<br/>Product Catalog)]
        ProductSearch[(Cognitive Search<br/>Product Index)]
    end

    subgraph "Order Service"
        OrderAPI[Order API]
        OrderDB[(Azure SQL<br/>Order Transactions)]
        OrderEvents[(Event Hub<br/>Order Events)]
    end

    subgraph "Analytics Service"
        AnalyticsAPI[Analytics API]
        AnalyticsDB[(Synapse<br/>Data Warehouse)]
        MetricsDB[(Data Explorer<br/>Metrics)]
    end

    UserAPI --> UserDB
    UserAPI --> UserCache
    ProductAPI --> ProductDB
    ProductAPI --> ProductSearch
    OrderAPI --> OrderDB
    OrderAPI --> OrderEvents

    OrderEvents --> AnalyticsDB
    ProductDB --> AnalyticsDB
    UserDB --> AnalyticsDB
    OrderEvents --> MetricsDB

    classDef service fill:#e3f2fd
    classDef sql fill:#f3e5f5
    classDef nosql fill:#fff3e0
    classDef analytics fill:#e8f5e9

    class UserAPI,ProductAPI,OrderAPI,AnalyticsAPI service
    class UserDB,OrderDB sql
    class ProductDB,UserCache,ProductSearch,OrderEvents nosql
    class AnalyticsDB,MetricsDB analytics

Implementation Example - User Service (Azure SQL):

from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
import redis

# Azure SQL for transactional user data
engine = create_engine(
    'mssql+pyodbc://username:password@server.database.windows.net/userdb?driver=ODBC+Driver+17+for+SQL+Server'
)
Base = declarative_base()
Session = sessionmaker(bind=engine)

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

# Redis for user sessions
redis_client = redis.Redis(
    host='mycache.redis.cache.windows.net',
    port=6380,
    password='your-redis-key',
    ssl=True
)

class UserService:
    """User service with polyglot persistence."""

    def __init__(self):
        self.session = Session()
        self.cache = redis_client

    def create_user(self, username: str, email: str) -> User:
        """Create user in Azure SQL."""
        user = User(username=username, email=email)
        self.session.add(user)
        self.session.commit()
        return user

    def get_user(self, user_id: int) -> User:
        """Get user with Redis caching."""
        # Check cache first
        cache_key = f"user:{user_id}"
        cached_user = self.cache.get(cache_key)

        if cached_user:
            # Rehydrate a detached User from the cached columns
            return User(**json.loads(cached_user))

        # Fetch from database
        user = self.session.query(User).filter_by(id=user_id).first()

        # Cache for 1 hour (serialize only plain columns, not ORM state)
        if user:
            payload = json.dumps(
                {'id': user.id, 'username': user.username, 'email': user.email}
            )
            self.cache.setex(cache_key, 3600, payload)

        return user

    def set_user_session(self, user_id: int, session_data: dict):
        """Store session in Redis."""
        session_key = f"session:{user_id}"
        self.cache.setex(session_key, 1800, json.dumps(session_data))  # 30 min TTL
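
A short usage sketch, assuming the connection strings above point at real resources:

# Create tables on first run (illustrative; real projects use migrations)
Base.metadata.create_all(engine)

service = UserService()
user = service.create_user('ada', 'ada@example.com')
service.set_user_session(user.id, {'theme': 'dark', 'cart_items': 3})

# First call hits Azure SQL and populates Redis; later calls are cache hits
print(service.get_user(user.id).username)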

Product Service (Cosmos DB + Cognitive Search):

from azure.cosmos import CosmosClient, PartitionKey
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential
from datetime import datetime
import uuid

# Cosmos DB for product catalog
cosmos_client = CosmosClient(
    url='https://myaccount.documents.azure.com:443/',
    credential='your-cosmos-key'
)
database = cosmos_client.create_database_if_not_exists(id='products-db')
container = database.create_container_if_not_exists(
    id='products',
    partition_key=PartitionKey(path='/category')
)

# Cognitive Search for product search
search_client = SearchClient(
    endpoint='https://mysearch.search.windows.net',
    index_name='products-index',
    credential=AzureKeyCredential('your-search-key')
)

class ProductService:
    """Product service with Cosmos DB and Cognitive Search."""

    def create_product(self, name: str, category: str, price: float, description: str):
        """Create product in Cosmos DB and index in Search."""
        product = {
            'id': str(uuid.uuid4()),
            'name': name,
            'category': category,
            'price': price,
            'description': description,
            'created_at': datetime.utcnow().isoformat()
        }

        # Store in Cosmos DB
        container.create_item(body=product)

        # Index in Cognitive Search
        search_client.upload_documents(documents=[{
            'id': product['id'],
            'name': product['name'],
            'category': product['category'],
            'price': product['price'],
            'description': product['description']
        }])

        return product

    def get_product(self, product_id: str, category: str):
        """Get product from Cosmos DB (the container's partition key is /category)."""
        return container.read_item(item=product_id, partition_key=category)

    def search_products(self, query: str, filters: dict = None):
        """Search products using Cognitive Search."""
        search_results = search_client.search(
            search_text=query,
            filter=filters.get('filter') if filters else None,
            top=20
        )
        return list(search_results)

    def get_products_by_category(self, category: str):
        """Query products by category from Cosmos DB."""
        query = "SELECT * FROM c WHERE c.category = @category"
        parameters = [{"name": "@category", "value": category}]

        items = container.query_items(
            query=query,
            parameters=parameters,
            enable_cross_partition_query=True
        )
        return list(items)
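
A short usage sketch (values are illustrative):

service = ProductService()
product = service.create_product(
    name='Wireless Headphones',
    category='audio',
    price=79.99,
    description='Over-ear Bluetooth headphones with 30h battery life'
)

# Point reads need both the id and the partition key (category)
same = service.get_product(product['id'], product['category'])

# Full-text queries go to Cognitive Search; category listings go to Cosmos DB
hits = service.search_products('bluetooth headphones')
audio = service.get_products_by_category('audio')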

Pattern 2: CQRS with Polyglot Persistence

graph LR
    subgraph "Command Side (Write)"
        Commands[Commands]
        CmdHandler[Command Handler]
        WriteDB[(Azure SQL<br/>Write Model)]
    end

    subgraph "Event Bus"
        Events[(Event Hub<br/>Domain Events)]
    end

    subgraph "Query Side (Read)"
        Queries[Queries]
        ReadDB1[(Cosmos DB<br/>Denormalized Views)]
        ReadDB2[(Redis<br/>Cached Aggregates)]
        ReadDB3[(Search<br/>Search Index)]
    end

    Commands --> CmdHandler
    CmdHandler --> WriteDB
    WriteDB --> Events
    Events --> ReadDB1
    Events --> ReadDB2
    Events --> ReadDB3
    Queries --> ReadDB1
    Queries --> ReadDB2
    Queries --> ReadDB3

    classDef write fill:#e3f2fd
    classDef event fill:#f3e5f5
    classDef read fill:#e8f5e9

    class Commands,CmdHandler,WriteDB write
    class Events event
    class Queries,ReadDB1,ReadDB2,ReadDB3 read

CQRS Implementation Example:

from azure.eventhub import EventHubProducerClient, EventData
from dataclasses import dataclass
from datetime import datetime
from typing import List
import json
import uuid

@dataclass
class OrderCreatedEvent:
    """Domain event for order creation."""
    order_id: str
    customer_id: str
    total_amount: float
    items: List[dict]
    timestamp: str

class OrderCommandService:
    """Command side - writes to Azure SQL."""

    def __init__(self, sql_session, event_hub_client):
        self.session = sql_session
        self.event_hub = event_hub_client

    def create_order(self, customer_id: str, items: List[dict]) -> str:
        """Create order and publish event."""
        # Write to SQL (source of truth); Order is the SQLAlchemy model
        # for the orders table (defined like the User model earlier)
        order_id = str(uuid.uuid4())
        total_amount = sum(item['price'] * item['quantity'] for item in items)

        order = Order(
            id=order_id,
            customer_id=customer_id,
            total_amount=total_amount,
            status='pending',
            created_at=datetime.utcnow()
        )
        self.session.add(order)
        self.session.commit()

        # Publish event to Event Hub
        event = OrderCreatedEvent(
            order_id=order_id,
            customer_id=customer_id,
            total_amount=total_amount,
            items=items,
            timestamp=datetime.utcnow().isoformat()
        )

        event_data = EventData(json.dumps(event.__dict__))
        self.event_hub.send_event(event_data)

        return order_id

class OrderQueryService:
    """Query side - reads from optimized stores."""

    def __init__(self, cosmos_container, redis_client, search_client):
        self.cosmos = cosmos_container
        self.redis = redis_client
        self.search = search_client

    def get_order(self, order_id: str):
        """Get order from Cosmos DB (denormalized view)."""
        return self.cosmos.read_item(item=order_id, partition_key=order_id)

    def get_customer_orders(self, customer_id: str):
        """Get customer orders from Redis cache."""
        cache_key = f"customer_orders:{customer_id}"
        cached = self.redis.get(cache_key)

        if cached:
            return json.loads(cached)

        # Fetch from Cosmos DB
        query = "SELECT * FROM c WHERE c.customer_id = @customer_id ORDER BY c.created_at DESC"
        items = list(self.cosmos.query_items(
            query=query,
            parameters=[{"name": "@customer_id", "value": customer_id}],
            enable_cross_partition_query=True
        ))

        # Cache for 5 minutes
        self.redis.setex(cache_key, 300, json.dumps(items))
        return items

    def search_orders(self, search_text: str):
        """Search orders using Cognitive Search."""
        results = self.search.search(search_text=search_text, top=50)
        return list(results)

# Event handler to update read models
class OrderEventHandler:
    """Handle order events and update read models."""

    def __init__(self, cosmos_container, redis_client, search_client):
        self.cosmos = cosmos_container
        self.redis = redis_client
        self.search = search_client

    def handle_order_created(self, event: OrderCreatedEvent):
        """Update all read models."""
        # Update Cosmos DB denormalized view
        order_view = {
            'id': event.order_id,
            'customer_id': event.customer_id,
            'total_amount': event.total_amount,
            'items': event.items,
            'status': 'pending',
            'created_at': event.timestamp
        }
        self.cosmos.create_item(body=order_view)

        # Invalidate customer orders cache
        cache_key = f"customer_orders:{event.customer_id}"
        self.redis.delete(cache_key)

        # Index in search
        self.search.upload_documents(documents=[{
            'id': event.order_id,
            'customer_id': event.customer_id,
            'total_amount': event.total_amount,
            'items_text': ' '.join([item['name'] for item in event.items]),
            'created_at': event.timestamp
        }])
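
The handler above still has to be attached to the event stream. A hedged sketch using EventHubConsumerClient follows; the connection string, hub name, and starting position are assumptions, and update_checkpoint only persists progress if the client is constructed with a checkpoint store:

from azure.eventhub import EventHubConsumerClient

consumer = EventHubConsumerClient.from_connection_string(
    conn_str='Endpoint=sb://mynamespace.servicebus.windows.net/;...',  # placeholder
    consumer_group='$Default',
    eventhub_name='order-events'
)

handler = OrderEventHandler(container, redis_client, search_client)

def on_event(partition_context, event):
    """Deserialize each domain event and fan it out to the read models."""
    payload = json.loads(event.body_as_str())
    handler.handle_order_created(OrderCreatedEvent(**payload))
    partition_context.update_checkpoint(event)

with consumer:
    # Blocks and dispatches events as they arrive; '-1' reads from the start
    consumer.receive(on_event=on_event, starting_position='-1')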

Pattern 3: Graph Database Integration

from gremlin_python.driver import client, serializer

# Cosmos DB Gremlin API for relationship data
gremlin_client = client.Client(
    url='wss://mygraph.gremlin.cosmos.azure.com:443/',
    traversal_source='g',
    username='/dbs/graphdb/colls/relationships',
    password='your-cosmos-key',
    message_serializer=serializer.GraphSONSerializersV2d0()
)

class SocialGraphService:
    """Manage social relationships in graph database."""

    def __init__(self, gremlin_client):
        self.client = gremlin_client

    def add_user(self, user_id: str, name: str, properties: dict):
        """Add user vertex to graph."""
        query = f"g.addV('user').property('id', '{user_id}').property('name', '{name}')"
        for key, value in properties.items():
            query += f".property('{key}', '{value}')"

        self.client.submit(query).all().result()

    def follow_user(self, follower_id: str, followee_id: str):
        """Create follow relationship."""
        query = f"""
        g.V('{follower_id}').addE('follows').to(g.V('{followee_id}'))
        """
        self.client.submit(query).all().result()

    def get_followers(self, user_id: str, limit: int = 100):
        """Get user's followers."""
        query = f"g.V('{user_id}').in('follows').limit({limit})"
        results = self.client.submit(query).all().result()
        return results

    def get_mutual_followers(self, user_id1: str, user_id2: str):
        """Find mutual followers between two users."""
        query = f"""
        g.V('{user_id1}').in('follows').as('f1')
         .V('{user_id2}').in('follows').as('f2')
         .where('f1', eq('f2'))
        """
        results = self.client.submit(query).all().result()
        return results

    def recommend_connections(self, user_id: str, limit: int = 10):
        """Recommend connections (friends of friends)."""
        query = f"""
        g.V('{user_id}').out('follows').out('follows')
         .where(neq('{user_id}'))
         .dedup()
         .limit({limit})
        """
        results = self.client.submit(query).all().result()
        return results
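
A short usage sketch (ids and properties are illustrative; note that a partitioned Cosmos DB Gremlin graph would also require a partition key property on each vertex, omitted here for brevity):

graph = SocialGraphService(gremlin_client)

graph.add_user('u1', 'Ada', {'city': 'London'})
graph.add_user('u2', 'Grace', {'city': 'New York'})
graph.follow_user('u1', 'u2')

followers = graph.get_followers('u2')            # who follows Grace
mutual = graph.get_mutual_followers('u1', 'u2')  # shared audience
ideas = graph.recommend_connections('u1')        # friends of friends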

🔄 Data Synchronization

Synchronization Patterns

graph TB
    subgraph "Real-time Sync"
        SQL[Azure SQL<br/>Change Tracking]
        CDC[Change Data Capture]
        EventHub1[Event Hub]
        StreamAnalytics[Stream Analytics]
        Target1[Target Databases]
    end

    subgraph "Batch Sync"
        Source[Source Databases]
        DataFactory[Data Factory]
        Target2[Target Databases]
    end

    subgraph "Event-Driven Sync"
        AppEvents[Application Events]
        EventHub2[Event Hub]
        Functions[Azure Functions]
        Target3[Target Databases]
    end

    SQL --> CDC
    CDC --> EventHub1
    EventHub1 --> StreamAnalytics
    StreamAnalytics --> Target1

    Source --> DataFactory
    DataFactory --> Target2

    AppEvents --> EventHub2
    EventHub2 --> Functions
    Functions --> Target3

    classDef realtime fill:#e8f5e9
    classDef batch fill:#fff3e0
    classDef event fill:#f3e5f5

    class SQL,CDC,EventHub1,StreamAnalytics,Target1 realtime
    class Source,DataFactory,Target2 batch
    class AppEvents,EventHub2,Functions,Target3 event

Real-time Sync with Change Data Capture:

from azure.eventhub import EventHubProducerClient, EventData
import pyodbc
import json

class ChangeDataCaptureSync:
    """Sync changes from Azure SQL to other databases."""

    def __init__(self, sql_conn_str: str, event_hub_client):
        self.sql_conn = pyodbc.connect(sql_conn_str)
        self.event_hub = event_hub_client

    def enable_cdc_on_table(self, schema: str, table: str):
        """Enable CDC on a table."""
        cursor = self.sql_conn.cursor()

        # Enable CDC at the database level (raises if it is already enabled)
        cursor.execute("EXEC sys.sp_cdc_enable_db")

        # Enable CDC on table
        cursor.execute(f"""
            EXEC sys.sp_cdc_enable_table
                @source_schema = '{schema}',
                @source_name = '{table}',
                @role_name = NULL
        """)
        cursor.commit()

    def capture_changes(self, schema: str, table: str):
        """Capture and stream changes."""
        cursor = self.sql_conn.cursor()

        # Get CDC instance name
        cdc_table = f"cdc.{schema}_{table}_CT"

        # Query changes
        query = f"""
            SELECT
                __$operation as operation,
                *
            FROM {cdc_table}
            WHERE __$operation IN (1, 2, 4)  -- 1=delete, 2=insert, 4=update (after image)
            ORDER BY __$start_lsn
        """

        cursor.execute(query)
        changes = cursor.fetchall()

        # Stream changes to Event Hub
        for change in changes:
            event_data = {
                'table': f"{schema}.{table}",
                'operation': self._get_operation_name(change.operation),
                'data': dict(zip([col[0] for col in cursor.description], change))
            }

            self.event_hub.send_event(EventData(json.dumps(event_data)))

    def _get_operation_name(self, op_code: int) -> str:
        """Convert operation code to name."""
        operations = {1: 'delete', 2: 'insert', 4: 'update'}
        return operations.get(op_code, 'unknown')
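
A minimal polling loop tying it together (connection strings and the poll interval are assumptions; production setups track the last-processed LSN rather than re-reading the whole change table each pass):

import time

producer = EventHubProducerClient.from_connection_string(
    conn_str='Endpoint=sb://mynamespace.servicebus.windows.net/;...',  # placeholder
    eventhub_name='sql-changes'
)

sync = ChangeDataCaptureSync(
    sql_conn_str='DRIVER={ODBC Driver 17 for SQL Server};SERVER=server.database.windows.net;DATABASE=userdb;UID=username;PWD=password',
    event_hub_client=producer
)

sync.enable_cdc_on_table('dbo', 'users')

while True:
    sync.capture_changes('dbo', 'users')  # streams rows from cdc.dbo_users_CT
    time.sleep(30)  # assumed poll interval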

💼 Use Cases

1. E-commerce Platform

Database Allocation:

  • Azure SQL: Orders, payments, inventory (ACID transactions)
  • Cosmos DB: Product catalog, shopping carts (global distribution)
  • Redis: Session state, shopping cart cache
  • Cognitive Search: Product search, recommendations
  • Synapse: Sales analytics, business intelligence
  • Blob Storage: Product images, user uploads

2. Social Media Application

Database Allocation:

  • Azure SQL: User accounts, authentication
  • Cosmos DB Gremlin: Social graph, relationships
  • Cosmos DB SQL: Posts, comments (flexible schema)
  • Data Explorer: User activity analytics, metrics
  • Redis: Feed cache, session state
  • Cognitive Search: User and content search
  • Blob Storage: Media files, attachments

3. IoT Platform

Database Allocation:

  • Data Explorer: Device telemetry, time-series data
  • Azure SQL: Device registry, configurations
  • Cosmos DB: Device state, metadata
  • Redis: Real-time device status cache
  • Synapse: Historical analytics, reporting
  • Blob Storage: Firmware files, logs


✅ Best Practices

1. Database Selection Guidelines

from enum import Enum
from dataclasses import dataclass

class DatabaseType(Enum):
    """Available database types."""
    AZURE_SQL = "azure_sql"
    COSMOS_DB = "cosmos_db"
    REDIS = "redis"
    SYNAPSE = "synapse"
    DATA_EXPLORER = "data_explorer"
    COGNITIVE_SEARCH = "cognitive_search"
    BLOB_STORAGE = "blob_storage"

@dataclass
class DataCharacteristics:
    """Data characteristics for database selection."""
    volume: str  # small, medium, large
    velocity: str  # batch, streaming, real-time
    variety: str  # structured, semi-structured, unstructured
    consistency: str  # eventual, strong
    query_pattern: str  # oltp, olap, search, graph, key-value, time-series
    latency_requirement_ms: int
    scale_out_required: bool

class DatabaseSelector:
    """Select appropriate database based on characteristics."""

    def select_database(self, characteristics: DataCharacteristics) -> DatabaseType:
        """Recommend database type."""

        # Strong consistency + OLTP -> Azure SQL
        if (characteristics.consistency == "strong" and
            characteristics.query_pattern == "oltp"):
            return DatabaseType.AZURE_SQL

        # Graph queries -> Cosmos DB Gremlin
        if characteristics.query_pattern == "graph":
            return DatabaseType.COSMOS_DB

        # Time-series data -> Data Explorer
        if (characteristics.velocity == "streaming" and
            characteristics.variety == "structured" and
            "time" in characteristics.query_pattern):
            return DatabaseType.DATA_EXPLORER

        # Low latency lookups -> Redis
        if (characteristics.latency_requirement_ms < 10 and
            characteristics.query_pattern == "key-value"):
            return DatabaseType.REDIS

        # Full-text search -> Cognitive Search
        if characteristics.query_pattern == "search":
            return DatabaseType.COGNITIVE_SEARCH

        # Analytics -> Synapse
        if characteristics.query_pattern == "olap":
            return DatabaseType.SYNAPSE

        # Default flexible option -> Cosmos DB
        return DatabaseType.COSMOS_DB
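
For example, a hot session-lookup workload maps to Redis:

session_profile = DataCharacteristics(
    volume='medium',
    velocity='real-time',
    variety='semi-structured',
    consistency='eventual',
    query_pattern='key-value',
    latency_requirement_ms=5,
    scale_out_required=True
)

selector = DatabaseSelector()
assert selector.select_database(session_profile) == DatabaseType.REDIS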

2. Consistency Management

| Pattern | Description | Implementation |
|---------|-------------|----------------|
| Eventual Consistency | Accept temporary inconsistency | Event-driven updates |
| Strong Consistency | Guarantee immediate consistency | Distributed transactions |
| Read-your-writes | User sees their own changes | Session-based routing |
| Monotonic Reads | Reads never observe older state | Version tracking |
| Bounded Staleness | Maximum staleness guarantee | Time-based sync |
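
Several of these map directly onto Cosmos DB consistency levels, which can be relaxed per client relative to the account default. A hedged sketch (endpoint and key are placeholders):

from azure.cosmos import CosmosClient

# 'Session' gives read-your-writes within one client session;
# 'BoundedStaleness' (configured at the account level) caps how far reads may lag
client = CosmosClient(
    url='https://myaccount.documents.azure.com:443/',
    credential='your-cosmos-key',
    consistency_level='Session'
)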

3. Transaction Patterns

from azure.cosmos import CosmosClient
from azure.storage.blob import BlobServiceClient
import pyodbc
import uuid

class DistributedTransactionCoordinator:
    """Coordinate transactions across multiple databases."""

    def __init__(self, sql_conn, cosmos_client, blob_client):
        self.sql = sql_conn
        self.cosmos = cosmos_client
        self.blob = blob_client

    def execute_saga(self, order_data: dict):
        """Execute distributed transaction using Saga pattern."""
        compensation_actions = []

        try:
            # Step 1: Create order in Azure SQL
            order_id = self._create_order_sql(order_data)
            compensation_actions.append(lambda: self._delete_order_sql(order_id))

            # Step 2: Reserve inventory in Cosmos DB
            self._reserve_inventory_cosmos(order_data['items'])
            compensation_actions.append(
                lambda: self._release_inventory_cosmos(order_data['items'])
            )

            # Step 3: Store receipt in Blob Storage
            receipt_url = self._store_receipt_blob(order_id, order_data)
            compensation_actions.append(lambda: self._delete_receipt_blob(receipt_url))

            # All steps succeeded
            return {"status": "success", "order_id": order_id}

        except Exception as e:
            # Execute compensation in reverse order
            for compensate in reversed(compensation_actions):
                try:
                    compensate()
                except Exception as comp_error:
                    # Log compensation failure
                    print(f"Compensation failed: {comp_error}")

            return {"status": "failed", "error": str(e)}

    def _create_order_sql(self, order_data: dict) -> str:
        """Create order in Azure SQL."""
        cursor = self.sql.cursor()
        order_id = str(uuid.uuid4())

        cursor.execute("""
            INSERT INTO orders (id, customer_id, total_amount, status)
            VALUES (?, ?, ?, ?)
        """, (order_id, order_data['customer_id'], order_data['total'], 'pending'))

        cursor.commit()
        return order_id

    def _delete_order_sql(self, order_id: str):
        """Compensation: Delete order from SQL."""
        cursor = self.sql.cursor()
        cursor.execute("DELETE FROM orders WHERE id = ?", (order_id,))
        cursor.commit()

    def _reserve_inventory_cosmos(self, items: list):
        """Reserve inventory in Cosmos DB."""
        # Implementation for inventory reservation
        pass

    def _release_inventory_cosmos(self, items: list):
        """Compensation: Release inventory."""
        # Implementation for inventory release
        pass

    def _store_receipt_blob(self, order_id: str, order_data: dict) -> str:
        """Store receipt in Blob Storage."""
        # Implementation for receipt storage
        pass

    def _delete_receipt_blob(self, receipt_url: str):
        """Compensation: Delete receipt."""
        # Implementation for receipt deletion
        pass
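
A short usage sketch (the three clients and the payload shape are assumptions):

coordinator = DistributedTransactionCoordinator(sql_conn, cosmos_client, blob_client)

result = coordinator.execute_saga({
    'customer_id': 'c-123',
    'total': 59.97,
    'items': [{'sku': 'sku-1', 'quantity': 3, 'price': 19.99}]
})

if result['status'] == 'failed':
    # Compensations have already been attempted in reverse order
    print(result['error'])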

Complementary Patterns

Decision Guide

| Choose Polyglot When | Choose Single Database When |
|----------------------|-----------------------------|
| Diverse data types | Simple, uniform data |
| Performance-critical | Simplicity preferred |
| Specialized features needed | Team expertise limited |
| Independent scaling required | Operational simplicity critical |
| Domain-driven architecture | Small to medium scale |

📚 Additional Resources

Implementation Guides

Reference Architectures

  • Microservices Architecture - Domain-oriented data ownership
  • E-commerce Platform - Multi-database shopping experience
  • IoT Platform - Device data across specialized stores

Last Updated: 2025-01-28 · Pattern Status: Active · Complexity: Advanced