Skip to content

PySpark Fundamentals Lab

Home | Code Labs | PySpark Fundamentals

Status Level Duration

Hands-on lab for learning PySpark fundamentals on Azure Synapse Analytics.


Lab Overview

Learning Objectives

By the end of this lab, you will be able to:

  • Create and configure a SparkSession
  • Load data from various sources
  • Perform DataFrame transformations
  • Write optimized Spark queries
  • Save results to different formats

Prerequisites

  • Azure Synapse workspace access
  • Basic Python knowledge
  • Familiarity with SQL concepts

Lab 1: Getting Started

Exercise 1.1: Create SparkSession

# In Synapse notebooks, SparkSession is pre-configured
# For standalone environments:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark Fundamentals") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Verify configuration
print(f"Spark version: {spark.version}")
print(f"Application ID: {spark.sparkContext.applicationId}")

Exercise 1.2: Load Sample Data

# Create sample DataFrame
data = [
    (1, "Alice", "Engineering", 75000),
    (2, "Bob", "Sales", 65000),
    (3, "Charlie", "Engineering", 80000),
    (4, "Diana", "HR", 55000),
    (5, "Eve", "Sales", 70000)
]

columns = ["id", "name", "department", "salary"]
df = spark.createDataFrame(data, columns)

# Display DataFrame
df.show()
df.printSchema()

Lab 2: DataFrame Operations

Exercise 2.1: Basic Transformations

from pyspark.sql.functions import col, upper, when, lit

# Select specific columns
df.select("name", "department").show()

# Add computed column
df_with_bonus = df.withColumn(
    "bonus",
    when(col("salary") > 70000, col("salary") * 0.15)
    .otherwise(col("salary") * 0.10)
)
df_with_bonus.show()

# Filter rows
engineers = df.filter(col("department") == "Engineering")
engineers.show()

# Rename column
df_renamed = df.withColumnRenamed("salary", "annual_salary")
df_renamed.show()

Exercise 2.2: Aggregations

from pyspark.sql.functions import count, avg, sum, max, min

# Group by department
dept_stats = df.groupBy("department").agg(
    count("*").alias("employee_count"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary"),
    min("salary").alias("min_salary"),
    sum("salary").alias("total_salary")
)
dept_stats.show()

# Multiple aggregations with ordering
df.groupBy("department") \
    .agg(avg("salary").alias("avg_salary")) \
    .orderBy(col("avg_salary").desc()) \
    .show()

Lab 3: Working with Files

Exercise 3.1: Read from Data Lake

# Read Parquet
parquet_df = spark.read.parquet(
    "abfss://data@storage.dfs.core.windows.net/samples/employees.parquet"
)

# Read CSV with options
csv_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("abfss://data@storage.dfs.core.windows.net/samples/employees.csv")

# Read JSON
json_df = spark.read.json(
    "abfss://data@storage.dfs.core.windows.net/samples/events.json"
)

# Read Delta Lake
delta_df = spark.read.format("delta").load(
    "abfss://data@storage.dfs.core.windows.net/delta/customers"
)

Exercise 3.2: Write Data

# Write as Parquet (partitioned)
df.write \
    .mode("overwrite") \
    .partitionBy("department") \
    .parquet("abfss://data@storage.dfs.core.windows.net/output/employees_parquet")

# Write as Delta Lake
df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("abfss://data@storage.dfs.core.windows.net/output/employees_delta")

# Write to SQL table
df.write \
    .mode("overwrite") \
    .saveAsTable("employees")

Lab 4: Joins and Unions

Exercise 4.1: Join Operations

# Create second DataFrame
departments_data = [
    ("Engineering", "Building A", "John"),
    ("Sales", "Building B", "Jane"),
    ("HR", "Building A", "Mike"),
    ("Marketing", "Building C", "Sarah")
]

departments = spark.createDataFrame(
    departments_data,
    ["dept_name", "location", "manager"]
)

# Inner join
joined_df = df.join(
    departments,
    df.department == departments.dept_name,
    "inner"
)
joined_df.show()

# Left join
left_joined = df.join(
    departments,
    df.department == departments.dept_name,
    "left"
)
left_joined.show()

Exercise 4.2: Union and Distinct

# Create additional employees
new_employees = spark.createDataFrame([
    (6, "Frank", "Marketing", 72000),
    (7, "Grace", "Engineering", 85000)
], columns)

# Union DataFrames
all_employees = df.union(new_employees)
all_employees.show()

# Remove duplicates
distinct_depts = all_employees.select("department").distinct()
distinct_depts.show()

Lab 5: Window Functions

Exercise 5.1: Ranking

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

# Define window
dept_window = Window.partitionBy("department").orderBy(col("salary").desc())

# Add ranking columns
ranked_df = df.withColumn("row_num", row_number().over(dept_window)) \
    .withColumn("rank", rank().over(dept_window)) \
    .withColumn("dense_rank", dense_rank().over(dept_window))

ranked_df.show()

Exercise 5.2: Running Totals

from pyspark.sql.functions import sum as spark_sum, avg as spark_avg

# Cumulative sum window
cumsum_window = Window.partitionBy("department") \
    .orderBy("id") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Running total and average
df_with_cumsum = df.withColumn(
    "cumulative_salary",
    spark_sum("salary").over(cumsum_window)
).withColumn(
    "running_avg",
    spark_avg("salary").over(cumsum_window)
)

df_with_cumsum.show()

Lab 6: SQL Interface

Exercise 6.1: Spark SQL

# Register temp view
df.createOrReplaceTempView("employees")

# Run SQL query
result = spark.sql("""
    SELECT
        department,
        COUNT(*) as emp_count,
        AVG(salary) as avg_salary,
        SUM(salary) as total_salary
    FROM employees
    GROUP BY department
    HAVING COUNT(*) > 0
    ORDER BY avg_salary DESC
""")

result.show()

Exercise 6.2: Complex SQL

# CTE and subquery
spark.sql("""
    WITH dept_stats AS (
        SELECT
            department,
            AVG(salary) as dept_avg
        FROM employees
        GROUP BY department
    )
    SELECT
        e.name,
        e.department,
        e.salary,
        d.dept_avg,
        e.salary - d.dept_avg as diff_from_avg
    FROM employees e
    JOIN dept_stats d ON e.department = d.department
    ORDER BY diff_from_avg DESC
""").show()

Challenge Exercise

Build an Analytics Pipeline

# Challenge: Create an employee analytics pipeline

# Step 1: Load data
employees_df = spark.read.parquet("/path/to/employees")
departments_df = spark.read.parquet("/path/to/departments")

# Step 2: Join and enrich
enriched = employees_df.join(
    departments_df,
    employees_df.dept_id == departments_df.id,
    "left"
)

# Step 3: Add analytics
from pyspark.sql.functions import current_date, datediff

analytics = enriched \
    .withColumn("tenure_days", datediff(current_date(), col("hire_date"))) \
    .withColumn("salary_percentile",
        percent_rank().over(Window.orderBy("salary")))

# Step 4: Aggregate
summary = analytics.groupBy("department").agg(
    count("*").alias("headcount"),
    avg("salary").alias("avg_salary"),
    avg("tenure_days").alias("avg_tenure")
)

# Step 5: Save results
summary.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/output/department_analytics")


Last Updated: January 2025