Skip to content

🐍 PySpark Data Processing Fundamentals Lab

Lab Duration Level Interactive

Master distributed data processing with PySpark through hands-on exercises. Learn DataFrames, transformations, actions, and optimization techniques using real-world datasets and business scenarios.

🎯 Learning Objectives

By completing this lab, you will be able to:

  • Create and manipulate PySpark DataFrames from various data sources
  • Apply transformations to process and clean large datasets efficiently
  • Use built-in functions for aggregations, joins, and window operations
  • Optimize PySpark jobs for better performance and resource utilization
  • Debug common issues and interpret Spark UI for troubleshooting
  • Implement best practices for production-ready PySpark applications

⏱️ Time Estimate: 2-3 hours

  • Setup & Basics: 30 minutes
  • Data Processing Exercises: 90 minutes
  • Optimization & Best Practices: 45 minutes
  • Challenge Projects: 30 minutes

🧪 Lab Environment

# Already configured with Spark pools - just start coding!
# Access via: https://web.azuresynapse.net/

Option B: Local Development

# Install PySpark locally
pip install pyspark jupyter pandas numpy matplotlib seaborn

# Start Jupyter Lab with PySpark
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="lab"
pyspark

Option C: GitHub Codespaces

  1. Open PySpark Lab Repository
  2. Click "Code" → "Create codespace"
  3. Environment automatically configured!

🗃️ Lab Datasets

We'll work with realistic business datasets throughout this lab:

Primary Dataset: E-commerce Transactions

# Sample data structure
{
    "transaction_id": "TXN-12345",
    "customer_id": "CUST-67890", 
    "product_id": "PROD-11111",
    "category": "Electronics",
    "amount": 299.99,
    "quantity": 1,
    "timestamp": "2024-01-15T10:30:00Z",
    "location": "Seattle, WA",
    "payment_method": "Credit Card"
}

Supporting Datasets

  • Customer Profiles: Demographics, segments, lifetime value
  • Product Catalog: Details, pricing, inventory levels
  • Store Locations: Geographic data for analysis
  • Weather Data: External data for correlation analysis

📚 Lab Modules

🚀 Module 1: PySpark Fundamentals (30 minutes)

Exercise 1.1: Setting Up Your Spark Session

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import seaborn as sns

# Create Spark session with optimized configuration
spark = SparkSession.builder \
    .appName("PySpark Fundamentals Lab") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# Set log level to reduce noise
spark.sparkContext.setLogLevel("WARN")

print(f"Spark Version: {spark.version}")
print(f"Available cores: {spark.sparkContext.defaultParallelism}")

🎯 Challenge: Configure Spark for your specific environment (local vs. cloud) and explain each configuration parameter.

Exercise 1.2: Creating Your First DataFrame

# Method 1: From Python data structures
sample_data = [
    ("TXN-001", "CUST-101", "PROD-201", "Electronics", 299.99, 1, "2024-01-15"),
    ("TXN-002", "CUST-102", "PROD-202", "Clothing", 49.95, 2, "2024-01-15"),
    ("TXN-003", "CUST-103", "PROD-203", "Books", 12.99, 3, "2024-01-16")
]

schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("category", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("date", StringType(), True)
])

df = spark.createDataFrame(sample_data, schema)
df.show()
df.printSchema()

# Method 2: Reading from files (we'll load the full dataset)
# Note: Replace with actual file path in your environment
transactions_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/path/to/ecommerce_transactions.csv")

print(f"Dataset shape: {transactions_df.count()} rows, {len(transactions_df.columns)} columns")

🔍 Exploration Challenge:

# YOUR TURN: Explore the dataset
# 1. Display first 10 rows with formatting
# 2. Show schema with detailed information  
# 3. Get basic statistics
# 4. Check for null values

# Solution template:
transactions_df.____()  # Fill in the method
transactions_df.describe().____()
transactions_df.select([count(when(col(c).isNull(), c)).alias(c) for c in transactions_df.columns]).show()
💡 Click to see solution
# Solution:
transactions_df.show(10, truncate=False)
transactions_df.printSchema()
transactions_df.describe().show()
transactions_df.select([count(when(col(c).isNull(), c)).alias(c) for c in transactions_df.columns]).show()

🔄 Module 2: Data Transformations (45 minutes)

Exercise 2.1: Basic Transformations

# Select specific columns and create derived fields
enhanced_df = transactions_df.select(
    col("transaction_id"),
    col("customer_id"),
    col("product_id"),
    col("category"),
    col("amount"),
    col("quantity"),
    # Create new columns
    (col("amount") * col("quantity")).alias("total_amount"),
    to_timestamp(col("timestamp")).alias("transaction_time"),
    # Conditional logic
    when(col("amount") > 100, "High Value")
    .when(col("amount") > 50, "Medium Value")
    .otherwise("Low Value").alias("value_tier")
)

enhanced_df.show(5)

🎯 Your Turn - Data Cleaning:

# Clean and standardize the data
cleaned_df = enhanced_df \
    .filter(col("amount") > 0) \
    .filter(col("quantity") > 0) \
    .withColumn("category", upper(trim(col("category")))) \
    .withColumn("amount_rounded", round(col("amount"), 2))

# Add your transformations:
# 1. Remove transactions older than 1 year
# 2. Standardize customer_id format (uppercase)
# 3. Create age buckets for transaction times (morning, afternoon, evening, night)
# 4. Flag weekend transactions

# YOUR CODE HERE:
💡 Click to see solution
from datetime import datetime, timedelta

cleaned_df = enhanced_df \
    .filter(col("amount") > 0) \
    .filter(col("quantity") > 0) \
    .filter(col("transaction_time") > (datetime.now() - timedelta(days=365))) \
    .withColumn("category", upper(trim(col("category")))) \
    .withColumn("customer_id", upper(col("customer_id"))) \
    .withColumn("hour", hour(col("transaction_time"))) \
    .withColumn("time_bucket", 
                when(col("hour") < 6, "Night")
                .when(col("hour") < 12, "Morning") 
                .when(col("hour") < 18, "Afternoon")
                .otherwise("Evening")) \
    .withColumn("is_weekend", 
                when(dayofweek(col("transaction_time")).isin([1, 7]), True)
                .otherwise(False))

cleaned_df.show(5)

Exercise 2.2: Aggregations and Group Operations

# Basic aggregations
summary_stats = cleaned_df.agg(
    count("*").alias("total_transactions"),
    sum("total_amount").alias("total_revenue"), 
    avg("amount").alias("avg_transaction_amount"),
    max("amount").alias("max_transaction"),
    countDistinct("customer_id").alias("unique_customers"),
    countDistinct("product_id").alias("unique_products")
)

summary_stats.show()

# Group by operations
category_analysis = cleaned_df.groupBy("category") \
    .agg(
        count("*").alias("transaction_count"),
        sum("total_amount").alias("category_revenue"),
        avg("amount").alias("avg_amount"),
        countDistinct("customer_id").alias("unique_customers")
    ) \
    .orderBy(desc("category_revenue"))

category_analysis.show()

🎯 Advanced Aggregation Challenge:

# Create a comprehensive customer analysis
customer_metrics = cleaned_df.groupBy("customer_id") \
    .agg(
        # YOUR CODE: Calculate these metrics per customer
        # 1. Total transactions
        # 2. Total spent
        # 3. Average order value  
        # 4. Favorite category (most frequent)
        # 5. Days since last purchase
        # 6. Purchase frequency (transactions per day)
    )

# Bonus: Create customer segments based on RFM analysis (Recency, Frequency, Monetary)
💡 Click to see solution
from pyspark.sql.window import Window

customer_metrics = cleaned_df.groupBy("customer_id") \
    .agg(
        count("*").alias("total_transactions"),
        sum("total_amount").alias("total_spent"),
        avg("amount").alias("avg_order_value"),
        first("category").alias("most_frequent_category"),  # Simplified
        max("transaction_time").alias("last_purchase_date"),
        (count("*") / (datediff(max("transaction_time"), min("transaction_time")) + 1)).alias("purchase_frequency")
    ) \
    .withColumn("days_since_last_purchase", 
                datediff(current_date(), col("last_purchase_date")))

customer_metrics.show(10)

# RFM Segmentation
rfm_df = customer_metrics \
    .withColumn("recency_score", 
                when(col("days_since_last_purchase") <= 30, 5)
                .when(col("days_since_last_purchase") <= 60, 4)
                .when(col("days_since_last_purchase") <= 90, 3)
                .when(col("days_since_last_purchase") <= 180, 2)
                .otherwise(1)) \
    .withColumn("frequency_score",
                when(col("total_transactions") >= 20, 5)
                .when(col("total_transactions") >= 15, 4)
                .when(col("total_transactions") >= 10, 3)
                .when(col("total_transactions") >= 5, 2)
                .otherwise(1)) \
    .withColumn("monetary_score",
                when(col("total_spent") >= 1000, 5)
                .when(col("total_spent") >= 500, 4)
                .when(col("total_spent") >= 250, 3)
                .when(col("total_spent") >= 100, 2)
                .otherwise(1))

rfm_df.show(10)

🔗 Module 3: Joins and Window Functions (45 minutes)

Exercise 3.1: DataFrame Joins

# Create customer dimension data
customers_data = [
    ("CUST-101", "John Doe", 34, "Premium", "Seattle"),
    ("CUST-102", "Jane Smith", 28, "Regular", "Portland"),
    ("CUST-103", "Mike Johnson", 45, "VIP", "Vancouver")
]

customers_df = spark.createDataFrame(customers_data, 
    ["customer_id", "name", "age", "segment", "city"])

# Create product dimension data  
products_data = [
    ("PROD-201", "Laptop Pro", "Electronics", 1299.99),
    ("PROD-202", "Cotton T-Shirt", "Clothing", 24.99),
    ("PROD-203", "Python Programming Book", "Books", 39.99)
]

products_df = spark.createDataFrame(products_data,
    ["product_id", "product_name", "category", "list_price"])

# Join transactions with customer and product data
enriched_df = transactions_df \
    .join(customers_df, "customer_id", "inner") \
    .join(products_df, "product_id", "inner")

enriched_df.select("transaction_id", "name", "product_name", "amount", "segment").show(10)

🎯 Join Challenge - Sales Analysis:

# Create a comprehensive sales report with multiple joins
# Requirements:
# 1. Include all transaction, customer, and product details
# 2. Calculate profit margin (list_price - amount)
# 3. Add geographic analysis by customer city
# 4. Segment analysis by customer tier
# 5. Handle missing data gracefully

# YOUR CODE HERE:
sales_report_df = # Complete this implementation

Exercise 3.2: Window Functions

from pyspark.sql.window import Window

# Define window specifications
customer_window = Window.partitionBy("customer_id").orderBy("transaction_time")
monthly_window = Window.partitionBy(year("transaction_time"), month("transaction_time")).orderBy("transaction_time")

# Apply window functions
windowed_analysis = enriched_df \
    .withColumn("transaction_rank", row_number().over(customer_window)) \
    .withColumn("running_total", sum("amount").over(customer_window)) \
    .withColumn("customer_avg", avg("amount").over(Window.partitionBy("customer_id"))) \
    .withColumn("monthly_rank", rank().over(monthly_window)) \
    .withColumn("amount_vs_avg", col("amount") - col("customer_avg"))

windowed_analysis.select(
    "customer_id", "transaction_time", "amount", 
    "transaction_rank", "running_total", "customer_avg"
).show(10)

🎯 Advanced Window Functions Challenge:

# Advanced analytics using window functions
# Calculate:
# 1. Customer lifetime value (CLV) progression over time
# 2. Month-over-month growth rate for each customer
# 3. Percentile rankings within customer segments
# 4. Moving averages for trend analysis
# 5. Gap analysis (time between purchases)

# Template to get you started:
advanced_analytics = enriched_df \
    .withColumn("months_since_first", 
                months_between(col("transaction_time"), 
                              first("transaction_time").over(customer_window))) \
    .withColumn("prev_transaction_amount", 
                lag("amount").over(customer_window)) \
    # YOUR CODE: Add the remaining calculations

🚀 Module 4: Performance Optimization (30 minutes)

Exercise 4.1: Understanding Spark Execution

# Enable Spark UI and examine query plans
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Create a query that we'll optimize
inefficient_query = transactions_df \
    .filter(col("amount") > 50) \
    .join(customers_df, "customer_id", "inner") \
    .groupBy("segment", "category") \
    .agg(sum("amount").alias("total_revenue")) \
    .orderBy(desc("total_revenue"))

# Execute and examine the plan
print("=== Query Execution Plan ===")
inefficient_query.explain(True)

# Execute the query and time it
import time
start_time = time.time()
result = inefficient_query.collect()
execution_time = time.time() - start_time
print(f"Execution time: {execution_time:.2f} seconds")

Exercise 4.2: Optimization Techniques

# Optimization 1: Predicate Pushdown
optimized_query_1 = transactions_df \
    .filter(col("amount") > 50) \
    .select("customer_id", "category", "amount") \
    .join(customers_df.select("customer_id", "segment"), "customer_id", "inner") \
    .groupBy("segment", "category") \
    .agg(sum("amount").alias("total_revenue")) \
    .orderBy(desc("total_revenue"))

# Optimization 2: Partitioning
partitioned_df = transactions_df \
    .repartition(col("category")) \
    .cache()  # Cache if reused multiple times

# Optimization 3: Broadcast Join (for small tables)
from pyspark.sql.functions import broadcast

optimized_with_broadcast = transactions_df \
    .filter(col("amount") > 50) \
    .join(broadcast(customers_df), "customer_id", "inner") \
    .groupBy("segment", "category") \
    .agg(sum("amount").alias("total_revenue"))

print("=== Optimized Query Plan ===")
optimized_with_broadcast.explain(True)

🎯 Performance Tuning Challenge:

# YOUR TASK: Optimize this complex query
complex_query = transactions_df \
    .join(customers_df, "customer_id", "inner") \
    .join(products_df, "product_id", "inner") \
    .filter(col("transaction_time") > "2023-01-01") \
    .groupBy("segment", "category", "city") \
    .agg(
        count("*").alias("transaction_count"),
        sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_amount"),
        countDistinct("customer_id").alias("unique_customers")
    ) \
    .filter(col("transaction_count") > 5) \
    .orderBy(desc("total_revenue"))

# Apply these optimizations:
# 1. Column pruning
# 2. Filter pushdown  
# 3. Appropriate join strategies
# 4. Proper caching
# 5. Partitioning

optimized_complex_query = # YOUR OPTIMIZED VERSION

📊 Module 5: Data Visualization Integration (20 minutes)

Exercise 5.1: Converting to Pandas for Visualization

import matplotlib.pyplot as plt
import seaborn as sns

# Convert Spark DataFrame to Pandas for visualization
# Note: Only do this for reasonably sized results
category_revenue_pandas = category_analysis.toPandas()

# Create visualizations
plt.figure(figsize=(12, 6))

# Revenue by category
plt.subplot(1, 2, 1)
sns.barplot(data=category_revenue_pandas, x='category', y='category_revenue')
plt.title('Revenue by Category')
plt.xticks(rotation=45)

# Transaction count by category
plt.subplot(1, 2, 2)
sns.barplot(data=category_revenue_pandas, x='category', y='transaction_count')  
plt.title('Transaction Count by Category')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

# Time series analysis
daily_sales = enriched_df \
    .withColumn("date", to_date("transaction_time")) \
    .groupBy("date") \
    .agg(
        sum("amount").alias("daily_revenue"),
        count("*").alias("daily_transactions")
    ) \
    .orderBy("date") \
    .toPandas()

plt.figure(figsize=(12, 4))
plt.plot(daily_sales['date'], daily_sales['daily_revenue'])
plt.title('Daily Revenue Trend')
plt.xlabel('Date')
plt.ylabel('Revenue')
plt.xticks(rotation=45)
plt.show()

🎯 Module 6: Challenge Projects (30 minutes)

Challenge 1: Customer Cohort Analysis

"""
Build a cohort analysis to understand customer retention patterns:
1. Group customers by their first purchase month
2. Track their purchase behavior in subsequent months
3. Calculate retention rates for each cohort
4. Visualize the cohort table
"""

# YOUR IMPLEMENTATION HERE:
# Hint: Use window functions to find first purchase date
# Then calculate months since first purchase for each transaction

def build_cohort_analysis(transactions_df):
    # Step 1: Add cohort identifiers

    # Step 2: Calculate cohort metrics

    # Step 3: Create cohort table

    # Step 4: Calculate retention rates

    return cohort_table

cohort_result = build_cohort_analysis(enriched_df)

Challenge 2: Real-Time Streaming Simulation

"""
Simulate a real-time processing scenario:
1. Process transactions in micro-batches
2. Maintain running aggregations
3. Detect anomalies in real-time
4. Generate alerts for suspicious patterns
"""

def process_streaming_batch(batch_df, batch_id):
    """
    Process each batch of streaming data
    """
    # YOUR IMPLEMENTATION:
    # 1. Update running totals
    # 2. Check for anomalies
    # 3. Update dashboard metrics
    # 4. Generate alerts if needed
    pass

# Simulate streaming by processing data in chunks
batch_size = 1000
total_rows = transactions_df.count()

for i in range(0, total_rows, batch_size):
    batch_df = transactions_df.limit(batch_size).offset(i)
    process_streaming_batch(batch_df, i // batch_size)

Challenge 3: Machine Learning Feature Engineering

"""
Prepare features for a machine learning model to predict customer churn:
1. Create customer-level features
2. Calculate behavioral patterns  
3. Engineer time-based features
4. Create the final feature matrix
"""

def create_ml_features(transactions_df, customers_df):
    """
    Create comprehensive feature set for ML model
    """
    # Customer transaction patterns

    # Time-based features

    # Category preferences

    # Behavioral indicators

    return feature_matrix

ml_features = create_ml_features(enriched_df, customers_df)

✅ Lab Validation & Testing

Automated Test Suite

def run_lab_tests():
    """
    Validate your implementations meet the requirements
    """
    tests_passed = 0
    total_tests = 0

    # Test 1: DataFrame Creation
    try:
        assert transactions_df.count() > 0, "Transactions DataFrame should not be empty"
        assert len(transactions_df.columns) >= 7, "Should have at least 7 columns"
        tests_passed += 1
        print("✅ Test 1 Passed: DataFrame Creation")
    except Exception as e:
        print(f"❌ Test 1 Failed: {e}")
    finally:
        total_tests += 1

    # Test 2: Data Transformations
    try:
        assert "total_amount" in enhanced_df.columns, "Should have total_amount column"
        assert "value_tier" in enhanced_df.columns, "Should have value_tier column"
        tests_passed += 1
        print("✅ Test 2 Passed: Data Transformations")
    except Exception as e:
        print(f"❌ Test 2 Failed: {e}")
    finally:
        total_tests += 1

    # Test 3: Aggregations
    try:
        category_count = category_analysis.count()
        assert category_count > 0, "Category analysis should have results"
        tests_passed += 1
        print("✅ Test 3 Passed: Aggregations")
    except Exception as e:
        print(f"❌ Test 3 Failed: {e}")
    finally:
        total_tests += 1

    # Test 4: Joins
    try:
        enriched_count = enriched_df.count()
        assert enriched_count > 0, "Enriched DataFrame should have results"
        assert "name" in enriched_df.columns, "Should include customer name from join"
        tests_passed += 1
        print("✅ Test 4 Passed: Joins")
    except Exception as e:
        print(f"❌ Test 4 Failed: {e}")
    finally:
        total_tests += 1

    print(f"\n📊 Test Results: {tests_passed}/{total_tests} tests passed")

    if tests_passed == total_tests:
        print("🎉 Congratulations! All tests passed.")
        return True
    else:
        print("⚠️ Some tests failed. Please review your implementation.")
        return False

# Run the validation
run_lab_tests()

Performance Benchmark

def benchmark_performance():
    """
    Benchmark your optimizations against baseline performance
    """
    import time

    # Baseline query
    start_time = time.time()
    baseline_result = transactions_df.groupBy("category").count().collect()
    baseline_time = time.time() - start_time

    # Optimized query (replace with your optimization)
    start_time = time.time()
    optimized_result = partitioned_df.groupBy("category").count().collect()
    optimized_time = time.time() - start_time

    improvement = (baseline_time - optimized_time) / baseline_time * 100

    print(f"Baseline execution time: {baseline_time:.2f}s")
    print(f"Optimized execution time: {optimized_time:.2f}s")
    print(f"Performance improvement: {improvement:.1f}%")

    return improvement

performance_gain = benchmark_performance()

🎓 Knowledge Assessment

Quick Quiz

# Answer these questions in comments:

"""
1. What's the difference between a transformation and an action in Spark?
   Your answer: 

2. When would you use broadcast joins vs regular joins?
   Your answer:

3. What are the benefits of caching a DataFrame?
   Your answer:

4. How do window functions differ from group by operations?
   Your answer:

5. What factors should you consider when choosing the number of partitions?
   Your answer:
"""

Practical Assessment

Complete this real-world scenario:

"""
SCENARIO: You're a data engineer at an e-commerce company. 
The marketing team needs a daily report showing:

1. Top 10 products by revenue (last 30 days)
2. Customer segments with declining purchase patterns  
3. Geographic analysis of sales performance
4. Anomaly detection for unusual transaction patterns

Build a complete solution including:
- Data processing pipeline
- Performance optimization
- Error handling
- Output formatting for business users
"""

def build_marketing_report(transactions_df, customers_df, products_df):
    """
    Build comprehensive marketing analytics report
    """
    # YOUR IMPLEMENTATION HERE
    pass

# Execute your solution
marketing_report = build_marketing_report(transactions_df, customers_df, products_df)

🎉 Congratulations

You've completed the PySpark Fundamentals Lab! Here's what you've accomplished:

✅ Skills Gained

  • DataFrame Operations: Creation, transformation, and manipulation
  • Data Processing: Cleaning, filtering, and aggregation techniques
  • Advanced Analytics: Joins, window functions, and complex queries
  • Performance Optimization: Caching, partitioning, and query tuning
  • Production Practices: Testing, monitoring, and error handling

🔧 Technical Achievements

  • Processed large datasets efficiently using distributed computing
  • Implemented complex business logic using PySpark functions
  • Optimized queries for better performance and resource utilization
  • Created reusable, maintainable code following best practices

📊 Business Impact Understanding

  • Translated business requirements into technical solutions
  • Built analytics that drive real business decisions
  • Implemented patterns used in production data pipelines
  • Developed skills directly applicable to data engineering roles

🚀 Next Steps

Continue Your Learning Journey

  1. Advanced PySpark Topics:
  2. Delta Lake Integration Lab
  3. Streaming Analytics with Structured Streaming
  4. ML Pipeline Integration

  5. Production Skills:

  6. CI/CD for Analytics Pipelines
  7. Monitoring and Observability
  8. Security and Governance

  9. Certification Preparation:

  10. DP-203: Azure Data Engineer Associate
  11. DP-300: Azure Database Administrator Associate

Apply Your Skills

  • Personal Projects: Build analytics for your own datasets
  • Open Source Contributions: Contribute to PySpark ecosystem
  • Community Engagement: Share learnings and help others
  • Professional Growth: Apply concepts in your current role

📞 Support & Resources

Additional Learning Materials

Community & Support

  • Stack Overflow: Tag questions with pyspark and azure-synapse
  • GitHub Discussions: Lab Repository Discussions
  • Office Hours: Weekly Q&A sessions (Wednesdays 3 PM PT)

🎓 Lab Completed Successfully!

You're now ready to tackle real-world data processing challenges with PySpark. The skills you've gained form the foundation for advanced analytics, machine learning, and production data engineering workflows.


Lab Version: 1.0
Last Updated: January 2025
Completion Certificate: Available upon passing all assessments