🐍 PySpark Data Processing Fundamentals Lab¶
Master distributed data processing with PySpark through hands-on exercises. Learn DataFrames, transformations, actions, and optimization techniques using real-world datasets and business scenarios.
🎯 Learning Objectives¶
By completing this lab, you will be able to:
- ✅ Create and manipulate PySpark DataFrames from various data sources
- ✅ Apply transformations to process and clean large datasets efficiently
- ✅ Use built-in functions for aggregations, joins, and window operations
- ✅ Optimize PySpark jobs for better performance and resource utilization
- ✅ Debug common issues and interpret Spark UI for troubleshooting
- ✅ Implement best practices for production-ready PySpark applications
⏱️ Time Estimate: 3 to 3.5 hours¶
- Setup & Basics: 30 minutes
- Data Processing Exercises: 90 minutes
- Optimization & Best Practices: 45 minutes
- Challenge Projects: 30 minutes
🧪 Lab Environment¶
Option A: Azure Synapse Studio (Recommended)¶
# Already configured with Spark pools - just start coding!
# Access via: https://web.azuresynapse.net/
Option B: Local Development¶
# Install PySpark locally
pip install pyspark jupyter pandas numpy matplotlib seaborn
# Start Jupyter Lab with PySpark
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="lab"
pyspark
Option C: GitHub Codespaces¶
- Open PySpark Lab Repository
- Click "Code" → "Create codespace"
- Environment automatically configured!
🗃️ Lab Datasets¶
We'll work with realistic business datasets throughout this lab:
Primary Dataset: E-commerce Transactions¶
# Sample data structure
{
"transaction_id": "TXN-12345",
"customer_id": "CUST-67890",
"product_id": "PROD-11111",
"category": "Electronics",
"amount": 299.99,
"quantity": 1,
"timestamp": "2024-01-15T10:30:00Z",
"location": "Seattle, WA",
"payment_method": "Credit Card"
}
Supporting Datasets¶
- Customer Profiles: Demographics, segments, lifetime value
- Product Catalog: Details, pricing, inventory levels
- Store Locations: Geographic data for analysis
- Weather Data: External data for correlation analysis
📚 Lab Modules¶
🚀 Module 1: PySpark Fundamentals (30 minutes)¶
Exercise 1.1: Setting Up Your Spark Session¶
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import seaborn as sns
# Create Spark session with optimized configuration
spark = SparkSession.builder \
.appName("PySpark Fundamentals Lab") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
# Set log level to reduce noise
spark.sparkContext.setLogLevel("WARN")
print(f"Spark Version: {spark.version}")
print(f"Available cores: {spark.sparkContext.defaultParallelism}")
🎯 Challenge: Configure Spark for your specific environment (local vs. cloud) and explain each configuration parameter.
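💡 Starting point (sketch): the two environments mainly differ in who controls cluster sizing. The memory and partition values below are illustrative assumptions, not tuned recommendations.
# Local: you control master and driver resources directly
def build_local_session():
    return (SparkSession.builder
            .appName("PySpark Fundamentals Lab - Local")
            .master("local[*]")                            # use all cores on this machine
            .config("spark.driver.memory", "4g")           # example value - size to your machine
            .config("spark.sql.shuffle.partitions", "8")   # small data -> fewer shuffle partitions
            .config("spark.sql.adaptive.enabled", "true")
            .getOrCreate())
# Cloud (e.g. Synapse): the pool defines master and executor sizing,
# so usually only application-level settings are passed here
def build_cloud_session():
    return (SparkSession.builder
            .appName("PySpark Fundamentals Lab - Cloud")
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .getOrCreate())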
Exercise 1.2: Creating Your First DataFrame¶
# Method 1: From Python data structures
sample_data = [
("TXN-001", "CUST-101", "PROD-201", "Electronics", 299.99, 1, "2024-01-15"),
("TXN-002", "CUST-102", "PROD-202", "Clothing", 49.95, 2, "2024-01-15"),
("TXN-003", "CUST-103", "PROD-203", "Books", 12.99, 3, "2024-01-16")
]
schema = StructType([
StructField("transaction_id", StringType(), True),
StructField("customer_id", StringType(), True),
StructField("product_id", StringType(), True),
StructField("category", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("quantity", IntegerType(), True),
StructField("date", StringType(), True)
])
df = spark.createDataFrame(sample_data, schema)
df.show()
df.printSchema()
# Method 2: Reading from files (we'll load the full dataset)
# Note: Replace with actual file path in your environment
transactions_df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("/path/to/ecommerce_transactions.csv")
print(f"Dataset shape: {transactions_df.count()} rows, {len(transactions_df.columns)} columns")
🔍 Exploration Challenge:
# YOUR TURN: Explore the dataset
# 1. Display first 10 rows with formatting
# 2. Show schema with detailed information
# 3. Get basic statistics
# 4. Check for null values
# Solution template:
transactions_df.____() # Fill in the method
transactions_df.describe().____()
transactions_df.select([count(when(col(c).isNull(), c)).alias(c) for c in transactions_df.columns]).show()
💡 Click to see solution
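One possible solution (sketch):
transactions_df.show(10, truncate=False)       # 1. first 10 rows without truncating columns
transactions_df.printSchema()                  # 2. schema with types and nullability
transactions_df.describe().show()              # 3. basic statistics
transactions_df.select(                        # 4. null count per column
    [count(when(col(c).isNull(), c)).alias(c) for c in transactions_df.columns]
).show()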
🔄 Module 2: Data Transformations (45 minutes)¶
Exercise 2.1: Basic Transformations¶
# Select specific columns and create derived fields
enhanced_df = transactions_df.select(
col("transaction_id"),
col("customer_id"),
col("product_id"),
col("category"),
col("amount"),
col("quantity"),
# Create new columns
(col("amount") * col("quantity")).alias("total_amount"),
to_timestamp(col("timestamp")).alias("transaction_time"),
# Conditional logic
when(col("amount") > 100, "High Value")
.when(col("amount") > 50, "Medium Value")
.otherwise("Low Value").alias("value_tier")
)
enhanced_df.show(5)
🎯 Your Turn - Data Cleaning:
# Clean and standardize the data
cleaned_df = enhanced_df \
.filter(col("amount") > 0) \
.filter(col("quantity") > 0) \
.withColumn("category", upper(trim(col("category")))) \
.withColumn("amount_rounded", round(col("amount"), 2))
# Add your transformations:
# 1. Remove transactions older than 1 year
# 2. Standardize customer_id format (uppercase)
# 3. Create time-of-day buckets for each transaction (morning, afternoon, evening, night)
# 4. Flag weekend transactions
# YOUR CODE HERE:
💡 Click to see solution
from datetime import datetime, timedelta
cleaned_df = enhanced_df \
.filter(col("amount") > 0) \
.filter(col("quantity") > 0) \
.filter(col("transaction_time") > (datetime.now() - timedelta(days=365))) \
.withColumn("category", upper(trim(col("category")))) \
.withColumn("customer_id", upper(col("customer_id"))) \
.withColumn("hour", hour(col("transaction_time"))) \
.withColumn("time_bucket",
when(col("hour") < 6, "Night")
.when(col("hour") < 12, "Morning")
.when(col("hour") < 18, "Afternoon")
.otherwise("Evening")) \
.withColumn("is_weekend",
when(dayofweek(col("transaction_time")).isin([1, 7]), True)
.otherwise(False))
cleaned_df.show(5)
Exercise 2.2: Aggregations and Group Operations¶
# Basic aggregations
summary_stats = cleaned_df.agg(
count("*").alias("total_transactions"),
sum("total_amount").alias("total_revenue"),
avg("amount").alias("avg_transaction_amount"),
max("amount").alias("max_transaction"),
countDistinct("customer_id").alias("unique_customers"),
countDistinct("product_id").alias("unique_products")
)
summary_stats.show()
# Group by operations
category_analysis = cleaned_df.groupBy("category") \
.agg(
count("*").alias("transaction_count"),
sum("total_amount").alias("category_revenue"),
avg("amount").alias("avg_amount"),
countDistinct("customer_id").alias("unique_customers")
) \
.orderBy(desc("category_revenue"))
category_analysis.show()
🎯 Advanced Aggregation Challenge:
# Create a comprehensive customer analysis
customer_metrics = cleaned_df.groupBy("customer_id") \
.agg(
# YOUR CODE: Calculate these metrics per customer
# 1. Total transactions
# 2. Total spent
# 3. Average order value
# 4. Favorite category (most frequent)
# 5. Days since last purchase
# 6. Purchase frequency (transactions per day)
)
# Bonus: Create customer segments based on RFM analysis (Recency, Frequency, Monetary)
💡 Click to see solution
from pyspark.sql.window import Window
customer_metrics = cleaned_df.groupBy("customer_id") \
.agg(
count("*").alias("total_transactions"),
sum("total_amount").alias("total_spent"),
avg("amount").alias("avg_order_value"),
first("category").alias("most_frequent_category"), # Simplified
max("transaction_time").alias("last_purchase_date"),
(count("*") / (datediff(max("transaction_time"), min("transaction_time")) + 1)).alias("purchase_frequency")
) \
.withColumn("days_since_last_purchase",
datediff(current_date(), col("last_purchase_date")))
customer_metrics.show(10)
# RFM Segmentation
rfm_df = customer_metrics \
.withColumn("recency_score",
when(col("days_since_last_purchase") <= 30, 5)
.when(col("days_since_last_purchase") <= 60, 4)
.when(col("days_since_last_purchase") <= 90, 3)
.when(col("days_since_last_purchase") <= 180, 2)
.otherwise(1)) \
.withColumn("frequency_score",
when(col("total_transactions") >= 20, 5)
.when(col("total_transactions") >= 15, 4)
.when(col("total_transactions") >= 10, 3)
.when(col("total_transactions") >= 5, 2)
.otherwise(1)) \
.withColumn("monetary_score",
when(col("total_spent") >= 1000, 5)
.when(col("total_spent") >= 500, 4)
.when(col("total_spent") >= 250, 3)
.when(col("total_spent") >= 100, 2)
.otherwise(1))
rfm_df.show(10)
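A possible follow-up (sketch): roll the three scores into a single segment label. The thresholds are illustrative assumptions, not a standard.
rfm_segmented = rfm_df \
    .withColumn("rfm_total",
                col("recency_score") + col("frequency_score") + col("monetary_score")) \
    .withColumn("rfm_segment",
                when(col("rfm_total") >= 13, "Champion")
                .when(col("rfm_total") >= 10, "Loyal")
                .when(col("rfm_total") >= 7, "Potential")
                .otherwise("At Risk"))
rfm_segmented.groupBy("rfm_segment").count().show()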
🔗 Module 3: Joins and Window Functions (45 minutes)¶
Exercise 3.1: DataFrame Joins¶
# Create customer dimension data
customers_data = [
("CUST-101", "John Doe", 34, "Premium", "Seattle"),
("CUST-102", "Jane Smith", 28, "Regular", "Portland"),
("CUST-103", "Mike Johnson", 45, "VIP", "Vancouver")
]
customers_df = spark.createDataFrame(customers_data,
["customer_id", "name", "age", "segment", "city"])
# Create product dimension data
products_data = [
("PROD-201", "Laptop Pro", "Electronics", 1299.99),
("PROD-202", "Cotton T-Shirt", "Clothing", 24.99),
("PROD-203", "Python Programming Book", "Books", 39.99)
]
products_df = spark.createDataFrame(products_data,
["product_id", "product_name", "category", "list_price"])
# Join the cleaned transactions with customer and product data
# (cleaned_df carries the derived transaction_time column used by the window
#  exercises below; drop the duplicate category column from the product side)
enriched_df = cleaned_df \
    .join(customers_df, "customer_id", "inner") \
    .join(products_df.drop("category"), "product_id", "inner")
enriched_df.select("transaction_id", "name", "product_name", "amount", "segment").show(10)
🎯 Join Challenge - Sales Analysis:
# Create a comprehensive sales report with multiple joins
# Requirements:
# 1. Include all transaction, customer, and product details
# 2. Calculate profit margin (list_price - amount)
# 3. Add geographic analysis by customer city
# 4. Segment analysis by customer tier
# 5. Handle missing data gracefully
# YOUR CODE HERE:
sales_report_df = # Complete this implementation
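💡 One possible solution (sketch): it uses left joins plus fill values to handle missing dimension rows, and renames the product-side category to avoid ambiguous column references; adapt the column names to your dataset.
sales_report_df = transactions_df \
    .join(customers_df, "customer_id", "left") \
    .join(products_df.withColumnRenamed("category", "product_category"),
          "product_id", "left") \
    .withColumn("profit_margin", col("list_price") - col("amount")) \
    .fillna({"segment": "Unknown", "city": "Unknown"})
# Geographic and segment rollups for the report
city_summary = sales_report_df.groupBy("city") \
    .agg(sum("amount").alias("city_revenue"), count("*").alias("city_transactions"))
segment_summary = sales_report_df.groupBy("segment") \
    .agg(sum("amount").alias("segment_revenue"), avg("amount").alias("avg_amount"))
city_summary.show()
segment_summary.show()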
Exercise 3.2: Window Functions¶
from pyspark.sql.window import Window
# Define window specifications
customer_window = Window.partitionBy("customer_id").orderBy("transaction_time")
monthly_window = Window.partitionBy(year("transaction_time"), month("transaction_time")).orderBy("transaction_time")
# Apply window functions
windowed_analysis = enriched_df \
.withColumn("transaction_rank", row_number().over(customer_window)) \
.withColumn("running_total", sum("amount").over(customer_window)) \
.withColumn("customer_avg", avg("amount").over(Window.partitionBy("customer_id"))) \
.withColumn("monthly_rank", rank().over(monthly_window)) \
.withColumn("amount_vs_avg", col("amount") - col("customer_avg"))
windowed_analysis.select(
"customer_id", "transaction_time", "amount",
"transaction_rank", "running_total", "customer_avg"
).show(10)
🎯 Advanced Window Functions Challenge:
# Advanced analytics using window functions
# Calculate:
# 1. Customer lifetime value (CLV) progression over time
# 2. Month-over-month growth rate for each customer
# 3. Percentile rankings within customer segments
# 4. Moving averages for trend analysis
# 5. Gap analysis (time between purchases)
# Template to get you started:
advanced_analytics = enriched_df \
.withColumn("months_since_first",
months_between(col("transaction_time"),
first("transaction_time").over(customer_window))) \
.withColumn("prev_transaction_amount",
lag("amount").over(customer_window)) \
# YOUR CODE: Add the remaining calculations
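💡 A partial sketch for the remaining calculations, reusing the windows defined above. The 3-row moving-average frame is an arbitrary choice; month-over-month growth would additionally need a monthly aggregation step.
segment_window = Window.partitionBy("segment")
moving_window = customer_window.rowsBetween(-2, 0)   # current row plus the two before it
advanced_analytics = enriched_df \
    .withColumn("prev_transaction_amount", lag("amount").over(customer_window)) \
    .withColumn("clv_to_date",
                sum("amount").over(customer_window.rowsBetween(Window.unboundedPreceding, 0))) \
    .withColumn("pct_rank_in_segment",
                percent_rank().over(segment_window.orderBy("amount"))) \
    .withColumn("moving_avg_amount", avg("amount").over(moving_window)) \
    .withColumn("days_since_prev_purchase",
                datediff(col("transaction_time"),
                         lag("transaction_time").over(customer_window)))
advanced_analytics.select("customer_id", "transaction_time", "amount",
                          "clv_to_date", "moving_avg_amount",
                          "days_since_prev_purchase").show(10)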
🚀 Module 4: Performance Optimization (30 minutes)¶
Exercise 4.1: Understanding Spark Execution¶
# Enable Spark UI and examine query plans
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Create a query that we'll optimize
inefficient_query = transactions_df \
.filter(col("amount") > 50) \
.join(customers_df, "customer_id", "inner") \
.groupBy("segment", "category") \
.agg(sum("amount").alias("total_revenue")) \
.orderBy(desc("total_revenue"))
# Execute and examine the plan
print("=== Query Execution Plan ===")
inefficient_query.explain(True)
# Execute the query and time it
import time
start_time = time.time()
result = inefficient_query.collect()
execution_time = time.time() - start_time
print(f"Execution time: {execution_time:.2f} seconds")
Exercise 4.2: Optimization Techniques¶
# Optimization 1: Predicate Pushdown
optimized_query_1 = transactions_df \
.filter(col("amount") > 50) \
.select("customer_id", "category", "amount") \
.join(customers_df.select("customer_id", "segment"), "customer_id", "inner") \
.groupBy("segment", "category") \
.agg(sum("amount").alias("total_revenue")) \
.orderBy(desc("total_revenue"))
# Optimization 2: Partitioning
partitioned_df = transactions_df \
.repartition(col("category")) \
.cache() # Cache if reused multiple times
# Optimization 3: Broadcast Join (for small tables)
from pyspark.sql.functions import broadcast
optimized_with_broadcast = transactions_df \
.filter(col("amount") > 50) \
.join(broadcast(customers_df), "customer_id", "inner") \
.groupBy("segment", "category") \
.agg(sum("amount").alias("total_revenue"))
print("=== Optimized Query Plan ===")
optimized_with_broadcast.explain(True)
🎯 Performance Tuning Challenge:
# YOUR TASK: Optimize this complex query
# (drop the duplicate category column from products_df so groupBy("category") stays unambiguous)
complex_query = transactions_df \
    .join(customers_df, "customer_id", "inner") \
    .join(products_df.drop("category"), "product_id", "inner") \
    .filter(col("timestamp") > "2023-01-01") \
.groupBy("segment", "category", "city") \
.agg(
count("*").alias("transaction_count"),
sum("amount").alias("total_revenue"),
avg("amount").alias("avg_amount"),
countDistinct("customer_id").alias("unique_customers")
) \
.filter(col("transaction_count") > 5) \
.orderBy(desc("total_revenue"))
# Apply these optimizations:
# 1. Column pruning
# 2. Filter pushdown
# 3. Appropriate join strategies
# 4. Proper caching
# 5. Partitioning
optimized_complex_query = # YOUR OPTIMIZED VERSION
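💡 One possible optimized version (sketch): prune columns before the joins, push the date filter down, broadcast the small dimension tables, and cache the pruned fact table. Whether each step actually helps depends on your data volumes, so compare the plans with explain().
# Column pruning + early filter on the fact table, cached because it is reused
pruned_transactions = transactions_df \
    .select("customer_id", "product_id", "category", "amount", "timestamp") \
    .filter(col("timestamp") > "2023-01-01") \
    .cache()
optimized_complex_query = pruned_transactions \
    .join(broadcast(customers_df.select("customer_id", "segment", "city")),
          "customer_id", "inner") \
    .join(broadcast(products_df.select("product_id")),   # only used to keep known products
          "product_id", "inner") \
    .groupBy("segment", "category", "city") \
    .agg(
        count("*").alias("transaction_count"),
        sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_amount"),
        countDistinct("customer_id").alias("unique_customers")
    ) \
    .filter(col("transaction_count") > 5) \
    .orderBy(desc("total_revenue"))
optimized_complex_query.explain(True)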
📊 Module 5: Data Visualization Integration (20 minutes)¶
Exercise 5.1: Converting to Pandas for Visualization¶
import matplotlib.pyplot as plt
import seaborn as sns
# Convert Spark DataFrame to Pandas for visualization
# Note: Only do this for reasonably sized results
category_revenue_pandas = category_analysis.toPandas()
# Create visualizations
plt.figure(figsize=(12, 6))
# Revenue by category
plt.subplot(1, 2, 1)
sns.barplot(data=category_revenue_pandas, x='category', y='category_revenue')
plt.title('Revenue by Category')
plt.xticks(rotation=45)
# Transaction count by category
plt.subplot(1, 2, 2)
sns.barplot(data=category_revenue_pandas, x='category', y='transaction_count')
plt.title('Transaction Count by Category')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# Time series analysis
daily_sales = enriched_df \
.withColumn("date", to_date("transaction_time")) \
.groupBy("date") \
.agg(
sum("amount").alias("daily_revenue"),
count("*").alias("daily_transactions")
) \
.orderBy("date") \
.toPandas()
plt.figure(figsize=(12, 4))
plt.plot(daily_sales['date'], daily_sales['daily_revenue'])
plt.title('Daily Revenue Trend')
plt.xlabel('Date')
plt.ylabel('Revenue')
plt.xticks(rotation=45)
plt.show()
🎯 Module 6: Challenge Projects (30 minutes)¶
Challenge 1: Customer Cohort Analysis¶
"""
Build a cohort analysis to understand customer retention patterns:
1. Group customers by their first purchase month
2. Track their purchase behavior in subsequent months
3. Calculate retention rates for each cohort
4. Visualize the cohort table
"""
# YOUR IMPLEMENTATION HERE:
# Hint: Use window functions to find first purchase date
# Then calculate months since first purchase for each transaction
def build_cohort_analysis(transactions_df):
# Step 1: Add cohort identifiers
# Step 2: Calculate cohort metrics
# Step 3: Create cohort table
# Step 4: Calculate retention rates
return cohort_table
cohort_result = build_cohort_analysis(enriched_df)
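💡 A sketch of one way to approach it (retention here is measured as the share of each cohort's customers still purchasing in a given month offset; other definitions are possible):
from pyspark.sql.window import Window
def build_cohort_analysis(transactions_df):
    customer_window = Window.partitionBy("customer_id")
    # Step 1: tag every transaction with its customer's cohort (first purchase month)
    with_cohort = transactions_df \
        .withColumn("first_purchase", min("transaction_time").over(customer_window)) \
        .withColumn("cohort_month", date_format(col("first_purchase"), "yyyy-MM")) \
        .withColumn("months_since_first",
                    floor(months_between(col("transaction_time"),
                                         col("first_purchase"))).cast("int"))
    # Steps 2-3: active customers per cohort and month offset
    cohort_table = with_cohort.groupBy("cohort_month", "months_since_first") \
        .agg(countDistinct("customer_id").alias("active_customers"))
    # Step 4: retention relative to the cohort's month-0 size
    cohort_window = Window.partitionBy("cohort_month")
    cohort_table = cohort_table \
        .withColumn("cohort_size",
                    max(when(col("months_since_first") == 0,
                             col("active_customers"))).over(cohort_window)) \
        .withColumn("retention_rate",
                    round(col("active_customers") / col("cohort_size"), 3))
    return cohort_table.orderBy("cohort_month", "months_since_first")
cohort_result = build_cohort_analysis(enriched_df)
cohort_result.show(20)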
Challenge 2: Real-Time Streaming Simulation¶
"""
Simulate a real-time processing scenario:
1. Process transactions in micro-batches
2. Maintain running aggregations
3. Detect anomalies in real-time
4. Generate alerts for suspicious patterns
"""
def process_streaming_batch(batch_df, batch_id):
"""
Process each batch of streaming data
"""
# YOUR IMPLEMENTATION:
# 1. Update running totals
# 2. Check for anomalies
# 3. Update dashboard metrics
# 4. Generate alerts if needed
pass
# Simulate streaming by processing data in chunks
batch_size = 1000
total_rows = transactions_df.count()
for i in range(0, total_rows, batch_size):
    # offset()/limit() pagination needs a recent Spark version (3.4+), and row order
    # is not guaranteed, so this only roughly simulates a stream of micro-batches
    batch_df = transactions_df.offset(i).limit(batch_size)
process_streaming_batch(batch_df, i // batch_size)
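💡 A minimal sketch of the batch handler, keeping running totals in a module-level dictionary and flagging amounts more than three standard deviations above the batch mean. Both choices are illustrative; a production job would use Structured Streaming with checkpointed state instead.
running_totals = {"revenue": 0.0, "transactions": 0}
def process_streaming_batch(batch_df, batch_id):
    """Update running totals and flag unusually large amounts in this batch."""
    stats = batch_df.agg(
        sum("amount").alias("revenue"),
        count("*").alias("transactions"),
        avg("amount").alias("avg_amount"),
        stddev("amount").alias("std_amount")
    ).collect()[0]
    running_totals["revenue"] += stats["revenue"] or 0.0
    running_totals["transactions"] += stats["transactions"] or 0
    if stats["std_amount"]:
        threshold = stats["avg_amount"] + 3 * stats["std_amount"]
        anomalies = batch_df.filter(col("amount") > threshold).count()
        if anomalies > 0:
            print(f"⚠️ Batch {batch_id}: {anomalies} transactions above {threshold:.2f}")
    print(f"Batch {batch_id}: running revenue = {running_totals['revenue']:.2f}, "
          f"transactions = {running_totals['transactions']}")
# Re-run the micro-batch loop above after replacing the stub with this version.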
Challenge 3: Machine Learning Feature Engineering¶
"""
Prepare features for a machine learning model to predict customer churn:
1. Create customer-level features
2. Calculate behavioral patterns
3. Engineer time-based features
4. Create the final feature matrix
"""
def create_ml_features(transactions_df, customers_df):
"""
Create comprehensive feature set for ML model
"""
# Customer transaction patterns
# Time-based features
# Category preferences
# Behavioral indicators
return feature_matrix
ml_features = create_ml_features(enriched_df, customers_df)
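💡 A sketch of one possible feature set (which features actually matter for churn depends on your data; these are illustrative):
def create_ml_features(transactions_df, customers_df):
    """Build one row of behavioral and demographic features per customer."""
    behavior = transactions_df.groupBy("customer_id").agg(
        count("*").alias("txn_count"),
        sum("amount").alias("total_spent"),
        avg("amount").alias("avg_amount"),
        stddev("amount").alias("amount_stddev"),
        countDistinct("category").alias("distinct_categories"),
        countDistinct(to_date("transaction_time")).alias("active_days"),
        max("transaction_time").alias("last_purchase"),
        min("transaction_time").alias("first_purchase")
    ) \
        .withColumn("days_since_last", datediff(current_date(), col("last_purchase"))) \
        .withColumn("tenure_days", datediff(col("last_purchase"), col("first_purchase")))
    # Join demographic attributes; fill gaps for customers missing from the dimension
    feature_matrix = behavior \
        .join(customers_df.select("customer_id", "age", "segment"), "customer_id", "left") \
        .fillna({"amount_stddev": 0.0, "segment": "Unknown"})
    return feature_matrix
ml_features = create_ml_features(enriched_df, customers_df)
ml_features.show(5)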
✅ Lab Validation & Testing¶
Automated Test Suite¶
def run_lab_tests():
"""
Validate your implementations meet the requirements
"""
tests_passed = 0
total_tests = 0
# Test 1: DataFrame Creation
try:
assert transactions_df.count() > 0, "Transactions DataFrame should not be empty"
assert len(transactions_df.columns) >= 7, "Should have at least 7 columns"
tests_passed += 1
print("✅ Test 1 Passed: DataFrame Creation")
except Exception as e:
print(f"❌ Test 1 Failed: {e}")
finally:
total_tests += 1
# Test 2: Data Transformations
try:
assert "total_amount" in enhanced_df.columns, "Should have total_amount column"
assert "value_tier" in enhanced_df.columns, "Should have value_tier column"
tests_passed += 1
print("✅ Test 2 Passed: Data Transformations")
except Exception as e:
print(f"❌ Test 2 Failed: {e}")
finally:
total_tests += 1
# Test 3: Aggregations
try:
category_count = category_analysis.count()
assert category_count > 0, "Category analysis should have results"
tests_passed += 1
print("✅ Test 3 Passed: Aggregations")
except Exception as e:
print(f"❌ Test 3 Failed: {e}")
finally:
total_tests += 1
# Test 4: Joins
try:
enriched_count = enriched_df.count()
assert enriched_count > 0, "Enriched DataFrame should have results"
assert "name" in enriched_df.columns, "Should include customer name from join"
tests_passed += 1
print("✅ Test 4 Passed: Joins")
except Exception as e:
print(f"❌ Test 4 Failed: {e}")
finally:
total_tests += 1
print(f"\n📊 Test Results: {tests_passed}/{total_tests} tests passed")
if tests_passed == total_tests:
print("🎉 Congratulations! All tests passed.")
return True
else:
print("⚠️ Some tests failed. Please review your implementation.")
return False
# Run the validation
run_lab_tests()
Performance Benchmark¶
def benchmark_performance():
"""
Benchmark your optimizations against baseline performance
"""
import time
# Baseline query
start_time = time.time()
baseline_result = transactions_df.groupBy("category").count().collect()
baseline_time = time.time() - start_time
# Optimized query (replace with your optimization)
start_time = time.time()
optimized_result = partitioned_df.groupBy("category").count().collect()
optimized_time = time.time() - start_time
improvement = (baseline_time - optimized_time) / baseline_time * 100
print(f"Baseline execution time: {baseline_time:.2f}s")
print(f"Optimized execution time: {optimized_time:.2f}s")
print(f"Performance improvement: {improvement:.1f}%")
return improvement
performance_gain = benchmark_performance()
🎓 Knowledge Assessment¶
Quick Quiz¶
# Answer these questions in comments:
"""
1. What's the difference between a transformation and an action in Spark?
Your answer:
2. When would you use broadcast joins vs regular joins?
Your answer:
3. What are the benefits of caching a DataFrame?
Your answer:
4. How do window functions differ from group by operations?
Your answer:
5. What factors should you consider when choosing the number of partitions?
Your answer:
"""
Practical Assessment¶
Complete this real-world scenario:
"""
SCENARIO: You're a data engineer at an e-commerce company.
The marketing team needs a daily report showing:
1. Top 10 products by revenue (last 30 days)
2. Customer segments with declining purchase patterns
3. Geographic analysis of sales performance
4. Anomaly detection for unusual transaction patterns
Build a complete solution including:
- Data processing pipeline
- Performance optimization
- Error handling
- Output formatting for business users
"""
def build_marketing_report(transactions_df, customers_df, products_df):
"""
Build comprehensive marketing analytics report
"""
# YOUR IMPLEMENTATION HERE
pass
# Execute your solution
marketing_report = build_marketing_report(transactions_df, customers_df, products_df)
🎉 Congratulations¶
You've completed the PySpark Fundamentals Lab! Here's what you've accomplished:
✅ Skills Gained¶
- DataFrame Operations: Creation, transformation, and manipulation
- Data Processing: Cleaning, filtering, and aggregation techniques
- Advanced Analytics: Joins, window functions, and complex queries
- Performance Optimization: Caching, partitioning, and query tuning
- Production Practices: Testing, monitoring, and error handling
🔧 Technical Achievements¶
- Processed large datasets efficiently using distributed computing
- Implemented complex business logic using PySpark functions
- Optimized queries for better performance and resource utilization
- Created reusable, maintainable code following best practices
📊 Business Impact Understanding¶
- Translated business requirements into technical solutions
- Built analytics that drive real business decisions
- Implemented patterns used in production data pipelines
- Developed skills directly applicable to data engineering roles
🚀 Next Steps¶
Continue Your Learning Journey¶
- Advanced PySpark Topics:
  - Delta Lake Integration Lab
  - Streaming Analytics with Structured Streaming
- Production Skills:
  - CI/CD for Analytics Pipelines
  - Monitoring and Observability
- Certification Preparation:
  - DP-203: Azure Data Engineer Associate
  - DP-300: Azure Database Administrator Associate
Apply Your Skills¶
- Personal Projects: Build analytics for your own datasets
- Open Source Contributions: Contribute to PySpark ecosystem
- Community Engagement: Share learnings and help others
- Professional Growth: Apply concepts in your current role
📞 Support & Resources¶
Additional Learning Materials¶
- Official PySpark Documentation: spark.apache.org/docs/latest/api/python/
- Spark SQL Guide: spark.apache.org/docs/latest/sql-programming-guide.html
- Azure Synapse Spark: docs.microsoft.com/azure/synapse-analytics/spark/
Community & Support¶
- Stack Overflow: Tag questions with pyspark and azure-synapse
- GitHub Discussions: Lab Repository Discussions
- Office Hours: Weekly Q&A sessions (Wednesdays 3 PM PT)
🎓 Lab Completed Successfully!
You're now ready to tackle real-world data processing challenges with PySpark. The skills you've gained form the foundation for advanced analytics, machine learning, and production data engineering workflows.
Lab Version: 1.0
Last Updated: January 2025
Completion Certificate: Available upon passing all assessments