Skip to content

🔄 Change Feed in Azure Cosmos DB

Real-time

Change Feed provides a persistent log of all changes to Cosmos DB documents, enabling real-time event-driven architectures, data synchronization, and audit trails.


🎯 Overview

Change Feed is a feature that outputs a sorted list of documents that were changed in the order they were modified, enabling reactive applications that respond to data changes.

Key Capabilities

  • Real-time change notifications
  • Guaranteed ordering within partition key
  • Includes inserts and updates (deletes with TTL)
  • Multiple consumers supported
  • Processing from specific point in time

🏗️ Change Feed Modes

Latest Version Mode

from azure.cosmos import CosmosClient, PartitionKey

client = CosmosClient(url="<url>", credential="<credential>")
database = client.get_database_client("ecommerce")
container = database.get_container_client("orders")

# Read changes
for item in container.query_items_change_feed():
    print(f"Changed document: {item['id']}")
    # Process change

All Versions and Deletes Mode

# Track all versions including deletions
for item in container.query_items_change_feed(
    is_start_from_beginning=True,
    all_versions_and_deletes=True
):
    if item.get('_metadata', {}).get('operationType') == 'delete':
        print(f"Deleted: {item['id']}")
    else:
        print(f"Modified: {item['id']}")

🔧 Implementation Patterns

Azure Functions Integration

import azure.functions as func
import logging

def main(documents: func.DocumentList):
    """Process Cosmos DB changes with Azure Functions."""
    if documents:
        logging.info(f'Processing {len(documents)} changed documents')
        for doc in documents:
            # Process each changed document
            process_order_change(doc)

def process_order_change(order):
    """Handle order changes."""
    if order['status'] == 'completed':
        # Send notification
        send_order_confirmation(order)

Change Feed Processor

from azure.cosmos import CosmosClient
from azure.cosmos.partition_key import PartitionKey

# Initialize clients
client = CosmosClient(url="<url>", credential="<credential>")
database = client.get_database_client("ecommerce")
monitored_container = database.get_container_client("orders")
lease_container = database.get_container_client("leases")

# Process changes
def batch_processor(changes):
    for change in changes:
        print(f"Processing change: {change['id']}")
        # Handle change

# Start processor
monitored_container.query_items_change_feed(
    is_start_from_beginning=True,
    max_item_count=100
)


Last Updated: 2025-01-28