CQRS (Command Query Responsibility Segregation) Pattern¶
Separate read and write data models to optimize performance, scalability, and security by treating queries and commands as fundamentally different operations.
Table of Contents¶
- Overview
- Core Concepts
- Architecture Components
- Azure Implementation
- Implementation Patterns
- Code Examples
- Best Practices
- Common Challenges
- Performance Optimization
- Security Considerations
- Monitoring and Operations
Overview¶
Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates the read and write operations of a data store. Commands handle updates and are responsible for business logic, while queries handle data retrieval and are optimized for specific read scenarios.
Key Principles¶
Separation of Concerns: Commands and queries use different models and potentially different data stores.
Optimized Operations: Read models optimized for queries, write models optimized for updates.
Eventual Consistency: Acceptable lag between write and read models.
Scalability: Independent scaling of read and write workloads.
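To make the separation concrete, here is a minimal sketch of the two sides in Python. The CommandBus and QueryBus names are illustrative helpers, not part of any SDK:
# Minimal sketch of the CQRS split: commands mutate state and return
# only an acknowledgement; queries return data and never change state.
from dataclasses import dataclass

@dataclass
class PlaceOrder:  # command: expresses intent to change state
    order_id: str
    customer_id: str

@dataclass
class GetOrderDetails:  # query: requests data, has no side effects
    order_id: str

class CommandBus:
    """Routes commands to the write model (hypothetical helper)."""
    async def dispatch(self, command) -> dict:
        # validate, apply business rules, persist, emit events
        return {"status": "accepted"}

class QueryBus:
    """Routes queries to a read model (hypothetical helper)."""
    async def ask(self, query) -> dict:
        # read from a denormalized, query-optimized store
        return {}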
When to Use CQRS¶
✅ Ideal For:
- Applications with vastly different read and write patterns
- High-performance read requirements
- Complex business logic on the write side
- Multiple read model representations needed
- Independent scaling of reads and writes
- Systems where reads vastly outnumber writes
❌ Not Ideal For:
- Simple CRUD applications
- Applications where reads and writes are similar
- Systems requiring strong consistency
- Teams unfamiliar with eventual consistency
- Small-scale applications
Architecture Diagram¶
graph TB
subgraph "Command Side (Write Model)"
CMD[Commands] --> VAL[Validation]
VAL --> BL[Business Logic]
BL --> WDB[(Write Database<br/>Normalized)]
WDB --> EVT[Events/Changes]
end
subgraph "Synchronization Layer"
EVT --> EB[Event Bus<br/>Event Grid/Event Hubs]
end
subgraph "Query Side (Read Model)"
EB --> SYNC1[Sync Handler 1]
EB --> SYNC2[Sync Handler 2]
EB --> SYNC3[Sync Handler N]
SYNC1 --> RDB1[(Read DB 1<br/>Reporting)]
SYNC2 --> RDB2[(Read DB 2<br/>Search)]
SYNC3 --> RDB3[(Read DB 3<br/>Analytics)]
end
subgraph "API Layer"
API[API Gateway]
API --> CMD
API --> RDB1
API --> RDB2
API --> RDB3
end
style CMD fill:#ffebee
style RDB1 fill:#e8f5e9
style RDB2 fill:#e8f5e9
style RDB3 fill:#e8f5e9
Core Concepts¶
Commands¶
Definition: Requests to perform actions that change system state.
Characteristics:
- Intent-revealing names: PlaceOrder, CancelSubscription
- Validation: Business rule enforcement
- Asynchronous: Often queued and processed out of band
- Result: Success/failure, not data
- Side effects: Change state, emit events
Command Structure:
{
"commandId": "cmd_123456",
"commandType": "PlaceOrder",
"timestamp": "2025-01-15T10:30:00Z",
"userId": "user_456",
"aggregateId": "order_789",
"data": {
"customerId": "cust_123",
"items": [
{
"productId": "prod_001",
"quantity": 2,
"price": 29.99
}
],
"shippingAddress": {
"street": "123 Main St",
"city": "Seattle",
"state": "WA",
"zip": "98101"
}
},
"metadata": {
"correlationId": "corr_abc",
"clientId": "web_app",
"ipAddress": "192.168.1.100"
}
}
Queries¶
Definition: Requests to retrieve data without changing state.
Characteristics:
- Side-effect free: Never change state, so they are safe to repeat
- Optimized: Denormalized for performance
- Multiple models: Different views for different needs
- Fast: Pre-computed aggregations
- Eventually consistent: May lag behind writes
Query Structure:
{
"queryId": "qry_123456",
"queryType": "GetOrderDetails",
"timestamp": "2025-01-15T10:35:00Z",
"userId": "user_456",
"parameters": {
"orderId": "order_789",
"includeHistory": true,
"includeTracking": true
}
}
Write Model¶
Purpose: Enforce business rules and maintain consistency.
Characteristics:
- Normalized schema
- Transactional integrity
- Business logic validation
- Event generation
- Audit trail
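On the write side, business rules are typically enforced inside aggregates. A minimal, illustrative sketch (the Order class below is an assumption, not a prescribed API):
# Illustrative write-model aggregate: state changes only through methods
# that enforce business rules, and each change is recorded as an event.
class Order:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.status = "pending"
        self.pending_events: list = []

    def ship(self, tracking_number: str) -> None:
        # Business rule: only pending orders can be shipped
        if self.status != "pending":
            raise ValueError(f"Cannot ship order in status {self.status}")
        self.status = "shipped"
        self.pending_events.append({
            "eventType": "OrderShipped",
            "aggregateId": self.order_id,
            "data": {"order_id": self.order_id, "tracking_number": tracking_number}
        })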
Read Models¶
Purpose: Optimized data structures for specific query patterns.
Types:
- Reporting Models: Aggregated data for reports
- Search Models: Optimized for search queries
- UI Models: Denormalized for UI display
- Analytics Models: Dimensional models for analytics
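The same event stream can feed several of these models at once. A sketch of two projections derived from a single OrderPlaced event, using the event shape shown later in this document (field choices are illustrative):
# One event, two read models: a denormalized UI document and a
# pre-aggregated analytics row. Both are derived, disposable views.
def project_ui_model(event: dict) -> dict:
    data = event["data"]
    return {
        "id": data["id"],
        "status": data["status"],
        "items": data["items"],  # full detail for display
        "total_amount": data["total_amount"]
    }

def project_analytics_row(event: dict) -> dict:
    data = event["data"]
    return {
        "order_date": event["timestamp"][:10],  # dimension: day
        "customer_id": data["customer_id"],
        "order_total": data["total_amount"],  # measure
        "item_count": len(data["items"])
    }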
Architecture Components¶
1. Command Handlers¶
Process commands and update the write model.
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
import uuid
@dataclass
class PlaceOrderCommand:
order_id: str
customer_id: str
items: List[dict]
shipping_address: dict
user_id: str
class OrderCommandHandler:
def __init__(self, write_db, event_publisher):
self.write_db = write_db
self.event_publisher = event_publisher
async def handle(self, command: PlaceOrderCommand) -> dict:
"""Handle PlaceOrder command."""
# Validate command
await self._validate(command)
# Create order in write database
order = {
"id": command.order_id,
"customer_id": command.customer_id,
"status": "pending",
"items": command.items,
"shipping_address": command.shipping_address,
"created_at": datetime.utcnow().isoformat(),
"created_by": command.user_id,
"total_amount": sum(
item["price"] * item["quantity"]
for item in command.items
)
}
# Save to write database
await self.write_db.create_order(order)
# Publish event for read model synchronization
event = {
"eventId": str(uuid.uuid4()),
"eventType": "OrderPlaced",
"aggregateId": command.order_id,
"timestamp": datetime.utcnow().isoformat(),
"data": order
}
await self.event_publisher.publish("order-events", event)
return {
"orderId": command.order_id,
"status": "accepted"
}
async def _validate(self, command: PlaceOrderCommand):
"""Validate business rules."""
# Validate customer exists
customer = await self.write_db.get_customer(command.customer_id)
if not customer:
raise ValueError(f"Customer {command.customer_id} not found")
# Validate items in stock
for item in command.items:
stock = await self.write_db.get_stock(item["productId"])
if stock < item["quantity"]:
raise ValueError(
f"Insufficient stock for product {item['productId']}"
)
# Validate customer credit
if customer["credit_limit"] < sum(
item["price"] * item["quantity"]
for item in command.items
):
raise ValueError("Customer credit limit exceeded")
2. Query Handlers¶
Retrieve data from read models.
from azure.cosmos import CosmosClient
from typing import List, Optional
class OrderQueryHandler:
def __init__(self, read_db_connection: str):
self.client = CosmosClient.from_connection_string(
read_db_connection
)
self.container = self.client.get_database_client(
"CQRSDB"
).get_container_client("order_read_model")
async def get_order_details(
self,
order_id: str,
include_history: bool = False
) -> dict:
"""Get order details from read model."""
# Query optimized read model
query = """
SELECT
o.id,
o.customer_name,
o.customer_email,
o.status,
o.items,
o.total_amount,
o.shipping_address,
o.tracking_number,
o.placed_at,
o.shipped_at
FROM orders o
WHERE o.id = @orderId
"""
parameters = [{"name": "@orderId", "value": order_id}]
results = list(self.container.query_items(
query=query,
parameters=parameters,
partition_key=order_id
))
if not results:
raise ValueError(f"Order {order_id} not found")
order = results[0]
# Include history if requested
if include_history:
order["history"] = await self._get_order_history(order_id)
return order
async def get_customer_orders(
self,
customer_id: str,
status: Optional[str] = None,
limit: int = 10
) -> List[dict]:
"""Get customer's orders from read model."""
query = """
SELECT
o.id,
o.status,
o.total_amount,
o.placed_at
FROM orders o
WHERE o.customer_id = @customerId
"""
parameters = [{"name": "@customerId", "value": customer_id}]
if status:
query += " AND o.status = @status"
parameters.append({"name": "@status", "value": status})
query += " ORDER BY o.placed_at DESC"
query += f" OFFSET 0 LIMIT {limit}"
        results = list(self.container.query_items(
            query=query,
            parameters=parameters,
            enable_cross_partition_query=True  # container is partitioned on /id, not customer_id
        ))
return results
async def search_orders(
self,
search_term: str,
filters: Optional[dict] = None
) -> List[dict]:
"""Search orders using search-optimized read model."""
# Query search index (e.g., Azure Cognitive Search)
# This is a simplified example
query = """
SELECT
o.id,
o.customer_name,
o.status,
o.total_amount,
o.placed_at
FROM orders o
WHERE CONTAINS(o.customer_name, @searchTerm)
OR CONTAINS(o.id, @searchTerm)
"""
parameters = [{"name": "@searchTerm", "value": search_term}]
        results = list(self.container.query_items(
            query=query,
            parameters=parameters,
            enable_cross_partition_query=True  # search spans all partitions
        ))
return results
3. Synchronization Handlers¶
Keep read models synchronized with the write model.
from azure.eventhub.aio import EventHubConsumerClient
from azure.cosmos.aio import CosmosClient
import json
class OrderReadModelSynchronizer:
def __init__(
self,
event_hub_connection: str,
read_db_connection: str
):
self.consumer = EventHubConsumerClient.from_connection_string(
event_hub_connection,
consumer_group="order_read_model",
eventhub_name="order-events"
)
self.cosmos = CosmosClient.from_connection_string(
read_db_connection
)
self.container = self.cosmos.get_database_client(
"CQRSDB"
).get_container_client("order_read_model")
async def start(self):
"""Start synchronizing read model."""
await self.consumer.receive(
on_event=self._process_event,
on_error=self._on_error
)
async def _process_event(self, partition_context, event):
"""Process event and update read model."""
event_data = json.loads(event.body_as_str())
event_type = event_data["eventType"]
handlers = {
"OrderPlaced": self._handle_order_placed,
"OrderShipped": self._handle_order_shipped,
"OrderCancelled": self._handle_order_cancelled,
"OrderDelivered": self._handle_order_delivered
}
handler = handlers.get(event_type)
if handler:
await handler(event_data)
# Update checkpoint
await partition_context.update_checkpoint(event)
async def _handle_order_placed(self, event_data: dict):
"""Handle OrderPlaced event - create read model."""
data = event_data["data"]
# Enrich with customer data (denormalization)
customer = await self._get_customer_data(data["customer_id"])
# Create denormalized read model
read_model = {
"id": data["id"],
"order_id": data["id"],
"customer_id": data["customer_id"],
"customer_name": customer["name"],
"customer_email": customer["email"],
"customer_phone": customer["phone"],
"status": data["status"],
"items": data["items"],
"item_count": len(data["items"]),
"total_amount": data["total_amount"],
"shipping_address": data["shipping_address"],
"placed_at": event_data["timestamp"],
"last_updated": event_data["timestamp"]
}
await self.container.upsert_item(body=read_model)
async def _handle_order_shipped(self, event_data: dict):
"""Handle OrderShipped event - update read model."""
data = event_data["data"]
order_id = data["order_id"]
# Read current model
order = await self.container.read_item(
item=order_id,
partition_key=order_id
)
# Update fields
order["status"] = "shipped"
order["shipped_at"] = event_data["timestamp"]
order["tracking_number"] = data["tracking_number"]
order["last_updated"] = event_data["timestamp"]
# Write updated model
await self.container.upsert_item(body=order)
async def _handle_order_cancelled(self, event_data: dict):
"""Handle OrderCancelled event - update read model."""
data = event_data["data"]
order_id = data["order_id"]
# Read current model
order = await self.container.read_item(
item=order_id,
partition_key=order_id
)
# Update fields
order["status"] = "cancelled"
order["cancellation_reason"] = data.get("reason")
order["cancelled_at"] = event_data["timestamp"]
order["last_updated"] = event_data["timestamp"]
# Write updated model
await self.container.upsert_item(body=order)
async def _get_customer_data(self, customer_id: str) -> dict:
"""Fetch customer data for denormalization."""
# Implementation would query customer service/database
pass
async def _on_error(self, partition_context, error):
"""Handle errors in event processing."""
print(f"Error processing events: {error}")
Azure Implementation¶
Azure Service Mapping¶
| Component | Azure Service | Purpose |
|---|---|---|
| Write Database | Azure SQL / Cosmos DB | Transactional write operations |
| Read Databases | Cosmos DB / Azure SQL / Cognitive Search | Optimized read models |
| Event Bus | Event Hubs / Event Grid | Event distribution |
| Command Handlers | Azure Functions / App Service | Process commands |
| Query Handlers | Azure Functions / App Service | Handle queries |
| Sync Handlers | Azure Functions / Stream Analytics | Synchronize read models |
| API Gateway | API Management | Unified API entry point |
Reference Architecture¶
graph TB
subgraph "Client Layer"
WEB[Web App]
MOBILE[Mobile App]
API_CLIENT[API Clients]
end
subgraph "API Gateway"
APIM[API Management]
end
WEB --> APIM
MOBILE --> APIM
API_CLIENT --> APIM
subgraph "Command Side"
APIM --> CMD_FUNC[Command Handler<br/>Azure Functions]
CMD_FUNC --> SQL[(Azure SQL<br/>Write Database)]
SQL --> CDC[Change Data Capture]
end
subgraph "Event Distribution"
CDC --> EH[Event Hubs]
CMD_FUNC -.Publish.-> EH
end
subgraph "Query Side Synchronization"
EH --> SYNC1[Sync Function 1]
EH --> SYNC2[Sync Function 2]
EH --> SA[Stream Analytics]
SYNC1 --> COSMOS[(Cosmos DB<br/>UI Read Model)]
SYNC2 --> SEARCH[(Cognitive Search<br/>Search Index)]
SA --> SYNAPSE[(Synapse<br/>Analytics Model)]
end
subgraph "Query Handlers"
APIM --> QRY_FUNC[Query Handler<br/>Azure Functions]
QRY_FUNC --> COSMOS
QRY_FUNC --> SEARCH
QRY_FUNC --> SYNAPSE
end
subgraph "Monitoring"
CMD_FUNC --> MON[Azure Monitor]
QRY_FUNC --> MON
SYNC1 --> MON
SYNC2 --> MON
end
Deployment Template¶
// CQRS Infrastructure - Bicep Template
param location string = resourceGroup().location
param environmentName string = 'dev'
// Write Database - Azure SQL
resource sqlServer 'Microsoft.Sql/servers@2022-05-01-preview' = {
name: 'sql-cqrs-write-${environmentName}'
location: location
properties: {
administratorLogin: 'sqladmin'
administratorLoginPassword: '${uniqueString(resourceGroup().id)}!Aa1' // demo only; use a Key Vault reference in production
version: '12.0'
}
}
resource writeDatabase 'Microsoft.Sql/servers/databases@2022-05-01-preview' = {
parent: sqlServer
name: 'OrderWriteDB'
location: location
sku: {
name: 'S0'
tier: 'Standard'
}
}
// Read Database - Cosmos DB
resource cosmosAccount 'Microsoft.DocumentDB/databaseAccounts@2023-04-15' = {
name: 'cosmos-cqrs-read-${environmentName}'
location: location
kind: 'GlobalDocumentDB'
properties: {
databaseAccountOfferType: 'Standard'
consistencyPolicy: {
defaultConsistencyLevel: 'Session'
}
locations: [
{
locationName: location
failoverPriority: 0
}
]
}
}
resource readDatabase 'Microsoft.DocumentDB/databaseAccounts/sqlDatabases@2023-04-15' = {
parent: cosmosAccount
name: 'CQRSDB'
properties: {
resource: {
id: 'CQRSDB'
}
}
}
resource readModelContainer 'Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers@2023-04-15' = {
parent: readDatabase
name: 'order_read_model'
properties: {
resource: {
id: 'order_read_model'
partitionKey: {
paths: ['/id']
kind: 'Hash'
}
}
}
}
// Event Hubs for event distribution
resource eventHubNamespace 'Microsoft.EventHub/namespaces@2022-10-01-preview' = {
name: 'ehns-cqrs-${environmentName}'
location: location
sku: {
name: 'Standard'
tier: 'Standard'
capacity: 1
}
}
resource eventHub 'Microsoft.EventHub/namespaces/eventhubs@2022-10-01-preview' = {
parent: eventHubNamespace
name: 'order-events'
properties: {
messageRetentionInDays: 7
partitionCount: 4
}
}
// Azure Functions for command handlers
resource commandFunctionApp 'Microsoft.Web/sites@2022-09-01' = {
name: 'func-commands-${environmentName}'
location: location
kind: 'functionapp'
properties: {
serverFarmId: commandAppServicePlan.id
siteConfig: {
appSettings: [
{
name: 'FUNCTIONS_WORKER_RUNTIME'
value: 'python'
}
{
name: 'WriteDB__ConnectionString'
value: 'Server=tcp:${sqlServer.properties.fullyQualifiedDomainName},1433;Database=${writeDatabase.name};'
}
{
name: 'EventHub__ConnectionString'
value: listKeys('${eventHubNamespace.id}/authorizationRules/RootManageSharedAccessKey', eventHubNamespace.apiVersion).primaryConnectionString
}
]
}
}
}
resource commandAppServicePlan 'Microsoft.Web/serverfarms@2022-09-01' = {
name: 'asp-commands-${environmentName}'
location: location
sku: {
name: 'Y1'
tier: 'Dynamic'
}
}
// Azure Functions for query handlers
resource queryFunctionApp 'Microsoft.Web/sites@2022-09-01' = {
name: 'func-queries-${environmentName}'
location: location
kind: 'functionapp'
properties: {
serverFarmId: queryAppServicePlan.id
siteConfig: {
appSettings: [
{
name: 'FUNCTIONS_WORKER_RUNTIME'
value: 'python'
}
{
name: 'ReadDB__ConnectionString'
value: cosmosAccount.listConnectionStrings().connectionStrings[0].connectionString
}
]
}
}
}
resource queryAppServicePlan 'Microsoft.Web/serverfarms@2022-09-01' = {
name: 'asp-queries-${environmentName}'
location: location
sku: {
name: 'Y1'
tier: 'Dynamic'
}
}
output writeDbEndpoint string = sqlServer.properties.fullyQualifiedDomainName
output readDbEndpoint string = cosmosAccount.properties.documentEndpoint
output eventHubNamespace string = eventHubNamespace.name
Implementation Patterns¶
Pattern 1: Simple CQRS¶
Use Case: Basic read/write separation with eventual consistency.
# Simple command handler
class SimpleCommandHandler:
def __init__(self, write_db, event_bus):
self.write_db = write_db
self.event_bus = event_bus
async def create_order(self, command: dict) -> str:
# Write to database
order_id = await self.write_db.insert_order(command)
# Publish event
await self.event_bus.publish({
"type": "OrderCreated",
"orderId": order_id,
"data": command
})
return order_id
# Simple query handler
class SimpleQueryHandler:
def __init__(self, read_db):
self.read_db = read_db
async def get_order(self, order_id: str) -> dict:
return await self.read_db.get_order(order_id)
Pattern 2: CQRS with Event Sourcing¶
Use Case: Complete audit trail with optimized read models.
See Also: Event Sourcing Pattern
class EventSourcedCQRS:
def __init__(self, event_store, read_model):
self.event_store = event_store
self.read_model = read_model
async def handle_command(self, command: dict):
# Load aggregate from event store
events = await self.event_store.get_events(
command["aggregateId"]
)
# Reconstruct state
aggregate = self._rebuild_from_events(events)
# Execute command
new_events = aggregate.execute(command)
# Store new events
await self.event_store.append_events(new_events)
# Events automatically sync to read models via event bus
async def query(self, query: dict):
# Query optimized read model
return await self.read_model.query(query)
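The _rebuild_from_events helper is not shown above. A minimal sketch of what it could do, assuming aggregates expose an apply(event) method that folds one event into in-memory state:
# Illustrative sketch of the missing helper: fold stored events over a
# fresh aggregate to reconstruct current state, with no side effects.
def _rebuild_from_events(self, events: list):
    aggregate = OrderAggregate()  # hypothetical aggregate class
    for event in events:
        aggregate.apply(event)  # assumed to mutate in-memory state only
    return aggregate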
Pattern 3: Multi-Model CQRS¶
Use Case: Multiple specialized read models for different use cases.
class MultiModelCQRS:
def __init__(
self,
write_db,
ui_read_model,
search_index,
analytics_db,
event_bus
):
self.write_db = write_db
self.ui_read_model = ui_read_model
self.search_index = search_index
self.analytics_db = analytics_db
self.event_bus = event_bus
async def handle_command(self, command: dict):
# Update write model
result = await self.write_db.execute(command)
# Publish event to sync all read models
await self.event_bus.publish({
"type": command["type"],
"data": result
})
return result
async def query_ui(self, query: dict):
"""Query for UI - fast, denormalized."""
return await self.ui_read_model.query(query)
async def search(self, search_term: str):
"""Full-text search."""
return await self.search_index.search(search_term)
async def analytics_query(self, query: dict):
"""Complex analytics query."""
return await self.analytics_db.query(query)
Code Examples¶
Complete CQRS Implementation¶
# main.py - Complete CQRS example
from azure.functions import HttpRequest, HttpResponse
from azure.cosmos.aio import CosmosClient
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
from datetime import datetime
from typing import List
import pyodbc
import json
import os
# Initialize clients
write_db_conn = pyodbc.connect(
os.environ["WriteDB__ConnectionString"]
)
read_db_client = CosmosClient.from_connection_string(
os.environ["ReadDB__ConnectionString"]
)
read_container = read_db_client.get_database_client(
"CQRSDB"
).get_container_client("order_read_model")
event_hub_client = EventHubProducerClient.from_connection_string(
os.environ["EventHub__ConnectionString"],
eventhub_name="order-events"
)
# Command endpoint
async def handle_command(req: HttpRequest) -> HttpResponse:
"""Handle order command (write operation)."""
try:
command = req.get_json()
command_type = command.get("type")
if command_type == "PlaceOrder":
result = await place_order(command)
elif command_type == "CancelOrder":
result = await cancel_order(command)
else:
return HttpResponse(
json.dumps({"error": "Unknown command type"}),
status_code=400
)
return HttpResponse(
json.dumps(result),
mimetype="application/json",
status_code=202
)
except Exception as e:
return HttpResponse(
json.dumps({"error": str(e)}),
status_code=500
)
async def place_order(command: dict) -> dict:
"""Place order - write to write database and publish event."""
cursor = write_db_conn.cursor()
# Insert into write database (normalized)
cursor.execute("""
INSERT INTO Orders (OrderId, CustomerId, Status, CreatedAt)
VALUES (?, ?, ?, GETDATE())
""", (
command["orderId"],
command["customerId"],
"pending"
))
for item in command["items"]:
cursor.execute("""
INSERT INTO OrderItems (OrderId, ProductId, Quantity, Price)
VALUES (?, ?, ?, ?)
""", (
command["orderId"],
item["productId"],
item["quantity"],
item["price"]
))
write_db_conn.commit()
# Publish event for read model sync
    event = {
        "eventType": "OrderPlaced",
        "orderId": command["orderId"],
        "timestamp": datetime.utcnow().isoformat(),
        "data": command
    }
event_data = EventData(json.dumps(event))
await event_hub_client.send_batch([event_data])
return {
"orderId": command["orderId"],
"status": "accepted"
}
# Query endpoint
async def handle_query(req: HttpRequest) -> HttpResponse:
"""Handle order query (read operation)."""
try:
order_id = req.params.get("orderId")
if not order_id:
return HttpResponse(
json.dumps({"error": "orderId required"}),
status_code=400
)
# Query read model (denormalized, optimized)
order = await read_container.read_item(
item=order_id,
partition_key=order_id
)
return HttpResponse(
json.dumps(order),
mimetype="application/json"
)
except Exception as e:
return HttpResponse(
json.dumps({"error": str(e)}),
status_code=404
)
# Sync function (Event Hub trigger)
async def sync_read_model(events: List[EventData]):
"""Synchronize read model when events arrive."""
for event in events:
event_data = json.loads(event.body_as_str())
event_type = event_data["eventType"]
if event_type == "OrderPlaced":
await sync_order_placed(event_data)
elif event_type == "OrderCancelled":
await sync_order_cancelled(event_data)
async def sync_order_placed(event_data: dict):
"""Sync OrderPlaced event to read model."""
data = event_data["data"]
# Create denormalized read model
read_model = {
"id": data["orderId"],
"customerId": data["customerId"],
"status": "pending",
"items": data["items"],
"totalAmount": sum(
item["price"] * item["quantity"]
for item in data["items"]
),
"placedAt": event_data.get("timestamp")
}
await read_container.upsert_item(body=read_model)
Best Practices¶
1. Command Design¶
✅ Do:
- Use task-based commands (PlaceOrder, not CreateOrder)
- Validate commands before execution
- Make commands idempotent when possible (see the sketch after this list)
- Include all necessary data in commands
- Use correlation IDs for tracking
❌ Don't:
- Return query data from commands
- Execute queries in command handlers
- Allow commands to fail silently
- Use generic command names
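One common way to make commands idempotent is to deduplicate on the command ID before executing. A minimal sketch; the processed_commands store is a hypothetical table keyed by command ID:
# Idempotency via command-ID deduplication: a retried command with the
# same ID is acknowledged without being applied twice.
class IdempotentCommandHandler:
    def __init__(self, inner_handler, processed_commands):
        self.inner = inner_handler
        self.processed = processed_commands  # assumed store keyed by commandId

    async def handle(self, command: dict) -> dict:
        command_id = command["commandId"]
        if await self.processed.exists(command_id):
            return {"commandId": command_id, "status": "duplicate"}
        result = await self.inner.handle(command)
        await self.processed.mark(command_id)
        return result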
2. Query Design¶
✅ Do:
- Optimize read models for specific queries
- Denormalize data in read models
- Cache frequently accessed data
- Use appropriate indexes
- Handle stale data gracefully (see the sketch after this list)
❌ Don't:
- Query write database directly
- Normalize read models
- Expect strong consistency
- Ignore read model lag
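Handling stale data gracefully can be as simple as returning the projection's own freshness marker so clients can decide what to do. A sketch, assuming the read model exposes the last_updated field written by the synchronizer above:
# Return data together with its freshness so the caller can warn the
# user or retry when the projection is lagging.
from datetime import datetime

async def get_order_with_freshness(query_handler, order_id: str) -> dict:
    order = await query_handler.get_order_details(order_id)
    updated = datetime.fromisoformat(order["last_updated"])
    age_seconds = (datetime.utcnow() - updated).total_seconds()
    return {"data": order, "staleness_seconds": age_seconds}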
3. Synchronization¶
✅ Do:
- Use event-driven synchronization
- Handle events idempotently (see the sketch after this list)
- Monitor sync lag
- Implement retry logic
- Log sync failures
❌ Don't:
- Use polling for synchronization
- Skip event ordering
- Ignore failed syncs
- Allow unbounded lag
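Idempotent event handling is often implemented with a version (or sequence number) check on the read model. A sketch, assuming each event carries a monotonically increasing version for its aggregate:
# Idempotent projection: apply an event only if it is newer than the
# version already recorded on the read-model document.
async def apply_event_idempotently(container, event: dict):
    doc = await container.read_item(
        item=event["aggregateId"],
        partition_key=event["aggregateId"]
    )
    if event["version"] <= doc.get("version", 0):
        return  # already applied or out of order; safe to skip
    doc.update(event["data"])
    doc["version"] = event["version"]
    await container.upsert_item(body=doc)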
Common Challenges¶
Challenge 1: Eventual Consistency¶
Problem: Read models lag behind writes.
Solutions:
- Embrace Eventual Consistency: Design UI to handle stale data
- Optimistic UI Updates: Update UI immediately, sync in background
- Polling: Check for updates after commands
- WebSockets: Push updates to clients
# Optimistic UI pattern
async def place_order_with_optimistic_ui(command: dict) -> dict:
# Return immediately with optimistic result
optimistic_result = {
"orderId": command["orderId"],
"status": "pending",
"estimatedSyncTime": 2 # seconds
}
# Process command asynchronously
await event_bus.publish({
"type": "PlaceOrder",
"data": command
})
return optimistic_result
Challenge 2: Read Model Rebuilding¶
Problem: Need to rebuild read models from scratch.
Solution: Implement replay capability
class ReadModelRebuilder:
def __init__(self, event_store, read_model):
self.event_store = event_store
self.read_model = read_model
async def rebuild(self):
"""Rebuild read model from event store."""
# Clear existing read model
await self.read_model.clear()
        # Replay all events
        checkpoint = None
        processed = 0
        while True:
            events = await self.event_store.get_events(
                from_checkpoint=checkpoint,
                limit=1000
            )
            if not events:
                break
            for event in events:
                await self.read_model.apply_event(event)
            checkpoint = events[-1]["id"]
            processed += len(events)
        print(f"Rebuild complete. Processed {processed} events")
Challenge 3: Data Duplication¶
Problem: Same data stored in multiple places.
Solutions:
- Accept Duplication: Storage is cheap, optimize for reads
- Selective Denormalization: Only denormalize what's needed (see the sketch below)
- Event Sourcing: Single source of truth (events)
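Selective denormalization means a projection copies only the fields its view actually reads. A minimal sketch (field choices are illustrative):
# Copy only what the order-list view needs; everything else stays in
# the write model and can be fetched on demand.
def project_order_list_entry(event: dict) -> dict:
    data = event["data"]
    return {
        "id": data["id"],
        "status": data["status"],
        "total_amount": data["total_amount"],
        "placed_at": event["timestamp"]
        # deliberately omitted: items, addresses, payment details
    }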
Performance Optimization¶
Optimization Strategies¶
1. Read Model Caching¶
from azure.data.tables import TableServiceClient
from datetime import datetime
import hashlib
import json
class CachedQueryHandler:
def __init__(self, read_db, cache_connection):
self.read_db = read_db
self.cache = TableServiceClient.from_connection_string(
cache_connection
).get_table_client("query_cache")
self.cache_ttl = 300 # 5 minutes
async def query(self, query: dict) -> dict:
"""Query with caching."""
# Generate cache key
cache_key = hashlib.md5(
json.dumps(query, sort_keys=True).encode()
).hexdigest()
# Try cache
try:
cached = self.cache.get_entity(
partition_key="query",
row_key=cache_key
)
# Check TTL
if self._is_fresh(cached):
return json.loads(cached["data"])
        except Exception:
            pass  # cache miss or expired entry; fall through and run the query
# Execute query
result = await self.read_db.query(query)
# Update cache
self.cache.upsert_entity({
"PartitionKey": "query",
"RowKey": cache_key,
"data": json.dumps(result),
"timestamp": datetime.utcnow().isoformat()
})
return result
def _is_fresh(self, cached_entity: dict) -> bool:
"""Check if cached data is still fresh."""
cached_time = datetime.fromisoformat(cached_entity["timestamp"])
age = (datetime.utcnow() - cached_time).total_seconds()
return age < self.cache_ttl
2. Batch Processing¶
import asyncio

class BatchedSynchronizer:
def __init__(self, read_model, batch_size=100):
self.read_model = read_model
self.batch_size = batch_size
self.batch = []
async def process_event(self, event: dict):
"""Process event with batching."""
self.batch.append(event)
if len(self.batch) >= self.batch_size:
await self._flush_batch()
async def _flush_batch(self):
"""Flush batch to read model."""
if not self.batch:
return
# Process batch in parallel
tasks = [
self.read_model.apply_event(event)
for event in self.batch
]
await asyncio.gather(*tasks)
self.batch.clear()
3. Query Optimization¶
# Pre-compute aggregations in read model
class OptimizedReadModel:
async def update_order_stats(self, event: dict):
"""Update pre-computed statistics."""
data = event["data"]
customer_id = data["customerId"]
# Update customer stats
stats = await self.get_customer_stats(customer_id)
stats["total_orders"] += 1
stats["total_spent"] += data["totalAmount"]
stats["last_order_date"] = event["timestamp"]
await self.save_customer_stats(customer_id, stats)
async def get_customer_summary(self, customer_id: str) -> dict:
"""Get customer summary - instant, no aggregation needed."""
return await self.get_customer_stats(customer_id)
Security Considerations¶
1. Command Authorization¶
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
class SecureCommandHandler:
def __init__(self, write_db, event_bus, key_vault_url):
self.write_db = write_db
self.event_bus = event_bus
self.credential = DefaultAzureCredential()
self.key_vault = SecretClient(
vault_url=key_vault_url,
credential=self.credential
)
async def handle_command(
self,
command: dict,
user_context: dict
) -> dict:
"""Handle command with authorization."""
# Validate user permissions
if not await self._authorize_command(command, user_context):
raise PermissionError(
f"User {user_context['user_id']} not authorized "
f"for command {command['type']}"
)
# Execute command
return await self._execute_command(command)
async def _authorize_command(
self,
command: dict,
user_context: dict
) -> bool:
"""Check if user can execute command."""
command_type = command["type"]
user_id = user_context["user_id"]
user_roles = user_context.get("roles", [])
# Define command permissions
permissions = {
"PlaceOrder": ["customer", "admin"],
"CancelOrder": ["customer", "admin"],
"RefundOrder": ["admin"]
}
required_roles = permissions.get(command_type, [])
return any(role in user_roles for role in required_roles)
2. Data Encryption¶
from cryptography.fernet import Fernet
class EncryptedEventPublisher:
def __init__(self, event_bus, encryption_key: str):
self.event_bus = event_bus
self.cipher = Fernet(encryption_key.encode())
async def publish(self, event: dict):
"""Publish event with encrypted sensitive data."""
# Encrypt sensitive fields
if "creditCard" in event.get("data", {}):
event["data"]["creditCard"] = self.cipher.encrypt(
event["data"]["creditCard"].encode()
).decode()
await self.event_bus.publish(event)
Monitoring and Operations¶
Key Metrics¶
| Metric | Description | Target | Alert Threshold |
|---|---|---|---|
| Command Latency | Time to process commands | < 100ms | > 500ms |
| Query Latency | Time to execute queries | < 50ms | > 200ms |
| Sync Lag | Time between write and read sync | < 2s | > 10s |
| Command Failure Rate | % of failed commands | < 0.1% | > 1% |
| Read Model Freshness | Age of read model data | < 5s | > 30s |
Monitoring Implementation¶
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import metrics
import os
import time
# Configure monitoring
configure_azure_monitor(
connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
)
meter = metrics.get_meter(__name__)
command_counter = meter.create_counter(
"cqrs.commands",
description="Number of commands processed"
)
command_duration = meter.create_histogram(
"cqrs.command_duration",
description="Command processing duration in ms"
)
sync_lag_gauge = meter.create_gauge(
"cqrs.sync_lag",
description="Read model sync lag in seconds"
)
# Instrument handlers
class MonitoredCommandHandler:
def __init__(self, command_handler):
self.command_handler = command_handler
async def handle(self, command: dict) -> dict:
"""Handle command with metrics."""
start_time = time.time()
try:
result = await self.command_handler.handle(command)
# Record success metrics
command_counter.add(1, {
"command_type": command["type"],
"status": "success"
})
duration = (time.time() - start_time) * 1000
command_duration.record(duration, {
"command_type": command["type"]
})
return result
except Exception as e:
# Record error metrics
command_counter.add(1, {
"command_type": command["type"],
"status": "error",
"error_type": type(e).__name__
})
raise
Additional Resources¶
Documentation¶
- Event Sourcing Pattern - Complementary pattern
- Lambda Architecture - Batch and stream processing
- Best Practices - General best practices
Azure Services¶
- Azure Cosmos DB Documentation
- Azure Event Hubs Documentation
- Azure SQL Database Documentation
- Azure Cognitive Search
Last Updated: 2025-01-28 · Pattern Status: Production Ready · Complexity Level: Advanced