
Batch Processing Setup



Overview

This guide covers implementing batch processing pipelines using Azure Databricks and Delta Lake for scheduled data processing, aggregations, and ETL workflows.

Table of Contents

Batch Job Patterns
Data Factory Integration
Delta Lake Batch Operations
Performance Optimization
Monitoring and Alerting

Batch Job Patterns

Incremental Batch Processing

from delta.tables import DeltaTable
from pyspark.sql.functions import col, lit, current_timestamp

def incremental_batch_load(source_table, target_table, watermark_column="updated_at"):
    """
    Incrementally load data using a watermark column, then MERGE into the target.
    Returns the number of source records processed.
    """
    # Get the last processed watermark from the target table
    last_watermark = spark.sql(f"""
        SELECT MAX({watermark_column}) AS max_timestamp
        FROM {target_table}
    """).collect()[0]["max_timestamp"]

    # Read incremental data; on the first run the target is empty and
    # last_watermark is NULL, so take the full source table
    incremental_data = spark.read.table(source_table)
    if last_watermark is not None:
        incremental_data = incremental_data.filter(
            col(watermark_column) > lit(last_watermark)
        )

    records_processed = incremental_data.count()

    # Merge into target (assumes "id" is the primary key in both tables)
    target_delta = DeltaTable.forName(spark, target_table)

    target_delta.alias("target").merge(
        incremental_data.alias("source"),
        "target.id = source.id"
    ).whenMatchedUpdateAll(
    ).whenNotMatchedInsertAll(
    ).execute()

    return records_processed

# Example usage
records_processed = incremental_batch_load("bronze.events", "silver.events_validated")

Full Refresh Pattern

def full_refresh_with_swap(source_table, target_table):
    """
    Full refresh using table swap for zero downtime
    """
    temp_table = f"{target_table}_temp"

    # Process full dataset
    full_data = spark.table(source_table)

    # Apply transformations
    processed_data = (full_data
        .filter(col("is_valid"))
        .withColumn("processed_timestamp", current_timestamp())
    )

    # Write to temp table
    processed_data.write.mode("overwrite").saveAsTable(temp_table)

    # Swap tables (two quick metadata operations, but not a single atomic transaction)
    spark.sql(f"ALTER TABLE {target_table} RENAME TO {target_table}_old")
    spark.sql(f"ALTER TABLE {temp_table} RENAME TO {target_table}")

    # Drop old table
    spark.sql(f"DROP TABLE IF EXISTS {target_table}_old")

Data Factory Integration

ADF Pipeline Configuration

{
  "name": "DailyBatchProcessing",
  "properties": {
    "activities": [
      {
        "name": "TriggerDatabricksJob",
        "type": "DatabricksNotebook",
        "typeProperties": {
          "notebookPath": "/Shared/BatchProcessing/daily_aggregation",
          "baseParameters": {
            "processing_date": "@{formatDateTime(pipeline().parameters.ProcessingDate, 'yyyy-MM-dd')}"
          }
        },
        "linkedServiceName": {
          "referenceName": "AzureDatabricks",
          "type": "LinkedServiceReference"
        }
      }
    ],
    "parameters": {
      "ProcessingDate": {
        "type": "string",
        "defaultValue": "@utcnow()"
      }
    },
    "annotations": []
  }
}
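
On the Databricks side, the notebook referenced above receives processing_date through a widget. Note that ADF does not evaluate expressions in parameter default values, so ProcessingDate is normally supplied at trigger time (for example @trigger().scheduledTime). A minimal sketch of the notebook entry point; the silver source table and its revenue/event_date columns are illustrative, while the gold target matches the DDL shown under Partition Pruning:

# Databricks notebook: /Shared/BatchProcessing/daily_aggregation
# Read the parameter passed via ADF baseParameters
dbutils.widgets.text("processing_date", "")
processing_date = dbutils.widgets.get("processing_date")

# Illustrative daily aggregation bounded to the supplied date
daily_metrics = spark.sql(f"""
    SELECT customer_id,
           to_date('{processing_date}') AS metric_date,
           CAST(SUM(revenue) AS DECIMAL(10,2)) AS total_revenue,
           COUNT(*) AS event_count
    FROM silver.events_validated
    WHERE event_date = to_date('{processing_date}')
    GROUP BY customer_id
""")

# Overwrite only this date's partition in the target table
(daily_metrics.write
    .mode("overwrite")
    .option("replaceWhere", f"metric_date = to_date('{processing_date}')")
    .saveAsTable("gold.customer_daily_metrics"))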

Delta Lake Batch Operations

Batch Merge Operations

def batch_merge_scd_type2(source_df, target_table, business_keys, scd_columns):
    """
    Implement SCD Type 2 in batch mode.

    This merge closes out the current version of changed records and inserts
    rows for brand-new keys. The new version of a changed record still has to
    be inserted, either by staging the source with a NULL merge key (so it
    falls into the not-matched branch) or by a follow-up append.
    """
    from delta.tables import DeltaTable

    target = DeltaTable.forName(spark, target_table)

    # Build merge condition on the business keys, restricted to current rows
    merge_condition = " AND ".join([
        f"target.{key} = source.{key}" for key in business_keys
    ])
    merge_condition += " AND target.is_current = true"

    # Detect changes with a null-safe comparison so NULL-to-value changes are caught
    change_condition = " OR ".join([
        f"NOT (target.{c} <=> source.{c})" for c in scd_columns
    ])

    # Merge with SCD Type 2 logic
    target.alias("target").merge(
        source_df.alias("source"),
        merge_condition
    ).whenMatchedUpdate(
        condition=change_condition,
        set={
            "is_current": "false",
            "end_date": "source.effective_date"
        }
    ).whenNotMatchedInsert(
        values={
            **{key: f"source.{key}" for key in business_keys},
            **{c: f"source.{c}" for c in scd_columns},
            "is_current": "true",
            "start_date": "source.effective_date",
            "end_date": "cast(null as timestamp)"
        }
    ).execute()
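
A usage sketch for the function above; the source table name and tracked columns are hypothetical:

# Hypothetical example: track address changes on a customer dimension
updates_df = spark.table("silver.dim_customer_updates")

batch_merge_scd_type2(
    source_df=updates_df,
    target_table="gold.dim_customer",
    business_keys=["customer_id"],
    scd_columns=["address", "city", "postal_code"]
)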

Batch Optimization

def optimize_delta_tables(database_name, zorder_columns="date, customer_id"):
    """
    Optimize all Delta tables in a database.
    zorder_columns should be adjusted to the columns actually queried;
    OPTIMIZE fails if a Z-order column does not exist on a table.
    """
    tables = spark.sql(f"SHOW TABLES IN {database_name}").collect()

    for table in tables:
        # Skip temporary views returned by SHOW TABLES
        if table.isTemporary:
            continue

        table_name = f"{database_name}.{table.tableName}"

        print(f"Optimizing {table_name}...")

        # Compact small files and Z-order by commonly queried columns
        # (a separate plain OPTIMIZE is redundant; ZORDER includes compaction)
        spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({zorder_columns})")

        # Vacuum old files (7 days retention)
        spark.sql(f"VACUUM {table_name} RETAIN 168 HOURS")

        # Refresh statistics used by the query optimizer
        spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS FOR ALL COLUMNS")

Performance Optimization

Partition Pruning

# Configure partition columns for better performance
spark.sql("""
    CREATE TABLE gold.customer_daily_metrics (
        customer_id STRING,
        metric_date DATE,
        total_revenue DECIMAL(10,2),
        event_count BIGINT
    )
    USING DELTA
    PARTITIONED BY (metric_date)
""")

# Query with partition pruning
df = spark.sql("""
    SELECT * FROM gold.customer_daily_metrics
    WHERE metric_date BETWEEN '2025-01-01' AND '2025-01-31'
""")

Caching Strategy

# Cache frequently accessed tables
spark.sql("CACHE TABLE gold.dim_customer")

# Execute batch job (the cache is only populated and used once an action,
# such as a write or count, runs on this result)
result = spark.sql("""
    SELECT c.*, m.total_revenue
    FROM gold.dim_customer c
    JOIN gold.fact_metrics m ON c.customer_id = m.customer_id
""")

# Uncache after use
spark.sql("UNCACHE TABLE gold.dim_customer")

Monitoring and Alerting

Job Monitoring

from datetime import datetime

def log_batch_job(job_name, status, records_processed):
    """
    Log batch job execution
    """
    log_entry = spark.createDataFrame([{
        "job_name": job_name,
        "status": status,
        "records_processed": records_processed,
        "execution_timestamp": datetime.now(),
        "cluster_id": spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
    }])

    log_entry.write.mode("append").saveAsTable("monitoring.batch_job_logs")

# Usage
try:
    # Execute batch job; the helper returns the number of records processed
    records = incremental_batch_load(...)

    log_batch_job("daily_aggregation", "SUCCESS", records)
except Exception:
    log_batch_job("daily_aggregation", "FAILED", 0)
    raise
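
The same log table can drive simple alerting. A sketch that checks for recent failures and raises so the surrounding job (or an ADF alert on the pipeline) notifies the team; the 24-hour window is illustrative:

# Fail loudly if any batch job failed in the last 24 hours
recent_failures = spark.sql("""
    SELECT job_name, execution_timestamp
    FROM monitoring.batch_job_logs
    WHERE status = 'FAILED'
      AND execution_timestamp >= current_timestamp() - INTERVAL 24 HOURS
""")

if recent_failures.count() > 0:
    failed_jobs = [row.job_name for row in recent_failures.collect()]
    raise Exception(f"Batch jobs failed in the last 24 hours: {failed_jobs}")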


Last Updated: January 2025 | Version: 1.0.0 | Status: Production Ready