
Spark Performance Tuning


Advanced Spark performance tuning for Azure Synapse Analytics.


Execution Optimization

Catalyst Optimizer

# Enable all optimizations
spark.conf.set("spark.sql.optimizer.excludedRules", "")

# Check applied optimizations
df.explain(True)  # Look for "== Optimized Logical Plan =="

Code Generation

# Enable whole-stage code generation (on by default in recent Spark releases)
spark.conf.set("spark.sql.codegen.wholeStage", "true")

# Internal config: CODEGEN_ONLY forces codegen and disables the interpreted fallback (mainly used for testing)
spark.conf.set("spark.sql.codegen.factoryMode", "CODEGEN_ONLY")

Join Optimization

Join Strategies

Strategy       | When to Use                      | Configuration
Broadcast Hash | Small table (< 10MB by default)  | Automatic or hint
Sort Merge     | Large tables, equality join keys | Default
Shuffle Hash   | Medium-sized tables              | Hint required
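
A join strategy can also be requested explicitly with join hints (Spark 3.0+). A minimal sketch, assuming large_df and medium_df are existing DataFrames and large/medium are registered tables:

# Request a specific strategy via DataFrame hints
result_hash = large_df.join(medium_df.hint("shuffle_hash"), "key")
result_merge = large_df.join(medium_df.hint("merge"), "key")

# Equivalent SQL hint syntax
spark.sql("SELECT /*+ SHUFFLE_HASH(m) */ * FROM large l JOIN medium m ON l.key = m.key")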

Broadcast Join

from pyspark.sql.functions import broadcast

# Explicit broadcast (recommended for control)
result = large_df.join(broadcast(small_df), "key")

# Raise the auto-broadcast threshold (default is 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")

Sort Merge Join Optimization

# Bucket and sort on the join key once so repeated joins avoid the shuffle and sort
df.write.bucketBy(100, "key").sortBy("key").saveAsTable("bucketed_table")

# Joins between tables bucketed on the same key and bucket count skip the shuffle
spark.table("bucketed_table").join(spark.table("other_bucketed"), "key")

Serialization

Kryo Serialization

from pyspark.sql import SparkSession

# Enable Kryo (faster and more compact than Java serialization).
# spark.serializer is a core config and must be set before the session starts,
# not via spark.conf.set on a running session.
spark = (SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrationRequired", "false")
    # Register custom classes so Kryo does not store full class names
    .config("spark.kryo.classesToRegister", "com.company.CustomClass")
    .getOrCreate())

Caching Strategy

When to Cache

# Cache when DataFrame is reused multiple times
df = spark.read.parquet("/data/large_table")
df.cache()  # Or persist(StorageLevel.MEMORY_AND_DISK)

# Multiple operations on cached data
df.groupBy("region").count().show()
df.groupBy("product").sum("revenue").show()
df.filter("year = 2024").count()

# Release when done
df.unpersist()

Storage Levels

Level           | Use Case
MEMORY_ONLY     | Fast access, data fits in memory
MEMORY_AND_DISK | Large DataFrames that may not fit in memory
DISK_ONLY       | Very large data, infrequent access
MEMORY_ONLY_SER | Memory constrained (Scala/Java API; PySpark data is always stored serialized)
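
A non-default storage level is passed to persist() explicitly. A minimal sketch, assuming df is a DataFrame that will be reused:

from pyspark import StorageLevel

# Spill to disk when the DataFrame does not fit in memory
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()      # First action materializes the cache
df.unpersist()  # Release when done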

Partition Tuning

Partition Count

# Rule of thumb: 2-4x CPU cores
num_executors = 10
cores_per_executor = 4
target_partitions = num_executors * cores_per_executor * 3  # = 120

spark.conf.set("spark.sql.shuffle.partitions", str(target_partitions))

Coalesce vs Repartition

# Coalesce: Reduce partitions (no shuffle)
df.coalesce(10).write.parquet("/output")

# Repartition: Change partition count (shuffle)
df.repartition(100).write.parquet("/output")

# Repartition by column (shuffle, then partition)
df.repartition(100, "key").write.parquet("/output")

I/O Optimization

Read Optimization

# Column pruning
df = spark.read.parquet("/data").select("col1", "col2")

# Predicate pushdown
df = spark.read.parquet("/data").filter("date >= '2024-01-01'")

# Schema specification (faster than inference)
from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType())
])
df = spark.read.schema(schema).parquet("/data")

Write Optimization

# Cap records per output file to control file size
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)

# Avoid small files
df.coalesce(50).write.parquet("/output")

# Partition by low-cardinality columns
df.write.partitionBy("year", "month").parquet("/output")

Monitoring & Profiling

Spark Metrics

# Metrics sinks are read at application start; configure them in metrics.properties
# or at submit time, e.g.:
#   --conf "spark.metrics.conf.*.sink.console.class"="org.apache.spark.metrics.sink.ConsoleSink"

# Access status via the Spark UI or programmatically
sc = spark.sparkContext
print(f"Active jobs: {len(sc.statusTracker().getActiveJobsIds())}")

Query Profiling

-- In Spark SQL
SET spark.sql.codegen.comments = true;
EXPLAIN COST SELECT * FROM table WHERE col = 'value';
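
The DataFrame API exposes the same plans through explain(). A small sketch using the Spark 3.0+ explain modes, assuming df is any DataFrame:

# Logical plan annotated with size and row-count statistics
df.explain(mode="cost")

# Parsed, analyzed, optimized, and physical plans
df.explain(mode="extended")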

Performance Checklist

Metric             | Target             | Remedy
Task time variance | < 2x               | Reduce skew
Shuffle read       | Minimize           | Use broadcast joins
Spill              | 0                  | Increase executor memory
GC time            | < 10% of task time | Tune executor memory
Serialization time | < 5% of task time  | Use Kryo
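
Task time variance is usually driven by partition skew. A quick way to spot it is to count rows per partition; a rough sketch, assuming df is the DataFrame feeding the slow stage:

from pyspark.sql.functions import spark_partition_id

# Rows per partition; a large max-to-median ratio indicates skew
df.groupBy(spark_partition_id().alias("partition")) \
    .count() \
    .orderBy("count", ascending=False) \
    .show(10)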


Last Updated: January 2025