Change Data Capture with Azure Data Factory
Home | Implementation | Integration | Data Factory CDC
Implement incremental data loading using Azure Data Factory's CDC capabilities.
Overview
Azure Data Factory provides native CDC support for capturing and processing data changes from various sources.
Supported Sources
- SQL Server (with Change Tracking or CDC enabled)
- Azure SQL Database
- Azure Cosmos DB (Change Feed)
- SAP systems
- Oracle (with LogMiner)
Implementation
Step 1: Enable CDC on Source
-- Enable CDC at the database level. Run this in the context of the database
-- that should be tracked.
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on a specific table.
-- @supports_net_changes = 1 is what makes the net-changes function
-- (cdc.fn_cdc_get_net_changes_Sales_Orders, queried by the pipelines below)
-- available; it requires a primary key or unique index on the source table.
-- @role_name = NULL means no gating role restricts access to the change data.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'Sales',
    @source_name = N'Orders',
    @role_name = NULL,
    @supports_net_changes = 1;
-- Verify CDC is enabled for the database
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = DB_NAME();
-- Verify CDC is enabled for the table (the database-level flag alone does not
-- show whether sp_cdc_enable_table succeeded)
SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
FROM sys.tables AS t
JOIN sys.schemas AS s ON s.schema_id = t.schema_id
WHERE s.name = N'Sales' AND t.name = N'Orders';
Step 2: Create CDC Pipeline in Data Factory
{
  "name": "CDC_Orders_Pipeline",
  "properties": {
    "activities": [
      {
        "name": "GetChangeData",
        "type": "Copy",
        "inputs": [
          {
            "referenceName": "SqlServerCDCSource",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "DeltaLakeSink",
            "type": "DatasetReference"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlServerSource",
            "sqlReaderQuery": {
              "value": "@concat('DECLARE @from_lsn binary(10) = 0x', pipeline().parameters.from_lsn, '; DECLARE @to_lsn binary(10) = 0x', pipeline().parameters.to_lsn, '; SELECT * FROM cdc.fn_cdc_get_net_changes_Sales_Orders(@from_lsn, @to_lsn, ''all'')')",
              "type": "Expression"
            }
          },
          "sink": {
            "type": "DeltaSink",
            "writeBatchSize": 10000,
            "deltaTableName": "orders_cdc"
          },
          "enableStaging": false
        }
      }
    ],
    "parameters": {
      "from_lsn": { "type": "string" },
      "to_lsn": { "type": "string" }
    }
  }
}
Step 3: Watermark Management
{
  "name": "CDC_With_Watermark",
  "properties": {
    "activities": [
      {
        "name": "LookupLastWatermark",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT CONVERT(varchar(20), MAX(lsn), 2) as last_lsn FROM watermark_table WHERE table_name = 'Orders'"
          },
          "dataset": {
            "referenceName": "WatermarkDataset",
            "type": "DatasetReference"
          }
        }
      },
      {
        "name": "GetCurrentLSN",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "SqlServerSource",
            "sqlReaderQuery": "SELECT CONVERT(varchar(20), sys.fn_cdc_get_max_lsn(), 2) as current_lsn"
          },
          "dataset": {
            "referenceName": "SqlServerCDCSource",
            "type": "DatasetReference"
          }
        }
      },
      {
        "name": "CopyChanges",
        "type": "Copy",
        "dependsOn": [
          { "activity": "LookupLastWatermark", "dependencyConditions": ["Succeeded"] },
          { "activity": "GetCurrentLSN", "dependencyConditions": ["Succeeded"] }
        ],
        "inputs": [
          {
            "referenceName": "SqlServerCDCSource",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "DeltaLakeSink",
            "type": "DatasetReference"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlServerSource",
            "sqlReaderQuery": {
              "value": "@concat('DECLARE @from_lsn binary(10) = 0x', activity('LookupLastWatermark').output.firstRow.last_lsn, '; DECLARE @to_lsn binary(10) = 0x', activity('GetCurrentLSN').output.firstRow.current_lsn, '; SELECT * FROM cdc.fn_cdc_get_net_changes_Sales_Orders(@from_lsn, @to_lsn, ''all'')')",
              "type": "Expression"
            }
          },
          "sink": {
            "type": "DeltaSink",
            "writeBatchSize": 10000,
            "deltaTableName": "orders_cdc"
          },
          "enableStaging": false
        }
      },
      {
        "name": "UpdateWatermark",
        "type": "SqlServerStoredProcedure",
        "dependsOn": [
          { "activity": "CopyChanges", "dependencyConditions": ["Succeeded"] }
        ],
        "typeProperties": {
          "storedProcedureName": "usp_UpdateWatermark",
          "storedProcedureParameters": {
            "table_name": { "value": "Orders", "type": "String" },
            "lsn": {
              "value": { "value": "@activity('GetCurrentLSN').output.firstRow.current_lsn", "type": "Expression" },
              "type": "String"
            }
          }
        }
      }
    ]
  }
}
Step 4: Apply Changes to Delta Lake
# Databricks notebook to merge CDC changes
from delta.tables import DeltaTable
def apply_cdc_changes(cdc_path: str, target_table: str):
    """Apply CDC changes to a Delta table using MERGE.

    Reads the captured change rows from ``cdc_path`` (Delta format) and merges
    them into ``target_table`` on ``order_id``. The ``__$operation`` column of
    each change row selects the action:

    * 4 - update the matching target row
    * 1 - delete the matching target row
    * 2 - insert a new row when no target row matches

    Args:
        cdc_path: Location of the Delta data holding the CDC change rows.
        target_table: Name of the Delta table the changes are merged into.
    """
    # `__$operation` contains `$`, which is not valid in an unquoted Spark SQL
    # identifier, so the column name must be backtick-quoted in conditions.
    operation = "source.`__$operation`"

    # Read CDC data
    cdc_df = spark.read.format("delta").load(cdc_path)

    # Get target table
    target = DeltaTable.forName(spark, target_table)

    # Merge changes
    target.alias("target").merge(
        cdc_df.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdate(
        condition=f"{operation} = 4",  # Update
        set={
            "customer_id": "source.customer_id",
            "order_date": "source.order_date",
            "amount": "source.amount",
            "updated_at": "current_timestamp()"
        }
    ).whenMatchedDelete(
        condition=f"{operation} = 1"  # Delete
    ).whenNotMatchedInsert(
        condition=f"{operation} = 2",  # Insert
        values={
            "order_id": "source.order_id",
            "customer_id": "source.customer_id",
            "order_date": "source.order_date",
            "amount": "source.amount",
            "created_at": "current_timestamp()",
            "updated_at": "current_timestamp()"
        }
    ).execute()
Cosmos DB Change Feed Integration
{
  "name": "CosmosDB_ChangeFeed_Pipeline",
  "properties": {
    "activities": [
      {
        "name": "CopyChangeFeed",
        "type": "Copy",
        "inputs": [
          {
            "referenceName": "CosmosDbOrdersSource",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "DeltaLakeSink",
            "type": "DatasetReference"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "CosmosDbSqlApiSource",
            "preferredRegions": ["East US"],
            "detectDatetime": true
          },
          "sink": {
            "type": "DeltaSink"
          }
        }
      }
    ]
  }
}
Monitoring and Alerting
{
  "name": "CDC_Monitor",
  "properties": {
    "activities": [
      {
        "name": "CheckCDCLatency",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "SqlServerSource",
            "sqlReaderQuery": "SELECT DATEDIFF(MINUTE, MAX(tran_end_time), GETDATE()) as latency_minutes FROM cdc.lsn_time_mapping"
          },
          "dataset": {
            "referenceName": "SqlServerCDCSource",
            "type": "DatasetReference"
          }
        }
      },
      {
        "name": "IfLatencyAboveThreshold",
        "type": "IfCondition",
        "dependsOn": [
          { "activity": "CheckCDCLatency", "dependencyConditions": ["Succeeded"] }
        ],
        "typeProperties": {
          "expression": {
            "value": "@greater(activity('CheckCDCLatency').output.firstRow.latency_minutes, 30)",
            "type": "Expression"
          },
          "ifTrueActivities": [
            {
              "name": "AlertOnHighLatency",
              "type": "WebActivity",
              "typeProperties": {
                "url": "https://your-logic-app-url",
                "method": "POST",
                "body": {
                  "value": "@json(concat('{\"latency\":', string(activity('CheckCDCLatency').output.firstRow.latency_minutes), '}'))",
                  "type": "Expression"
                }
              }
            }
          ]
        }
      }
    ]
  }
}
Related Documentation
- Databricks CDC
- Synapse CDC
- Data Factory Best Practices
Last Updated: January 2025