
Delta Table Optimization in Azure Synapse Analytics


This guide provides detailed examples for optimizing Delta Lake tables in Azure Synapse Analytics to improve query performance and reduce costs.

Introduction to Delta Table Optimization

Delta Lake tables can accumulate many small files over time, especially with streaming or incremental data loads. Optimization techniques help maintain performance by compacting small files and optimizing data layout.
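As a quick health check before optimizing, you can compare a table's file count to its total size (both are reported by `DESCRIBE DETAIL` as `numFiles` and `sizeInBytes`). The sketch below is illustrative only: the 128 MB target and the one-quarter threshold are assumptions to tune per workload, not Delta defaults.

```python
# Sketch: flag a table for compaction when its average file size is far below
# a target size. numFiles and sizeInBytes come from `DESCRIBE DETAIL`;
# the target and the 1/4 threshold are assumptions, not Delta defaults.
def needs_optimize(num_files: int, size_in_bytes: int,
                   target_file_bytes: int = 128 * 1024 * 1024) -> bool:
    if num_files == 0:
        return False
    avg_file_bytes = size_in_bytes / num_files
    return avg_file_bytes < target_file_bytes / 4

# 10,000 files totalling 20 GB is ~2 MB per file: worth compacting
print(needs_optimize(10_000, 20 * 1024**3))  # True
```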

Prerequisites

  • Azure Synapse Analytics workspace
  • Storage account with a Delta Lake table
  • Appropriate permissions to run Spark jobs

Core Optimization Commands

OPTIMIZE Command

The OPTIMIZE command compacts small files into larger ones for better read performance:

# Import required libraries
from pyspark.sql import SparkSession

# Create Spark session with Delta Lake support
spark = SparkSession.builder \
    .appName("Delta Optimization Example") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "abfss://container@storage.dfs.core.windows.net/delta/sales_table/"

# Run basic OPTIMIZE command
spark.sql(f"OPTIMIZE delta.`{delta_table_path}`")

# Run OPTIMIZE with Z-ORDER for better data clustering
spark.sql(f"OPTIMIZE delta.`{delta_table_path}` ZORDER BY (date, region, product_id)")

# Run OPTIMIZE on a subset of the table using a partition predicate
spark.sql(f"""
OPTIMIZE delta.`{delta_table_path}`
WHERE date >= '2023-01-01'
""")

VACUUM Command

The VACUUM command removes data files that are no longer referenced by the Delta transaction log and are older than the retention threshold:

# VACUUM refuses retention periods shorter than the 7-day default unless this
# safety check is disabled (needed for the 24-hour example below; use with caution)
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.conf.set("spark.databricks.delta.vacuum.logging.enabled", "true")

# List files that would be deleted (dry run)
spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN 168 HOURS DRY RUN")

# Actually remove files older than retention period
spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN 168 HOURS")

# Run VACUUM with shorter retention (use with caution)
spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN 24 HOURS")

Advanced Optimization Strategies

Scheduled Optimization with Automated Workflows

# Import required libraries
from pyspark.sql import SparkSession
from datetime import datetime

# Create Spark session
spark = SparkSession.builder \
    .appName("Automated Delta Optimization") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define logging function
def log_optimization(delta_path, optimization_type, start_time):
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()

    log_data = [(delta_path, optimization_type, start_time.isoformat(), 
                end_time.isoformat(), duration)]

    schema = ["table_path", "operation", "start_time", "end_time", "duration_seconds"]
    log_df = spark.createDataFrame(log_data, schema)

    # Write to optimization log table
    log_df.write \
        .format("delta") \
        .mode("append") \
        .save("abfss://container@storage.dfs.core.windows.net/logs/optimization_history/")

# Get list of tables to optimize
tables_to_optimize = [
    "abfss://container@storage.dfs.core.windows.net/delta/sales_table/",
    "abfss://container@storage.dfs.core.windows.net/delta/customer_table/",
    "abfss://container@storage.dfs.core.windows.net/delta/product_table/"
]

# Perform optimization for each table
for table_path in tables_to_optimize:
    # Run OPTIMIZE
    start_time = datetime.now()
    spark.sql(f"OPTIMIZE delta.`{table_path}`")
    log_optimization(table_path, "OPTIMIZE", start_time)

    # Run VACUUM (if table is old enough)
    start_time = datetime.now()
    spark.sql(f"VACUUM delta.`{table_path}` RETAIN 168 HOURS")
    log_optimization(table_path, "VACUUM", start_time)

Partition-Aware Optimization

Optimize specific partitions for large tables:

# Import required libraries
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
import pyspark.sql.functions as F

# Create Spark session
spark = SparkSession.builder \
    .appName("Partition-Aware Optimization") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "abfss://container@storage.dfs.core.windows.net/delta/large_partitioned_table/"

# Load Delta table
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Get the distinct partition values (assuming year/month partitioning)
partitions_df = spark.sql(f"SELECT DISTINCT year, month FROM delta.`{delta_table_path}`")

# Count the data files backing each partition
file_stats = []
for row in partitions_df.collect():
    year, month = row.year, row.month

    # COUNT(DISTINCT input_file_name()) counts files, not rows
    files_df = spark.sql(f"""
        SELECT COUNT(DISTINCT input_file_name()) AS file_count
        FROM delta.`{delta_table_path}`
        WHERE year = {year} AND month = {month}
    """)

    file_count = files_df.first()[0]
    file_stats.append((f"year={year}/month={month}", file_count, year, month))

# Sort partitions by file count (optimize those with most files first)
file_stats.sort(key=lambda x: x[1], reverse=True)

# Optimize partitions with more than 100 files
for partition, file_count, year, month in file_stats:
    if file_count > 100:
        print(f"Optimizing partition {partition} with {file_count} files")
        spark.sql(f"""
            OPTIMIZE delta.`{delta_table_path}`
            WHERE year = {year} AND month = {month}
            ZORDER BY (customer_id, product_id)
        """)

Data Skipping with Z-ORDER

Optimize the table for specific query patterns using Z-ORDER:

# Import required libraries
from pyspark.sql import SparkSession
import time

# Create Spark session
spark = SparkSession.builder \
    .appName("Z-ORDER Optimization") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "abfss://container@storage.dfs.core.windows.net/delta/query_table/"

# Define test query
test_query = f"""
    SELECT COUNT(*) 
    FROM delta.`{delta_table_path}`
    WHERE region = 'Europe' AND transaction_date BETWEEN '2023-01-01' AND '2023-01-31'
"""

# Run query before optimization and measure time
start_time = time.time()
spark.sql(test_query).show()
before_time = time.time() - start_time
print(f"Query time before optimization: {before_time:.2f} seconds")

# Run OPTIMIZE with Z-ORDER on query columns
spark.sql(f"""
    OPTIMIZE delta.`{delta_table_path}`
    ZORDER BY (region, transaction_date)
""")

# Run the same query after optimization
start_time = time.time()
spark.sql(test_query).show()
after_time = time.time() - start_time
print(f"Query time after optimization: {after_time:.2f} seconds")
print(f"Performance improvement: {(before_time - after_time) / before_time * 100:.2f}%")

Delta Cache Optimization

Leverage Delta Lake caching for frequently accessed data:

# Import required libraries
from pyspark.sql import SparkSession
import time

# Create Spark session. Note: spark.databricks.io.cache.enabled controls the
# Databricks disk cache and is ignored in Synapse (standard Spark caching still applies)
spark = SparkSession.builder \
    .appName("Delta Cache Optimization") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.io.cache.enabled", "true") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "abfss://container@storage.dfs.core.windows.net/delta/frequently_accessed_table/"

# Query without cache priming
start_time = time.time()
result = spark.sql(f"""
    SELECT region, product_category, SUM(sales_amount) AS total_sales
    FROM delta.`{delta_table_path}`
    GROUP BY region, product_category
    ORDER BY total_sales DESC
    LIMIT 10
""").collect()
first_query_time = time.time() - start_time

# The second execution should use cache
start_time = time.time()
result = spark.sql(f"""
    SELECT region, product_category, SUM(sales_amount) AS total_sales
    FROM delta.`{delta_table_path}`
    GROUP BY region, product_category
    ORDER BY total_sales DESC
    LIMIT 10
""").collect()
second_query_time = time.time() - start_time

print(f"First query time: {first_query_time:.2f} seconds")
print(f"Second query time (cached): {second_query_time:.2f} seconds")
print(f"Cache speedup: {first_query_time / second_query_time:.2f}x")

# CACHE SELECT is Databricks-only SQL; in Synapse, cache a column subset with
# the standard DataFrame API for better memory utilization
subset_df = spark.read.format("delta").load(delta_table_path) \
    .select("region", "product_category", "sales_amount") \
    .where("transaction_date >= '2023-01-01'")
subset_df.cache()
subset_df.count()  # an action materializes the cache

Monitoring and Maintaining Delta Tables

Table History and Statistics

# Import required libraries
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Create Spark session
spark = SparkSession.builder \
    .appName("Delta Table Monitoring") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "abfss://container@storage.dfs.core.windows.net/delta/monitored_table/"

# Check table history
history_df = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")
history_df.show(10, truncate=False)

# Get table details and statistics
details_df = spark.sql(f"DESCRIBE DETAIL delta.`{delta_table_path}`")
details_df.show(truncate=False)

# Count the active data files without reading their contents
files_df = spark.sql(f"""
    SELECT COUNT(DISTINCT input_file_name()) AS file_count
    FROM delta.`{delta_table_path}`
""")

# Show file stats
print(f"Total number of active data files: {files_df.first()['file_count']}")

# Analyze file size distribution
spark.sql(f"""
    CREATE OR REPLACE TEMPORARY VIEW delta_files AS
    SELECT 
        input_file_name() AS file_path,
        COUNT(*) AS record_count
    FROM delta.`{delta_table_path}`
    GROUP BY input_file_name()
""")

spark.sql("""
    SELECT 
        percentile_approx(record_count, 0.5) AS median_records_per_file,
        MIN(record_count) AS min_records,
        MAX(record_count) AS max_records,
        AVG(record_count) AS avg_records
    FROM delta_files
""").show()

Auto-Optimize Configuration

# Import required libraries
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# Create Spark session with auto-optimize enabled
spark = SparkSession.builder \
    .appName("Delta Auto Optimization") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.optimizeWrite.enabled", "true") \
    .config("spark.databricks.delta.autoCompact.enabled", "true") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "abfss://container@storage.dfs.core.windows.net/delta/auto_optimized_table/"

# Create a new table with auto-optimize properties
spark.sql(f"""
CREATE TABLE IF NOT EXISTS auto_optimized_table
USING DELTA
LOCATION '{delta_table_path}'
TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact = true
)
""")

# For existing tables, you can set these properties:
spark.sql(f"""
ALTER TABLE delta.`{delta_table_path}`
SET TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact = true
)
""")

Performance Tuning Best Practices

1. File Size Optimization

Aim for file sizes between 128 MB and 1 GB:

# Import required libraries
from pyspark.sql import SparkSession

# Create Spark session (134217728 bytes = 128 MB)
spark = SparkSession.builder \
    .appName("File Size Optimization") \
    .config("spark.sql.files.maxPartitionBytes", "134217728") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Configure write operation for optimal file sizes
df = spark.read.format("delta").load("abfss://container@storage.dfs.core.windows.net/delta/input_table/")

# Write with target file size of ~128MB
df.repartition(200) \
    .write \
    .option("maxRecordsPerFile", 500000) \
    .format("delta") \
    .save("abfss://container@storage.dfs.core.windows.net/delta/optimized_table/")

2. Partitioning Strategy

Create effective partitioning based on query patterns:

# Import required libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create Spark session
spark = SparkSession.builder \
    .appName("Delta Partitioning Strategy") \
    .getOrCreate()

# Load data
df = spark.read.format("delta").load("abfss://container@storage.dfs.core.windows.net/delta/source_table/")

# Add partitioning columns
df = df.withColumn("year", F.year("transaction_date")) \
       .withColumn("month", F.month("transaction_date"))

# Write with partitioning
df.write \
    .format("delta") \
    .partitionBy("year", "month") \
    .save("abfss://container@storage.dfs.core.windows.net/delta/well_partitioned_table/")

# For high-cardinality time-series data, consider coarser partitioning to limit the partition count
df.write \
    .format("delta") \
    .partitionBy("year") \
    .save("abfss://container@storage.dfs.core.windows.net/delta/balanced_partitioned_table/")

3. Compact Metadata with Delta Protocol Upgrades

# Import required libraries
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("Delta Protocol Upgrade") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define Delta table path
delta_table_path = "abfss://container@storage.dfs.core.windows.net/delta/large_table/"

# Check current protocol version
spark.sql(f"""
    DESCRIBE DETAIL delta.`{delta_table_path}`
""").select("minReaderVersion", "minWriterVersion").show()

# Upgrade to latest protocol version for metadata improvements
spark.sql(f"""
    ALTER TABLE delta.`{delta_table_path}` 
    SET TBLPROPERTIES (delta.minReaderVersion = 2, delta.minWriterVersion = 5)
""")

# Verify upgrade
spark.sql(f"""
    DESCRIBE DETAIL delta.`{delta_table_path}`
""").select("minReaderVersion", "minWriterVersion").show()

Common Issues and Solutions

Issue: Slow query performance despite optimization

Solution:

  • Check data skew in partitions
  • Verify Z-ORDER columns match query predicates
  • Consider adjusting file sizes for your specific workload
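To check the first point, per-partition row counts make skew easy to spot. A minimal sketch: the counts would come from a `GROUP BY` over the partition columns, and the 5x threshold is an assumption to tune per workload.

```python
# Sketch: quantify partition skew from (partition, row_count) pairs, e.g.
# gathered with `SELECT year, month, COUNT(*) ... GROUP BY year, month`.
from statistics import median

def skew_ratio(partition_counts):
    """Ratio of the largest partition to the median partition size."""
    counts = [count for _, count in partition_counts]
    return max(counts) / median(counts)

counts = [("2023-01", 1_000_000), ("2023-02", 950_000), ("2023-03", 9_000_000)]
ratio = skew_ratio(counts)
if ratio > 5:  # threshold is an assumption; tune per workload
    print(f"Skewed: largest partition is {ratio:.1f}x the median")
```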

Issue: VACUUM removing files that are still needed

Solution:

  • Use longer retention periods (7 days minimum recommended)
  • Always run with DRY RUN first
  • Ensure no long-running queries or operations are using old versions
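One way to make the retention choice concrete is to derive it from the oldest snapshot you still need (for time travel or a long-running job), with the 7-day default as a floor. A hedged sketch: `safe_retention_hours` is a hypothetical helper, not a Delta API.

```python
# Sketch: choose a VACUUM retention (hours) that still covers the oldest
# snapshot in use, never going below the 7-day (168-hour) default.
# safe_retention_hours is a hypothetical helper, not part of Delta Lake.
from datetime import datetime, timedelta

def safe_retention_hours(oldest_needed: datetime, now: datetime,
                         floor_hours: int = 168) -> int:
    hours_needed = (now - oldest_needed) / timedelta(hours=1)
    return max(floor_hours, int(hours_needed) + 1)

now = datetime(2023, 6, 15, 12, 0)
hours = safe_retention_hours(datetime(2023, 6, 1, 12, 0), now)
print(hours)  # 337
# then: spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN {hours} HOURS DRY RUN")
```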

Issue: Out of memory errors during OPTIMIZE

Solution:

  • Optimize smaller partitions individually
  • Increase executor memory
  • Use bin-packing optimization instead of Z-ORDER for very large tables
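For the last point: bin-packing compaction is simply OPTIMIZE without a ZORDER BY clause, and combining it with partition predicates bounds how much data each job touches. A sketch that only builds the statements (the table path and year/month partition columns are carried over from the earlier examples):

```python
# Sketch: one plain (bin-packing) OPTIMIZE statement per partition;
# omitting ZORDER BY keeps memory pressure lower on very large tables.
def binpack_statements(table_path, year_month_pairs):
    return [
        f"OPTIMIZE delta.`{table_path}` WHERE year = {y} AND month = {m}"
        for y, m in year_month_pairs
    ]

stmts = binpack_statements(
    "abfss://container@storage.dfs.core.windows.net/delta/large_table/",
    [(2023, 1), (2023, 2)],
)
for stmt in stmts:
    print(stmt)
    # spark.sql(stmt)  # run partitions one at a time to bound memory use
```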