Skip to content

Change Data Capture with Databricks

Home | Implementation | Integration | Databricks CDC

Status Complexity

Implement CDC pipelines using Delta Lake Change Data Feed and Structured Streaming.


Overview

Databricks provides native CDC capabilities through:

  • Delta Lake Change Data Feed (CDF): Track row-level changes in Delta tables
  • Structured Streaming: Process changes in near real-time
  • Auto Loader: Incrementally ingest new files

Implementation

Step 1: Enable Change Data Feed

-- Enable CDF on existing table
ALTER TABLE bronze.sales.orders SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Create new table with CDF enabled
CREATE TABLE silver.sales.orders_cleaned (
    order_id BIGINT,
    customer_id STRING,
    order_date DATE,
    amount DECIMAL(18,2),
    status STRING
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true);

Step 2: Read Change Data Feed

# Read changes since specific version
changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 10) \
    .table("bronze.sales.orders")

# Read changes since timestamp
changes_df = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingTimestamp", "2024-01-01 00:00:00") \
    .table("bronze.sales.orders")

# Display change types
changes_df.select(
    "_change_type",  # insert, update_preimage, update_postimage, delete
    "_commit_version",
    "_commit_timestamp",
    "order_id",
    "amount"
).show()

Step 3: Streaming CDC Pipeline

from pyspark.sql.functions import *

def process_cdc_stream():
    """Process CDC changes as a streaming pipeline."""

    # Read CDC stream
    cdc_stream = spark.readStream.format("delta") \
        .option("readChangeFeed", "true") \
        .option("startingVersion", "latest") \
        .table("bronze.sales.orders")

    # Filter to only process relevant changes
    processed_stream = cdc_stream \
        .filter(col("_change_type").isin(["insert", "update_postimage"])) \
        .withColumn("processed_at", current_timestamp()) \
        .drop("_change_type", "_commit_version", "_commit_timestamp")

    # Write to silver layer
    query = processed_stream.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "/checkpoints/orders_cdc") \
        .trigger(processingTime="1 minute") \
        .toTable("silver.sales.orders_stream")

    return query

# Start the streaming job
cdc_query = process_cdc_stream()

Step 4: CDC with MERGE for Upserts

from delta.tables import DeltaTable

def apply_cdc_with_merge(batch_df, batch_id):
    """Apply CDC changes using merge operation."""

    # Get target table
    target_table = DeltaTable.forName(spark, "silver.sales.orders_merged")

    # Separate inserts/updates from deletes
    upserts = batch_df.filter(
        col("_change_type").isin(["insert", "update_postimage"])
    ).drop("_change_type", "_commit_version", "_commit_timestamp")

    deletes = batch_df.filter(col("_change_type") == "delete")

    # Apply upserts
    if upserts.count() > 0:
        target_table.alias("target").merge(
            upserts.alias("source"),
            "target.order_id = source.order_id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()

    # Apply deletes
    if deletes.count() > 0:
        delete_ids = [row.order_id for row in deletes.select("order_id").collect()]
        target_table.delete(col("order_id").isin(delete_ids))

# Run streaming merge
cdc_stream = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", "latest") \
    .table("bronze.sales.orders")

merge_query = cdc_stream.writeStream \
    .foreachBatch(apply_cdc_with_merge) \
    .option("checkpointLocation", "/checkpoints/orders_merge") \
    .trigger(processingTime="5 minutes") \
    .start()

Step 5: Auto Loader for File-Based CDC

def setup_auto_loader_cdc(source_path: str, target_table: str):
    """Set up Auto Loader for incremental file ingestion."""

    stream = spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .option("cloudFiles.schemaLocation", f"/schemas/{target_table}") \
        .option("cloudFiles.inferColumnTypes", "true") \
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
        .load(source_path)

    # Add metadata columns
    enriched = stream \
        .withColumn("_ingested_at", current_timestamp()) \
        .withColumn("_source_file", input_file_name())

    # Write with merge schema for schema evolution
    query = enriched.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", f"/checkpoints/{target_table}") \
        .option("mergeSchema", "true") \
        .trigger(availableNow=True) \
        .toTable(target_table)

    return query

# Usage
auto_loader_query = setup_auto_loader_cdc(
    source_path="abfss://landing@datalake.dfs.core.windows.net/orders/",
    target_table="bronze.sales.orders_raw"
)

Monitoring CDC Pipelines

def monitor_cdc_health(table_name: str):
    """Monitor CDC pipeline health metrics."""

    # Get table history
    history = spark.sql(f"DESCRIBE HISTORY {table_name}").toPandas()

    # Calculate metrics
    metrics = {
        "total_versions": len(history),
        "last_commit": history.iloc[0]["timestamp"],
        "operations_24h": len(history[history["timestamp"] > (datetime.now() - timedelta(days=1))]),
        "avg_rows_per_commit": history["operationMetrics"].apply(
            lambda x: x.get("numOutputRows", 0) if x else 0
        ).mean()
    }

    return metrics

# Create monitoring dashboard query
spark.sql("""
    SELECT
        table_name,
        MAX(_commit_timestamp) as last_change,
        COUNT(DISTINCT _commit_version) as version_count,
        SUM(CASE WHEN _change_type = 'insert' THEN 1 ELSE 0 END) as inserts,
        SUM(CASE WHEN _change_type LIKE 'update%' THEN 1 ELSE 0 END) as updates,
        SUM(CASE WHEN _change_type = 'delete' THEN 1 ELSE 0 END) as deletes
    FROM cdc_monitoring_view
    GROUP BY table_name
""")


Last Updated: January 2025