Unit Testing Microsoft Fabric Notebooks¶
Last Updated: 2026-04-27 | Version: 1.0.0
Table of Contents¶
- Overview
- Why Unit Test Fabric Notebooks
- Test Pyramid for Data Engineering
- Extracting Testable Functions
- pytest with Local Spark
- Mocking mssparkutils and notebookutils
- conftest.py Fixtures
- Writing Tests: Complete Examples
- nbval for Notebook Execution Testing
- Coverage Reporting
- CI Pipeline Integration
- Best Practices
- Anti-Patterns
- References
Overview¶
Microsoft Fabric notebooks execute on remote Spark clusters, which makes local testing challenging. This guide establishes patterns for extracting business logic from notebooks into testable modules, mocking Fabric-specific APIs, and integrating notebook tests into the CI pipeline.
This POC maintains 612 unit tests across casino, federal, streaming, and analytics domains. The patterns in this document are how those tests are structured.
Why Unit Test Fabric Notebooks¶
The Problem¶
Fabric notebooks are monolithic by default. Code, configuration, and orchestration logic are mixed in a single artifact. Without testing:
- Schema changes break downstream consumers silently.
- Compliance logic (CTR thresholds, SAR detection) cannot be validated without production data.
- Refactoring is risky because there is no safety net.
- Code review is opinion-based rather than evidence-based.
The Solution¶
Extract pure transformation functions from notebooks, test them locally with synthetic data, and validate notebook execution in CI. This catches 80% of bugs before code reaches a Fabric workspace.
What to Test¶
| Layer | What to Test | Example |
|---|---|---|
| Bronze | Schema enforcement, data type casting | Slot telemetry timestamp parsing |
| Silver | Cleansing rules, deduplication, validation | SSN hashing, null rejection |
| Gold | Aggregation logic, KPI calculations | Revenue per machine per hour |
| Compliance | Threshold detection, regulatory rules | CTR $10K threshold, SAR patterns |
Test Pyramid for Data Engineering¶
            /\
           /  \              End-to-End Tests
          / E2E \            Run in Fabric workspace
         /--------\          against real/staging data
        /          \
       / Integration \       Integration Tests
      /    Tests      \      Local Spark + Delta Lake
     /----------------\      Multi-table joins, pipelines
    /                  \
   /     Unit Tests     \    Unit Tests
  /      (pytest)        \   Pure functions, schemas
 /________________________\  Fast, no Spark needed
| Level | Scope | Speed | Where | Count in POC |
|---|---|---|---|---|
| Unit | Pure functions, schemas | < 1 sec each | Local (pytest) | 612 |
| Integration | Spark transformations | 5-30 sec each | Local Spark | 9 (GE suites) |
| E2E | Full pipeline | Minutes | Fabric workspace | Manual |
Extracting Testable Functions¶
The Pattern: Separate Logic from Orchestration¶
Before (monolithic notebook):
# Databricks notebook source
# COMMAND ----------
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, hash, lit
spark = SparkSession.builder.getOrCreate()
# COMMAND ----------
# Read raw data
df = spark.read.parquet("Files/landing/slot_telemetry/")
# COMMAND ----------
# Complex business logic mixed with orchestration
df_clean = df.filter(col("machine_id").isNotNull())
df_clean = df_clean.withColumn(
"ctr_flag",
when(col("amount") >= 10000, lit(True)).otherwise(lit(False))
)
df_clean = df_clean.withColumn(
"sar_flag",
when(
(col("amount") >= 8000) & (col("amount") < 10000),
lit(True)
).otherwise(lit(False))
)
# COMMAND ----------
# Write to lakehouse
df_clean.write.format("delta").mode("append").saveAsTable("lh_bronze.bronze_slot_telemetry")
After (extracted functions):
Create a shared utility module at notebooks/utils/compliance_rules.py:
"""
notebooks/utils/compliance_rules.py
====================================
Reusable compliance detection functions for casino domain.
Extracted from notebooks for testability.
"""
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit, when
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
CTR_THRESHOLD = 10_000 # Currency Transaction Report
SAR_LOWER = 8_000 # Suspicious Activity Report lower bound
SAR_UPPER = 10_000 # Suspicious Activity Report upper bound
W2G_SLOTS = 1_200 # W-2G threshold for slot machines
W2G_TABLE = 600 # W-2G threshold for table games
W2G_POKER = 5_000 # W-2G threshold for poker
def flag_ctr(df: DataFrame, amount_col: str = "amount") -> DataFrame:
"""Flag transactions meeting the CTR threshold ($10,000+)."""
return df.withColumn(
"ctr_flag",
when(col(amount_col) >= CTR_THRESHOLD, lit(True)).otherwise(lit(False)),
)
def flag_sar(df: DataFrame, amount_col: str = "amount") -> DataFrame:
"""Flag transactions in the SAR structuring range ($8K-$9,999)."""
return df.withColumn(
"sar_flag",
when(
(col(amount_col) >= SAR_LOWER) & (col(amount_col) < SAR_UPPER),
lit(True),
).otherwise(lit(False)),
)
def flag_w2g(
df: DataFrame,
amount_col: str = "amount",
game_type_col: str = "game_type",
) -> DataFrame:
"""Flag winnings requiring W-2G reporting based on game type."""
return df.withColumn(
"w2g_flag",
when(
(col(game_type_col) == "slots") & (col(amount_col) >= W2G_SLOTS),
lit(True),
)
.when(
(col(game_type_col) == "table") & (col(amount_col) >= W2G_TABLE),
lit(True),
)
.when(
(col(game_type_col) == "poker") & (col(amount_col) >= W2G_POKER),
lit(True),
)
.otherwise(lit(False)),
)
def reject_nulls(df: DataFrame, required_cols: list[str]) -> DataFrame:
    """Remove rows where any required column is null."""
    condition = None
    for c in required_cols:
        clause = col(c).isNotNull()
        condition = clause if condition is None else condition & clause
    # With no required columns there is nothing to filter on.
    if condition is None:
        return df
    return df.filter(condition)
The notebook then becomes thin orchestration:
# Databricks notebook source
# COMMAND ----------
# MAGIC %run ./utils/compliance_rules
# COMMAND ----------
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
SOURCE_PATH = "Files/output/bronze_slot_telemetry.parquet"
TARGET_TABLE = "lh_bronze.bronze_slot_telemetry"
# COMMAND ----------
df = spark.read.parquet(SOURCE_PATH)
df = reject_nulls(df, ["machine_id", "timestamp"])
df = flag_ctr(df)
df = flag_sar(df)
# COMMAND ----------
df.write.format("delta").mode("append").saveAsTable(TARGET_TABLE)
The %run Pattern¶
Fabric notebooks support %run to include shared code. Structure your utilities as:
notebooks/
utils/
compliance_rules.py # Testable functions
bronze_utils.py # Shared Bronze helpers
schema_definitions.py # StructType definitions
bronze/
01_bronze_slot_telemetry.py # Uses %run ./utils/compliance_rules
silver/
01_silver_slot_cleansed.py # Uses %run ./utils/compliance_rules
pytest with Local Spark¶
Installation¶
Note: Match the PySpark version to your Fabric workspace runtime. Fabric Runtime 1.3 uses Spark 3.5.x.
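A minimal local setup needs pytest plus a PySpark/Delta combination that matches that runtime. The versions below mirror the ones pinned in the CI workflow later in this guide; treat them as illustrative and pin to your own runtime.
# Install local test dependencies (versions are illustrative)
pip install pytest pytest-cov pyspark==3.5.3 delta-spark==3.3.0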
Spark Fixture (Session-Scoped)¶
Add to validation/conftest.py:
"""
validation/conftest.py
======================
Session-scoped fixtures shared across all validation test suites.
"""
import os
import pytest
from pyspark.sql import SparkSession
@pytest.fixture(scope="session", autouse=True)
def set_hash_salt() -> None:
"""Ensure FABRIC_POC_HASH_SALT is set for the entire test session."""
if not os.environ.get("FABRIC_POC_HASH_SALT"):
os.environ["FABRIC_POC_HASH_SALT"] = "test-salt-do-not-use-in-production"
@pytest.fixture(scope="session")
def spark() -> SparkSession:
"""Create a local Spark session for testing.
Session-scoped to avoid JVM startup overhead on every test.
Uses local[2] for parallelism without overwhelming CI runners.
"""
session = (
SparkSession.builder
.master("local[2]")
.appName("fabric-poc-tests")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
.config("spark.ui.enabled", "false")
.config("spark.driver.memory", "2g")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
)
yield session
session.stop()
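The test modules later in this guide import the extracted utilities directly (for example from notebooks.utils.compliance_rules import flag_ctr). For those imports to resolve, the repository root must be on the Python path when pytest runs. One way to do that is a small shim in the same conftest.py (a sketch, assuming validation/conftest.py sits one level below the repository root):
# validation/conftest.py (addition)
# Make the repo root importable so tests can run
# `from notebooks.utils.compliance_rules import flag_ctr`.
import sys
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))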
Mocking mssparkutils and notebookutils¶
Fabric notebooks use mssparkutils and notebookutils for secrets, file operations, and widget parameters. These are not available locally.
Stub Module¶
Create validation/mocks/mssparkutils_stub.py:
"""
validation/mocks/mssparkutils_stub.py
======================================
Lightweight stub for mssparkutils used in local testing.
Provides the same interface as Fabric's mssparkutils without
requiring a Fabric workspace connection.
"""
class _Credentials:
"""Stub for mssparkutils.credentials."""
def getSecret(self, vault_url: str, secret_name: str) -> str:
"""Return a placeholder secret value for local testing."""
return f"mock-secret-{secret_name}"
def getToken(self, audience: str) -> str:
"""Return a placeholder token for local testing."""
return "mock-token-eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9"
class _Fs:
"""Stub for mssparkutils.fs."""
def ls(self, path: str) -> list[dict]:
"""List files at path (returns empty list locally)."""
return []
def mkdirs(self, path: str) -> bool:
"""Create directories (no-op locally)."""
return True
def cp(self, src: str, dest: str, recurse: bool = False) -> bool:
"""Copy files (no-op locally)."""
return True
def rm(self, path: str, recurse: bool = False) -> bool:
"""Remove files (no-op locally)."""
return True
def head(self, path: str, max_bytes: int = 65536) -> str:
"""Read head of file (returns empty string locally)."""
return ""
def put(self, path: str, content: str, overwrite: bool = False) -> bool:
"""Write content to file (no-op locally)."""
return True
class _Notebook:
"""Stub for mssparkutils.notebook."""
def run(self, notebook_name: str, timeout: int = 0, params: dict = None) -> str:
"""Run a notebook (no-op locally)."""
return f"mock-run-{notebook_name}"
def exit(self, value: str) -> None:
"""Exit notebook with value (no-op locally)."""
pass
class _Env:
"""Stub for mssparkutils.env."""
def getWorkspaceName(self) -> str:
return "local-test-workspace"
def getLakehouseName(self) -> str:
return "lh_bronze"
def getUserName(self) -> str:
return "test-user@contoso.com"
class MSSparkUtils:
"""Top-level stub matching the mssparkutils namespace."""
def __init__(self) -> None:
self.credentials = _Credentials()
self.fs = _Fs()
self.notebook = _Notebook()
self.env = _Env()
# Module-level instance (import as: from mssparkutils_stub import mssparkutils)
mssparkutils = MSSparkUtils()
Using the Stub in Tests¶
# In conftest.py or test setup
import sys
from unittest.mock import MagicMock
# Register mocks so Fabric-only imports resolve
sys.modules["notebookutils"] = MagicMock()
sys.modules["notebookutils.mssparkutils"] = MagicMock()
Or use monkeypatch per test:
def test_notebook_with_mocked_utils(monkeypatch):
    """Test notebook logic with mocked mssparkutils."""
    import sys

    from validation.mocks.mssparkutils_stub import mssparkutils

    # setitem patches the entry and restores the original after the test
    monkeypatch.setitem(sys.modules, "mssparkutils", mssparkutils)
    # Now import and test the notebook function
    from notebooks.utils.compliance_rules import flag_ctr
    # ... assertions
conftest.py Fixtures¶
Complete Fixture Set¶
"""
validation/unit_tests/conftest.py
==================================
Fixtures for unit testing notebook logic.
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.types import (
DoubleType,
IntegerType,
StringType,
StructField,
StructType,
TimestampType,
)
# ---------------------------------------------------------------------------
# Environment
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session", autouse=True)
def set_hash_salt() -> None:
if not os.environ.get("FABRIC_POC_HASH_SALT"):
os.environ["FABRIC_POC_HASH_SALT"] = "test-salt-do-not-use-in-production"
@pytest.fixture(scope="session", autouse=True)
def mock_fabric_modules() -> None:
"""Register mocks for Fabric-only modules so notebook imports succeed."""
sys.modules.setdefault("notebookutils", MagicMock())
sys.modules.setdefault("notebookutils.mssparkutils", MagicMock())
# ---------------------------------------------------------------------------
# Spark
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session")
def spark() -> SparkSession:
session = (
SparkSession.builder
.master("local[2]")
.appName("notebook-unit-tests")
.config("spark.ui.enabled", "false")
.config("spark.driver.memory", "2g")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
)
yield session
session.stop()
# ---------------------------------------------------------------------------
# Sample DataFrames
# ---------------------------------------------------------------------------
SLOT_SCHEMA = StructType([
    StructField("machine_id", StringType(), True),  # nullable so the fixture can include a null row
StructField("event_type", StringType(), False),
StructField("amount", DoubleType(), True),
StructField("game_type", StringType(), True),
StructField("timestamp", TimestampType(), False),
])
@pytest.fixture
def slot_telemetry_df(spark):
"""Small DataFrame mimicking Bronze slot telemetry."""
now = datetime.now()
data = [
("SLOT-001", "play", 25.0, "slots", now),
("SLOT-002", "jackpot", 12500.0, "slots", now), # CTR + W-2G
("SLOT-003", "play", 9500.0, "slots", now), # SAR range
("SLOT-004", "play", 8500.0, "table", now), # SAR range + W-2G
(None, "play", 100.0, "slots", now), # Null machine_id
("SLOT-006", "play", 500.0, "poker", now),
("SLOT-007", "play", 5500.0, "poker", now), # W-2G poker
]
return spark.createDataFrame(data, schema=SLOT_SCHEMA)
@pytest.fixture
def empty_df(spark):
"""Empty DataFrame with slot telemetry schema."""
return spark.createDataFrame([], schema=SLOT_SCHEMA)
# ---------------------------------------------------------------------------
# Temporary Delta paths
# ---------------------------------------------------------------------------
@pytest.fixture
def delta_tmp_path(tmp_path):
"""Return a temporary directory for Delta table writes."""
path = tmp_path / "delta_test"
path.mkdir()
return str(path)
Writing Tests: Complete Examples¶
Testing Compliance Rules¶
"""
validation/unit_tests/test_compliance_rules.py
===============================================
Tests for extracted compliance detection functions.
"""
import pytest
class TestFlagCTR:
"""Tests for Currency Transaction Report flagging."""
def test_amount_at_threshold_is_flagged(self, spark, slot_telemetry_df):
from notebooks.utils.compliance_rules import flag_ctr
result = flag_ctr(slot_telemetry_df)
flagged = result.filter("ctr_flag = true").collect()
# Only the $12,500 jackpot should be flagged
assert len(flagged) == 1
assert flagged[0]["machine_id"] == "SLOT-002"
def test_amount_below_threshold_not_flagged(self, spark, slot_telemetry_df):
from notebooks.utils.compliance_rules import flag_ctr
result = flag_ctr(slot_telemetry_df)
not_flagged = result.filter("ctr_flag = false").collect()
# All others (including nulls) should not be flagged
assert len(not_flagged) == 6
def test_empty_dataframe_returns_empty(self, spark, empty_df):
from notebooks.utils.compliance_rules import flag_ctr
result = flag_ctr(empty_df)
assert result.count() == 0
assert "ctr_flag" in result.columns
class TestFlagSAR:
"""Tests for Suspicious Activity Report pattern detection."""
def test_structuring_range_flagged(self, spark, slot_telemetry_df):
from notebooks.utils.compliance_rules import flag_sar
result = flag_sar(slot_telemetry_df)
flagged = result.filter("sar_flag = true").collect()
assert len(flagged) == 2
amounts = {row["amount"] for row in flagged}
assert amounts == {9500.0, 8500.0}
    def test_at_threshold_not_flagged(self, spark):
        from notebooks.utils.compliance_rules import flag_sar, SAR_UPPER
        data = [("M1", "play", float(SAR_UPPER), "slots", None)]
        df = spark.createDataFrame(
            data,
            "machine_id string, event_type string, amount double, game_type string, timestamp timestamp",
        )
        # Amount at exactly $10,000 is CTR, not SAR (SAR is strictly below $10K)
        result = flag_sar(df)
        assert result.filter("sar_flag = true").count() == 0
class TestRejectNulls:
"""Tests for null rejection utility."""
def test_null_machine_id_rejected(self, spark, slot_telemetry_df):
from notebooks.utils.compliance_rules import reject_nulls
result = reject_nulls(slot_telemetry_df, ["machine_id"])
assert result.count() == 6 # Original 7 minus 1 null
machine_ids = [row["machine_id"] for row in result.collect()]
assert None not in machine_ids
def test_multiple_required_cols(self, spark, slot_telemetry_df):
from notebooks.utils.compliance_rules import reject_nulls
result = reject_nulls(slot_telemetry_df, ["machine_id", "amount"])
assert result.count() == 6
class TestFlagW2G:
"""Tests for W-2G winnings reporting thresholds."""
def test_slots_threshold_1200(self, spark, slot_telemetry_df):
from notebooks.utils.compliance_rules import flag_w2g
result = flag_w2g(slot_telemetry_df)
slots_w2g = result.filter(
"(game_type = 'slots') AND (w2g_flag = true)"
).collect()
# $12,500 and $9,500 are above $1,200 slot threshold
assert len(slots_w2g) == 2
def test_poker_threshold_5000(self, spark, slot_telemetry_df):
from notebooks.utils.compliance_rules import flag_w2g
result = flag_w2g(slot_telemetry_df)
poker_w2g = result.filter(
"(game_type = 'poker') AND (w2g_flag = true)"
).collect()
# Only the $5,500 poker win should be flagged
assert len(poker_w2g) == 1
assert poker_w2g[0]["amount"] == 5500.0
Testing Schema Enforcement¶
"""
validation/unit_tests/test_schema_enforcement.py
=================================================
Tests that schemas match expected definitions.
"""
from pyspark.sql.types import (
DoubleType,
StringType,
StructField,
StructType,
TimestampType,
)
def test_bronze_slot_schema_has_required_fields(spark, slot_telemetry_df):
"""Bronze slot telemetry must have all required fields."""
required_fields = {"machine_id", "event_type", "amount", "timestamp"}
actual_fields = {f.name for f in slot_telemetry_df.schema.fields}
assert required_fields.issubset(actual_fields)
def test_bronze_slot_schema_types(spark, slot_telemetry_df):
"""Field types must match the expected schema."""
type_map = {f.name: type(f.dataType) for f in slot_telemetry_df.schema.fields}
assert type_map["machine_id"] == StringType
assert type_map["amount"] == DoubleType
assert type_map["timestamp"] == TimestampType
nbval for Notebook Execution Testing¶
nbval validates that Jupyter notebooks execute without errors. For this POC's .py notebook format, convert to .ipynb first:
Installation¶
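nbval plugs into pytest, and jupytext handles the .py-to-.ipynb conversion; both install from PyPI:
# Install notebook execution testing tools
pip install nbval jupytext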
Converting and Testing¶
# Convert .py notebook to .ipynb
jupytext --to ipynb notebooks/bronze/01_bronze_slot_telemetry.py -o temp/test_notebook.ipynb
# Run nbval against the converted notebook
pytest --nbval-lax temp/test_notebook.ipynb
# --nbval-lax: don't compare cell outputs, just verify no exceptions
Limitations¶
- nbval requires a running kernel (local PySpark).
- Cells that reference OneLake paths will fail locally.
- Use # NBVAL_SKIP comments to skip cells that need Fabric:
# NBVAL_SKIP
# This cell requires a Fabric workspace connection
df.write.format("delta").saveAsTable("lh_bronze.bronze_slot_telemetry")
Coverage Reporting¶
pytest-cov Configuration¶
# pyproject.toml ([tool.pytest.ini_options] is not valid in setup.cfg)
[tool.pytest.ini_options]
addopts = "--cov=notebooks/utils --cov=data_generation --cov-report=html --cov-report=term-missing"
testpaths = ["validation/unit_tests"]
Running Coverage¶
# Generate coverage report
pytest validation/unit_tests/ -v --cov=notebooks/utils --cov-report=html
# Open the HTML report
open htmlcov/index.html
Coverage Targets¶
| Module | Target | Rationale |
|---|---|---|
| notebooks/utils/ | 90%+ | Core business logic, compliance rules |
| data_generation/generators/ | 80%+ | Data quality depends on generator correctness |
| data_generation/config/ | 70%+ | Configuration parsing |
| notebooks/bronze/ | 50%+ | Orchestration code, many Fabric-only paths |
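To enforce a target in CI rather than just report it, pytest-cov can fail the run when coverage drops below a threshold; a sketch using the 90% notebooks/utils/ target from the table above:
# Fail the run if notebooks/utils coverage drops below 90%
pytest validation/unit_tests/ --cov=notebooks/utils --cov-fail-under=90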
CI Pipeline Integration¶
GitHub Actions Workflow¶
# .github/workflows/test-notebooks.yml
name: Notebook Unit Tests
on:
pull_request:
paths:
- 'notebooks/**'
- 'data_generation/**'
- 'validation/**'
push:
branches: [main]
jobs:
unit-tests:
name: Run Notebook Unit Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: pip
- name: Install dependencies
run: |
pip install --upgrade pip
pip install pyspark==3.5.3 delta-spark==3.3.0
pip install pytest pytest-cov great-expectations
pip install -r requirements.txt
- name: Set test environment variables
run: |
echo "FABRIC_POC_HASH_SALT=ci-test-salt" >> $GITHUB_ENV
echo "SPARK_LOCAL_IP=127.0.0.1" >> $GITHUB_ENV
- name: Run unit tests
run: |
pytest validation/unit_tests/ -v \
--cov=notebooks/utils \
--cov=data_generation \
--cov-report=xml \
--junitxml=test-results.xml
- name: Upload coverage to Codecov
if: always()
uses: codecov/codecov-action@v4
with:
files: coverage.xml
flags: notebook-tests
fail_ci_if_error: false
- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
with:
name: test-results
path: test-results.xml
Branch Protection Rule¶
Configure GitHub branch protection on main:
- Required status checks: Run Notebook Unit Tests
- Require branches to be up to date: Yes
- Require PR reviews: 1+
This prevents merging code that breaks existing tests.
Best Practices¶
DO¶
- Extract before testing. Move business logic out of notebook cells into importable modules under notebooks/utils/.
- Use session-scoped Spark. Starting a SparkSession for each test is slow. Use scope="session" on your Spark fixture.
- Keep test data small. 5-10 rows per test DataFrame is enough. Large synthetic datasets belong in integration tests.
- Test edge cases explicitly. Empty DataFrames, null values, boundary amounts ($9,999.99 vs $10,000.00), and type mismatches.
- Name tests descriptively. test_ctr_flag_set_when_amount_equals_10000 is better than test_flag_1.
- Match Spark versions. Your local PySpark version should match the Fabric Runtime version to avoid API differences.
- Run tests before upload. Always run pytest locally before syncing notebooks to Fabric.
DO NOT¶
- Do not test Spark internals. You are testing your logic, not whether DataFrame.filter() works.
- Do not require network access. Unit tests must pass offline. Mock all external calls.
- Do not use production data. Synthetic data only. Never copy real data into test fixtures.
- Do not test orchestration in unit tests. Notebook cell ordering, %run resolution, and mssparkutils.notebook.run() are integration concerns.
- Do not skip writing tests for "simple" notebooks. Schema enforcement and null handling are where most bugs hide.
Anti-Patterns¶
Anti-Pattern: Testing the Entire Notebook¶
# BAD: Running the whole notebook as a test
def test_bronze_notebook():
exec(open("notebooks/bronze/01_bronze_slot_telemetry.py").read())
This is brittle, slow, and gives no useful failure information.
Anti-Pattern: Hardcoded Paths in Tests¶
# BAD: Tests that depend on local file paths
def test_read_data():
df = spark.read.parquet("C:/Users/frank/data/test.parquet")
Use tmp_path fixtures or in-memory DataFrames instead.
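A sketch of the same test done properly, using pytest's tmp_path fixture together with the session-scoped spark fixture from conftest.py (the five-row DataFrame is purely illustrative):
# GOOD: write and read from a pytest-managed temporary directory
def test_read_data(spark, tmp_path):
    source = tmp_path / "test.parquet"
    spark.range(5).write.parquet(str(source))
    df = spark.read.parquet(str(source))
    assert df.count() == 5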
Anti-Pattern: Testing with Production-Scale Data¶
# BAD: Generating millions of rows for a unit test
def test_aggregation():
data = [generate_row() for _ in range(1_000_000)]
Unit tests should complete in under a second. Use 5-10 rows.