Skip to content

🌿 Tutorial 35: EPA Environmental Analytics

Difficulty Duration Prerequisites

🏠 Home > 📖 Tutorials > 🌿 EPA Environmental Analytics


Third-party references — publicly sourced, good-faith comparison

This page references non-Microsoft products and services. That information is drawn from each vendor's publicly available documentation and is offered for honest, good-faith comparison only. This is a personal project written from a Microsoft Fabric and Azure perspective; it does not claim expertise in, or authority over, any third-party product, and nothing here is an official statement by, or endorsed by, those vendors. Capabilities, pricing, and features change often — always verify against the vendor's current official documentation. Where a third-party offering is the stronger choice, we say so plainly.


🌿 Tutorial 35: EPA Environmental Analytics

Difficulty ⭐⭐⭐⭐ Advanced
Time ⏱️ 120-150 minutes
Focus EPA Air Quality Monitoring, Toxic Release Inventory, Water Compliance, Environmental Justice Analysis & Real-Time AQI Tracking

📊 Progress Tracker

┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
│  00  │  01  │  02  │  03  │  04  │  05  │  06  │  07  │  08  │  09  │  10  │  11  │  12  │  13  │
│SETUP │BRNZE │SILVR │ GOLD │  RT  │ PBI  │PIPES │ GOV  │MIRRR │AI/ML │TDATA │ SAS  │CICD  │MIGR  │
├──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┤
│  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │
└──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┘

┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐
│  14  │  15  │  16  │  17  │  18  │  19  │  20  │  21  │  22  │  23  │  24  │  25  │  26  │  27  │  28  │  29  │  30  │  31  │
│ SEC  │ COST │PERF  │ MON  │SHARE │COPLT │WKBST │ GEO  │ NET  │SHIR  │ SNW  │ DB2  │MULTI │VIDEO │ MOVE │GEOLC │TRIBL │ DOT  │
├──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┼──────┤
│  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │  ✅  │
└──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┘

┌──────┬──────┬──────┬──────┬──────┐
│  32  │  33  │  34  │  35  │  36  │
│ USDA │ SBA  │ NOAA │ EPA  │ DOI  │
├──────┼──────┼──────┼──────┼──────┤
│  ✅  │  ✅  │  ✅  │  🔵  │  ○   │
└──────┴──────┴──────┴──────┴──────┘
                YOU ARE HERE
Navigation
⬅️ Previous 34-NOAA Weather & Climate Analytics
➡️ Next 36-DOI Natural Resources Analytics

📖 Overview

The U.S. Environmental Protection Agency (EPA) is charged with protecting human health and the environment through regulation, enforcement, and scientific research. The EPA collects and publishes some of the most consequential environmental data in the world: air quality measurements from over 4,000 monitoring stations, toxic chemical releases from 21,000+ industrial facilities, drinking water safety records for 150,000+ public water systems, and greenhouse gas emissions from major sources.

This tutorial teaches you to build an environmental analytics pipeline in Microsoft Fabric that ingests EPA data across four major domains -- air quality, toxic releases, water compliance, and greenhouse gas emissions -- and produces dashboards revealing pollution patterns, facility risk profiles, environmental compliance trends, and environmental justice indicators. The pipeline includes a real-time AQI monitoring component using Fabric Eventhouse and KQL for live air quality tracking.

What distinguishes this tutorial from standard environmental analytics is the environmental justice angle: correlating pollution burden with demographic data to identify communities that bear disproportionate environmental risk. This analysis is central to the EPA's current strategic priorities and represents a genuinely advanced analytical pattern.

💡 Why EPA Analytics on Fabric?

  • Multi-domain integration: Air, water, toxic releases, and GHG data originate from different EPA offices and systems -- Fabric unifies them in a single governed lakehouse
  • Real-time + historical: EPA's AirNow API provides hourly AQI readings that can feed Eventhouse for real-time monitoring alongside decades of historical ambient air data
  • Environmental justice: Correlating EPA facility data with Census demographics requires cross-domain joins that Fabric's lakehouse architecture handles natively
  • Regulatory compliance: EPA data powers compliance reporting, permit tracking, and enforcement case analysis -- all requiring auditable data lineage via Purview
  • Geospatial richness: Every EPA data point is geo-referenced, enabling powerful map-based visualizations of pollution patterns

📋 EPA Data Access

All EPA data in this tutorial is publicly available. The AQS API requires a free registration at aqs.epa.gov. TRI, ECHO, and GHGRP data are bulk-downloadable from epa.gov/enviro and the EPA's Envirofacts API. No authentication is required for bulk downloads.


🎯 Learning Objectives

By the end of this tutorial, you will be able to:

  • Configure EPA data sources across air quality (AQS), toxic releases (TRI), water compliance (ECHO/SDWIS), and GHG emissions (GHGRP)
  • Ingest multi-domain environmental records into Bronze Delta tables with EPA registry identifiers and monitoring metadata
  • Standardize pollutant concentrations, unit-normalize measurements, and validate against EPA regulatory thresholds in Silver
  • Build Gold layer AQI calculations, facility risk profiles, water compliance scorecards, and environmental justice indicators
  • Implement real-time AQI monitoring using Fabric Eventhouse with KQL queries for exceedance detection
  • Create Power BI dashboards with air quality heat maps, TRI facility maps, water violation trends, and environmental health dashboards
  • Calculate environmental justice indices by correlating pollution data with Census demographic indicators
  • Apply data quality checks for environmental monitoring data (detection limits, method codes, completeness)
  • Configure EPA data standards and environmental compliance reporting governance in Microsoft Purview
  • Design alerting for regulatory threshold exceedances using Fabric Activator

🏗️ Architecture Diagram

%%{init: {'theme':'base', 'themeVariables': {'primaryColor':'#2E7D32','primaryTextColor':'#fff','primaryBorderColor':'#1B5E20','lineColor':'#43A047','secondaryColor':'#E8F5E9','tertiaryColor':'#fff'}}}%%
flowchart TB
    subgraph Sources["🌿 EPA Data Sources"]
        AQS["🌬️ AQS / AirNow\n(Air Quality)"]
        TRI["☢️ TRI\n(Toxic Release Inventory)"]
        ECHO["💧 ECHO / SDWIS\n(Water Compliance)"]
        GHGRP["🏭 GHGRP\n(GHG Emissions)"]
        EJ["👥 EJScreen\n(Env Justice)"]
    end

    subgraph Realtime["⚡ Real-Time Path"]
        ES["📡 Fabric Eventstream\n(AirNow Hourly)"]
        EH["🏠 Eventhouse\n(KQL Database)"]
        KQL["📊 KQL Queries\n(AQI Exceedance)"]
    end

    subgraph Fabric["🔷 Microsoft Fabric (Batch Path)"]
        direction TB

        subgraph Bronze["🥉 Bronze Layer"]
            B_AIR["bronze_epa_air_quality"]
            B_TRI["bronze_epa_tri_releases"]
            B_WATER["bronze_epa_water_compliance"]
            B_GHG["bronze_epa_ghg_emissions"]
        end

        subgraph Silver["🥈 Silver Layer"]
            S_AIR["silver_epa_air_quality\n(AQI Calculated)"]
            S_TRI["silver_epa_tri_releases\n(Normalized)"]
            S_WATER["silver_epa_water_compliance\n(Validated)"]
            S_GHG["silver_epa_ghg_emissions\n(Standardized)"]
        end

        subgraph Gold["🥇 Gold Layer"]
            G_AQI["gold_epa_aqi_index"]
            G_RISK["gold_epa_facility_risk"]
            G_WATER["gold_epa_water_scorecard"]
            G_EJ["gold_epa_env_justice"]
        end

        PURV["🏛️ Microsoft Purview\n(Lineage + Compliance)"]
    end

    subgraph Analytics["📊 Analytics Layer"]
        PBI["📊 Power BI\n(Direct Lake)"]
        AQ_MAP["🗺️ Air Quality\nHeat Map"]
        TRI_MAP["🏭 TRI Facility\nMap"]
        WATER_TR["💧 Water Violation\nTrends"]
        EJ_DASH["⚖️ Environmental\nHealth Dashboard"]
    end

    AQS --> ES
    AQS --> B_AIR
    ES --> EH
    EH --> KQL

    TRI --> B_TRI
    ECHO --> B_WATER
    GHGRP --> B_GHG

    B_AIR --> S_AIR
    B_TRI --> S_TRI
    B_WATER --> S_WATER
    B_GHG --> S_GHG

    S_AIR --> G_AQI
    S_TRI --> G_RISK
    S_WATER --> G_WATER
    S_AIR --> G_EJ
    S_TRI --> G_EJ
    EJ --> G_EJ

    PURV -.-> Bronze
    PURV -.-> Silver
    PURV -.-> Gold

    G_AQI --> PBI
    G_RISK --> PBI
    G_WATER --> PBI
    G_EJ --> PBI
    PBI --> AQ_MAP
    PBI --> TRI_MAP
    PBI --> WATER_TR
    PBI --> EJ_DASH

    KQL --> PBI

    style Sources fill:#E8F5E9
    style Realtime fill:#FFF3E0
    style Fabric fill:#E1F5FE
    style Bronze fill:#FFF8E1
    style Silver fill:#F3E5F5
    style Gold fill:#FCE4EC
    style Analytics fill:#EDE7F6
Component Technology Purpose
EPA Data Sources AQS, TRI, ECHO/SDWIS, GHGRP, EJScreen Multi-domain environmental data across air, water, chemicals, and emissions
Real-Time Path Eventstream + Eventhouse + KQL Live AQI monitoring with regulatory exceedance detection and alerting
Bronze Delta Lake (append-only) Raw monitoring records, release reports, compliance data with EPA registry IDs
Silver Delta Lake (validated) AQI-calculated air data, unit-normalized releases, validated compliance records
Gold Delta Lake (aggregated) AQI index tables, facility risk profiles, water scorecards, environmental justice metrics
Purview Microsoft Purview Data lineage, EPA data standards compliance, regulatory reporting governance
Power BI Direct Lake + KQL Air quality heat maps, TRI facility maps, water violation trends, environmental health dashboards

📋 Prerequisites

Before starting this tutorial, ensure you have:

📋 Data Source Options

This tutorial supports two data ingestion paths: 1. Synthetic Generator (recommended for learning): Use epa_generator.py to generate realistic EPA environmental data locally 2. EPA APIs + Bulk Downloads: Connect to real EPA data from AQS, Envirofacts, TRI, and ECHO for production analytics

Both paths converge at the same Bronze schema.


🛠️ Step 1: Data Source Setup

EPA operates dozens of environmental data systems. This tutorial integrates the five most analytically significant sources.

1.1 EPA Data Systems Overview

System Full Name Data Domain Volume Access
AQS Air Quality System Ambient air monitoring (PM2.5, O3, NO2, SO2, CO) 800M+ hourly records REST API (registered)
TRI Toxic Release Inventory Chemical releases from industrial facilities 6M+ release records (1987-present) Bulk CSV download
ECHO Enforcement & Compliance History Online Facility inspections, violations, enforcement 800K+ facilities REST API + bulk
SDWIS Safe Drinking Water Info System Public water system compliance 150K+ water systems Bulk download
GHGRP Greenhouse Gas Reporting Program Large-source GHG emissions 8K+ facilities, 3B+ tons CO2e Bulk download
EJScreen Environmental Justice Screening Tool Demographic + environmental indicators Census tract level Bulk download

1.2 Option A: Synthetic Data Generator

# Generate synthetic EPA environmental data for development
import sys
sys.path.append("../../data_generation")

from generators.federal.epa_generator import EPAGenerator

generator = EPAGenerator(
    output_path="/lakehouse/default/Files/raw/epa/",
    num_air_quality_records=100000,    # 100K air quality observations
    num_tri_records=10000,             # 10K toxic release records
    num_water_records=20000,           # 20K water compliance records
    num_ghg_records=5000,              # 5K GHG emission records
    date_range=("2018-01-01", "2024-12-31"),
    seed=42
)

datasets = generator.generate_batch()
for name, df in datasets.items():
    print(f"Generated {name}: {df.count():,} records")

1.3 Option B: EPA APIs and Bulk Downloads

# Connect to real EPA data sources
import requests

# 1. AQS API - Air Quality Data
AQS_EMAIL = "your.email@example.com"
AQS_KEY = "your_aqs_key"

def fetch_aqs_data(state_code: str, county_code: str, param: str,
                   begin_date: str, end_date: str) -> dict:
    """Fetch air quality data from EPA AQS API."""
    url = "https://aqs.epa.gov/data/api/dailyData/byCounty"
    params = {
        "email": AQS_EMAIL, "key": AQS_KEY,
        "param": param,          # 88101=PM2.5, 44201=Ozone
        "bdate": begin_date, "edate": end_date,
        "state": state_code, "county": county_code,
    }
    response = requests.get(url, params=params)
    return response.json()

# 2. TRI Bulk Download
TRI_URL = "https://enviro.epa.gov/triexplorer/release_fac?triession=TRIQQM5"
# Also: https://www.epa.gov/toxics-release-inventory-tri-program/tri-basic-data-files-calendar-years-1987-present

# 3. ECHO API - Compliance Data
def fetch_echo_facilities(state: str, program: str = "ALL") -> dict:
    """Fetch facility compliance data from ECHO."""
    url = "https://echo.epa.gov/cgi-bin/get_facilities"
    params = {
        "output": "JSON",
        "p_st": state,
        "p_med": program,
    }
    response = requests.get(url, params=params)
    return response.json()

# Example: Fetch PM2.5 data for Los Angeles County
aqs_data = fetch_aqs_data("06", "037", "88101", "20240101", "20240131")
print(f"AQS records: {len(aqs_data.get('Data', []))}")

1.4 EPA Data Policies

Policy Requirement
Public data All EPA environmental monitoring data is publicly available under FOIA
AQS registration AQS API requires free email/key registration; no cost or approval needed
TRI self-reported TRI data is self-reported by facilities; EPA verifies but does not guarantee accuracy
Attribution Credit "U.S. Environmental Protection Agency" in publications
Regulatory context EPA data is used for regulatory enforcement; handle with appropriate care
EJScreen disclaimer EJScreen is a screening tool, not a basis for regulatory decisions; note this in dashboards

🛠️ Step 2: Bronze Layer Ingestion

The Bronze layer captures raw environmental monitoring data from all four EPA domains with facility registry identifiers and measurement metadata.

📓 Notebook Reference: notebooks/bronze/15_bronze_epa.py (coming soon)

2.1 Air Quality Schema

# Schema for EPA air quality monitoring data (matches epa_air_quality_schema.json)
from pyspark.sql.types import *

air_quality_schema = StructType([
    StructField("monitor_id", StringType(), False),            # AQS site ID (state-county-site)
    StructField("site_name", StringType(), True),
    StructField("measurement_date", DateType(), False),
    StructField("measurement_time", StringType(), True),       # HH:MM local time
    StructField("parameter_code", StringType(), True),         # EPA AQS parameter code
    StructField("parameter_name", StringType(), True),         # PM2.5, Ozone, NO2, SO2, CO
    StructField("sample_duration", StringType(), True),        # 1 HOUR, 24 HOUR, etc.
    StructField("concentration", DoubleType(), True),          # Measured value
    StructField("units_of_measure", StringType(), True),       # ug/m3, ppb, ppm
    StructField("aqi_value", IntegerType(), True),             # AQI (0-500)
    StructField("aqi_category", StringType(), True),           # Good, Moderate, Unhealthy, etc.

    # Geography
    StructField("state_code", StringType(), True),
    StructField("county_code", StringType(), True),
    StructField("county_name", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("cbsa_code", StringType(), True),              # Core-Based Statistical Area
    StructField("cbsa_name", StringType(), True),

    # Quality
    StructField("method_code", StringType(), True),            # Measurement method
    StructField("data_validity", StringType(), True),          # Valid, Suspect, Excluded
    StructField("exceptional_event", StringType(), True),      # Wildfire, dust storm, etc.
])

print(f"Air quality schema: {len(air_quality_schema.fields)} fields")

2.2 Toxic Release Inventory Schema

# Schema for EPA TRI (Toxic Release Inventory)
tri_release_schema = StructType([
    StructField("tri_facility_id", StringType(), False),       # EPA TRI facility ID
    StructField("facility_name", StringType(), True),
    StructField("reporting_year", IntegerType(), True),
    StructField("chemical_name", StringType(), True),
    StructField("cas_number", StringType(), True),             # CAS Registry Number
    StructField("clean_air_act_chemical", BooleanType(), True),
    StructField("carcinogen", BooleanType(), True),

    # Release quantities (pounds)
    StructField("total_releases_lbs", DoubleType(), True),
    StructField("fugitive_air_lbs", DoubleType(), True),       # Air - non-point
    StructField("stack_air_lbs", DoubleType(), True),          # Air - stack/point
    StructField("water_releases_lbs", DoubleType(), True),
    StructField("land_releases_lbs", DoubleType(), True),
    StructField("underground_injection_lbs", DoubleType(), True),
    StructField("offsite_transfers_lbs", DoubleType(), True),

    # Facility details
    StructField("industry_sector", StringType(), True),
    StructField("naics_code", StringType(), True),
    StructField("street_address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip_code", StringType(), True),
    StructField("county", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("federal_facility", BooleanType(), True),
])

print(f"TRI schema: {len(tri_release_schema.fields)} fields")

2.3 Water Compliance Schema

# Schema for EPA water compliance (ECHO/SDWIS)
water_compliance_schema = StructType([
    StructField("pwsid", StringType(), False),                 # Public Water System ID
    StructField("pws_name", StringType(), True),
    StructField("violation_id", StringType(), True),
    StructField("violation_type", StringType(), True),         # MCL, TT, Monitoring, etc.
    StructField("contaminant_code", StringType(), True),
    StructField("contaminant_name", StringType(), True),
    StructField("compliance_period_begin", DateType(), True),
    StructField("compliance_period_end", DateType(), True),
    StructField("is_health_based", BooleanType(), True),       # Health-based violation
    StructField("enforcement_action", StringType(), True),

    # Water system details
    StructField("pws_type", StringType(), True),               # Community, NTNC, TNC
    StructField("population_served", IntegerType(), True),
    StructField("source_type", StringType(), True),            # Ground Water, Surface Water
    StructField("state_code", StringType(), True),
    StructField("county", StringType(), True),
    StructField("city_served", StringType(), True),
    StructField("epa_region", StringType(), True),
])

print(f"Water compliance schema: {len(water_compliance_schema.fields)} fields")

2.4 Bronze Ingestion

# Multi-domain EPA Bronze ingestion
from pyspark.sql.functions import *
from datetime import datetime
from uuid import uuid4

BATCH_ID = f"epa-batch-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}"
RUN_ID = str(uuid4())

source_path = "/lakehouse/default/Files/raw/epa/"

def add_bronze_metadata(df):
    return df \
        .withColumn("_ingested_at", current_timestamp()) \
        .withColumn("_source_file", input_file_name()) \
        .withColumn("_batch_id", lit(BATCH_ID)) \
        .withColumn("_run_id", lit(RUN_ID)) \
        .withColumn("_load_date", current_timestamp().cast("date"))

# Air quality
df_air = spark.read.schema(air_quality_schema).parquet(f"{source_path}air_quality/")
df_air_bronze = add_bronze_metadata(df_air)
df_air_bronze.write.format("delta").mode("append") \
    .partitionBy("state_code", "_load_date") \
    .saveAsTable("lh_bronze.bronze_epa_air_quality")

# TRI
df_tri = spark.read.schema(tri_release_schema).parquet(f"{source_path}tri/")
df_tri_bronze = add_bronze_metadata(df_tri)
df_tri_bronze.write.format("delta").mode("append") \
    .partitionBy("reporting_year") \
    .saveAsTable("lh_bronze.bronze_epa_tri_releases")

# Water compliance
df_water = spark.read.schema(water_compliance_schema).parquet(f"{source_path}water/")
df_water_bronze = add_bronze_metadata(df_water)
df_water_bronze.write.format("delta").mode("append") \
    .partitionBy("state_code") \
    .saveAsTable("lh_bronze.bronze_epa_water_compliance")

print(f"Bronze air quality: {df_air_bronze.count():,}")
print(f"Bronze TRI releases: {df_tri_bronze.count():,}")
print(f"Bronze water compliance: {df_water_bronze.count():,}")

🛠️ Step 3: Silver Layer -- Normalization & AQI Calculation

The Silver layer standardizes pollutant concentrations, calculates the Air Quality Index (AQI) from raw measurements, normalizes TRI release quantities, and validates water compliance records against regulatory thresholds.

📓 Notebook Reference: notebooks/silver/15_silver_epa.py (coming soon)

3.1 AQI Calculation from Raw Concentrations

The EPA Air Quality Index converts pollutant concentrations to a standardized 0-500 scale using breakpoint tables defined in 40 CFR Part 58.

# EPA AQI breakpoint calculation
# Reference: https://www.airnow.gov/aqi/aqi-basics/

AQI_BREAKPOINTS = {
    "PM2.5": [  # 24-hour average (ug/m3)
        (0.0, 12.0, 0, 50),          # Good
        (12.1, 35.4, 51, 100),       # Moderate
        (35.5, 55.4, 101, 150),      # Unhealthy for Sensitive Groups
        (55.5, 150.4, 151, 200),     # Unhealthy
        (150.5, 250.4, 201, 300),    # Very Unhealthy
        (250.5, 500.4, 301, 500),    # Hazardous
    ],
    "OZONE": [  # 8-hour average (ppm)
        (0.000, 0.054, 0, 50),
        (0.055, 0.070, 51, 100),
        (0.071, 0.085, 101, 150),
        (0.086, 0.105, 151, 200),
        (0.106, 0.200, 201, 300),
    ],
    "NO2": [  # 1-hour average (ppb)
        (0, 53, 0, 50),
        (54, 100, 51, 100),
        (101, 360, 101, 150),
        (361, 649, 151, 200),
        (650, 1249, 201, 300),
        (1250, 2049, 301, 500),
    ],
}

def calculate_aqi(concentration, breakpoints):
    """Calculate AQI from concentration using EPA breakpoint formula."""
    for c_lo, c_hi, i_lo, i_hi in breakpoints:
        if c_lo <= concentration <= c_hi:
            aqi = ((i_hi - i_lo) / (c_hi - c_lo)) * (concentration - c_lo) + i_lo
            return round(aqi)
    return None  # Out of range

# Apply AQI calculation as UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(IntegerType())
def aqi_udf(concentration, parameter):
    if concentration is None or parameter is None:
        return None
    bp = AQI_BREAKPOINTS.get(parameter.upper())
    if bp is None:
        return None
    return calculate_aqi(concentration, bp)

df_air_silver = df_air_bronze \
    .withColumn("aqi_calculated", aqi_udf(col("concentration"), col("parameter_name"))) \
    .withColumn("aqi_category_calculated",
        when(col("aqi_calculated") <= 50, lit("Good"))
        .when(col("aqi_calculated") <= 100, lit("Moderate"))
        .when(col("aqi_calculated") <= 150, lit("Unhealthy for Sensitive Groups"))
        .when(col("aqi_calculated") <= 200, lit("Unhealthy"))
        .when(col("aqi_calculated") <= 300, lit("Very Unhealthy"))
        .when(col("aqi_calculated") > 300, lit("Hazardous"))
        .otherwise(lit("Unknown"))
    )

3.2 TRI Release Normalization

# Normalize TRI release quantities and classify chemicals
df_tri_silver = df_tri_bronze \
    .withColumn("total_air_releases_lbs",
        coalesce(col("fugitive_air_lbs"), lit(0)) + coalesce(col("stack_air_lbs"), lit(0))
    ) \
    .withColumn("total_onsite_releases_lbs",
        coalesce(col("total_air_releases_lbs"), lit(0)) +
        coalesce(col("water_releases_lbs"), lit(0)) +
        coalesce(col("land_releases_lbs"), lit(0))
    ) \
    .withColumn("total_releases_tons", round(col("total_releases_lbs") / 2000, 2)) \
    .withColumn("state_std", upper(trim(col("state")))) \
    .withColumn("release_tier",
        when(col("total_releases_lbs") >= 1000000, lit("Major (>1M lbs)"))
        .when(col("total_releases_lbs") >= 100000, lit("Large (100K-1M lbs)"))
        .when(col("total_releases_lbs") >= 10000, lit("Medium (10K-100K lbs)"))
        .when(col("total_releases_lbs") >= 1000, lit("Small (1K-10K lbs)"))
        .otherwise(lit("Minor (<1K lbs)"))
    ) \
    .withColumn("chemical_category",
        when(col("carcinogen") == True, lit("Carcinogen"))
        .when(col("clean_air_act_chemical") == True, lit("Clean Air Act"))
        .otherwise(lit("Other Toxic"))
    )

# Deduplication
df_tri_deduped = df_tri_silver.dropDuplicates(["tri_facility_id", "reporting_year", "chemical_name"])

3.3 Water Compliance Validation

# Validate and enrich water compliance records
df_water_silver = df_water_bronze \
    .withColumn("state_std", upper(trim(col("state_code")))) \
    .withColumn("violation_severity",
        when(col("is_health_based") == True, lit("Health-Based"))
        .when(col("violation_type").isin("MCL", "TT"), lit("Regulatory"))
        .when(col("violation_type") == "Monitoring", lit("Monitoring"))
        .otherwise(lit("Administrative"))
    ) \
    .withColumn("population_impact_tier",
        when(col("population_served") >= 100000, lit("Large System (100K+)"))
        .when(col("population_served") >= 10000, lit("Medium System (10K-100K)"))
        .when(col("population_served") >= 1000, lit("Small System (1K-10K)"))
        .otherwise(lit("Very Small (<1K)"))
    ) \
    .withColumn("violation_duration_days",
        when(col("compliance_period_end").isNotNull(),
            datediff(col("compliance_period_end"), col("compliance_period_begin"))
        ).otherwise(lit(None))
    )

# Deduplication
df_water_deduped = df_water_silver.dropDuplicates(["violation_id"])

3.4 Data Quality and Write Silver

# Air quality DQ scoring
df_air_silver = df_air_silver \
    .withColumn("_dq_score",
        when(col("monitor_id").isNotNull(), lit(15)).otherwise(lit(0)) +
        when(col("measurement_date").isNotNull(), lit(15)).otherwise(lit(0)) +
        when(col("concentration").isNotNull() & (col("concentration") >= 0), lit(15)).otherwise(lit(0)) +
        when(col("aqi_calculated").isNotNull(), lit(15)).otherwise(lit(0)) +
        when(col("parameter_name").isNotNull(), lit(10)).otherwise(lit(0)) +
        when(col("data_validity") == "Valid", lit(10)).otherwise(lit(0)) +
        when(col("latitude").isNotNull() & col("longitude").isNotNull(), lit(10)).otherwise(lit(0)) +
        when(col("state_code").isNotNull(), lit(5)).otherwise(lit(0)) +
        when(col("method_code").isNotNull(), lit(5)).otherwise(lit(0))
    ) \
    .withColumn("_silver_timestamp", current_timestamp())

# Write Silver tables
df_air_silver.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("state_code") \
    .saveAsTable("lh_silver.silver_epa_air_quality")

df_tri_deduped.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("reporting_year") \
    .saveAsTable("lh_silver.silver_epa_tri_releases")

df_water_deduped.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("state_std") \
    .saveAsTable("lh_silver.silver_epa_water_compliance")

spark.sql("OPTIMIZE lh_silver.silver_epa_air_quality ZORDER BY (monitor_id, measurement_date)")
spark.sql("OPTIMIZE lh_silver.silver_epa_tri_releases ZORDER BY (state_std, chemical_name)")
spark.sql("OPTIMIZE lh_silver.silver_epa_water_compliance ZORDER BY (state_std, violation_severity)")

print(f"Silver air quality: {df_air_silver.count():,}")
print(f"Silver TRI releases: {df_tri_deduped.count():,}")
print(f"Silver water compliance: {df_water_deduped.count():,}")

🛠️ Step 4: Real-Time AQI Monitoring

Fabric Eventhouse enables continuous monitoring of air quality with threshold-based alerting for regulatory exceedances.

4.1 Eventstream Configuration for AirNow

# Eventstream ingestion: AirNow API -> Eventhouse for real-time AQI
import json
import requests
import time
from datetime import datetime

AIRNOW_API_KEY = os.environ.get("AIRNOW_API_KEY", "YOUR_KEY_HERE")

# Monitor AQI for major metro areas
MONITORED_ZIPS = [
    "10001", "90001", "60601", "77001", "85001",
    "30301", "80201", "94101", "33101", "98101",
]

def fetch_current_aqi(zip_code: str) -> list:
    """Fetch current AQI from AirNow for a ZIP code."""
    url = "https://www.airnowapi.org/aq/observation/zipCode/current/"
    params = {
        "format": "application/json",
        "zipCode": zip_code,
        "distance": 25,
        "API_KEY": AIRNOW_API_KEY,
    }
    response = requests.get(url, params=params, timeout=10)
    return response.json() if response.status_code == 200 else []

# Polling loop for Eventstream
while True:
    for zip_code in MONITORED_ZIPS:
        observations = fetch_current_aqi(zip_code)
        for obs in observations:
            event = {
                "zip_code": zip_code,
                "parameter": obs.get("ParameterName"),
                "aqi": obs.get("AQI"),
                "category": obs.get("Category", {}).get("Name"),
                "reporting_area": obs.get("ReportingArea"),
                "state": obs.get("StateCode"),
                "latitude": obs.get("Latitude"),
                "longitude": obs.get("Longitude"),
                "event_time": datetime.utcnow().isoformat(),
            }
            emit_event(json.dumps(event))
    time.sleep(3600)  # Hourly polling (AirNow updates hourly)

4.2 KQL Queries for AQI Monitoring

// Real-time AQI monitoring queries in Eventhouse

// 1. Current AQI by monitoring area
epa_aqi_stream
| summarize arg_max(event_time, *) by reporting_area, parameter
| project reporting_area, state, parameter, aqi, category
| order by aqi desc

// 2. AQI exceedance detection (above 100 = Unhealthy for Sensitive Groups)
epa_aqi_stream
| where event_time > ago(6h)
| where aqi > 100
| project event_time, reporting_area, state, parameter, aqi, category
| order by aqi desc

// 3. Hazardous air quality alerts (AQI > 300)
epa_aqi_stream
| where event_time > ago(24h)
| where aqi > 300
| summarize alert_count = count(), max_aqi = max(aqi),
            first_alert = min(event_time), last_alert = max(event_time)
  by reporting_area, state, parameter
| order by max_aqi desc

// 4. AQI trend over past 24 hours
epa_aqi_stream
| where event_time > ago(24h)
| where parameter == "PM2.5"
| summarize avg_aqi = avg(aqi) by bin(event_time, 1h), reporting_area
| order by event_time asc
| render timechart

🛠️ Step 5: Gold Layer -- Environmental Analytics

The Gold layer builds decision-ready aggregations: AQI indices by geography, facility risk profiles, water compliance scorecards, and environmental justice indicators.

📓 Notebook Reference: notebooks/gold/15_gold_epa_analytics.py (coming soon)

5.1 AQI Index by Geography

# AQI summary by state and monitoring site
df_aqi_index = df_air_silver \
    .filter(col("data_validity") == "Valid") \
    .groupBy("state_code", "county_name", "cbsa_name", "parameter_name") \
    .agg(
        avg("aqi_calculated").alias("avg_aqi"),
        max("aqi_calculated").alias("max_aqi"),
        min("aqi_calculated").alias("min_aqi"),
        percentile_approx("aqi_calculated", 0.95).alias("p95_aqi"),
        count("*").alias("observation_count"),

        # Days in each AQI category
        sum(when(col("aqi_category_calculated") == "Good", 1).otherwise(0)).alias("days_good"),
        sum(when(col("aqi_category_calculated") == "Moderate", 1).otherwise(0)).alias("days_moderate"),
        sum(when(col("aqi_category_calculated").contains("Unhealthy"), 1).otherwise(0)).alias("days_unhealthy"),
        sum(when(col("aqi_category_calculated") == "Hazardous", 1).otherwise(0)).alias("days_hazardous"),

        # Exceptional events
        sum(when(col("exceptional_event").isNotNull(), 1).otherwise(0)).alias("exceptional_event_days"),
    ) \
    .withColumn("unhealthy_pct",
        round((col("days_unhealthy") + col("days_hazardous")) * 100.0 / col("observation_count"), 2)
    )

df_aqi_index.write.format("delta").mode("overwrite") \
    .saveAsTable("lh_gold.gold_epa_aqi_index")

5.2 Facility Risk Profiles (TRI)

# TRI facility risk profiles
df_facility_risk = df_tri_deduped \
    .groupBy("tri_facility_id", "facility_name", "state_std", "county",
             "industry_sector", "latitude", "longitude") \
    .agg(
        count("*").alias("reporting_years"),
        sum("total_releases_lbs").alias("lifetime_releases_lbs"),
        sum("total_air_releases_lbs").alias("lifetime_air_releases_lbs"),
        sum("water_releases_lbs").alias("lifetime_water_releases_lbs"),
        avg("total_releases_lbs").alias("avg_annual_releases_lbs"),
        countDistinct("chemical_name").alias("distinct_chemicals"),
        sum(when(col("carcinogen") == True, col("total_releases_lbs")).otherwise(0))
            .alias("carcinogen_releases_lbs"),
        max("reporting_year").alias("most_recent_report"),
    ) \
    .withColumn("risk_tier",
        when(col("carcinogen_releases_lbs") > 100000, lit("Critical"))
        .when(col("lifetime_releases_lbs") > 1000000, lit("High"))
        .when(col("lifetime_releases_lbs") > 100000, lit("Elevated"))
        .when(col("lifetime_releases_lbs") > 10000, lit("Moderate"))
        .otherwise(lit("Low"))
    )

df_facility_risk.write.format("delta").mode("overwrite") \
    .saveAsTable("lh_gold.gold_epa_facility_risk")

spark.sql("OPTIMIZE lh_gold.gold_epa_facility_risk ZORDER BY (state_std, risk_tier)")

5.3 Water Compliance Scorecards

# Water system compliance scorecards
df_water_scorecard = df_water_deduped \
    .groupBy("state_std", "population_impact_tier") \
    .agg(
        countDistinct("pwsid").alias("total_systems"),
        count("*").alias("total_violations"),
        sum(when(col("is_health_based") == True, 1).otherwise(0)).alias("health_violations"),
        sum(when(col("violation_type") == "MCL", 1).otherwise(0)).alias("mcl_violations"),
        sum(when(col("violation_type") == "Monitoring", 1).otherwise(0)).alias("monitoring_violations"),
        sum(when(col("enforcement_action").isNotNull(), 1).otherwise(0)).alias("enforcement_actions"),
        sum("population_served").alias("total_population_affected"),
        avg("violation_duration_days").alias("avg_violation_duration_days"),
    ) \
    .withColumn("violations_per_system",
        round(col("total_violations") * 1.0 / col("total_systems"), 2)
    ) \
    .withColumn("health_violation_rate",
        round(col("health_violations") * 100.0 / col("total_violations"), 2)
    )

df_water_scorecard.write.format("delta").mode("overwrite") \
    .saveAsTable("lh_gold.gold_epa_water_scorecard")

5.4 Environmental Justice Indicators

# Environmental justice: correlate pollution burden with demographics
# Uses EPA EJScreen methodology conceptually (actual EJScreen data would be joined here)

df_ej = df_tri_deduped \
    .groupBy("state_std", "county") \
    .agg(
        count("*").alias("tri_facilities"),
        sum("total_releases_lbs").alias("total_releases_lbs"),
        sum("carcinogen_releases_lbs").alias("carcinogen_releases_lbs"),
    )

# Join with AQI data for multi-domain environmental burden
df_aqi_county = df_air_silver \
    .groupBy("state_code", "county_name") \
    .agg(
        avg("aqi_calculated").alias("avg_aqi"),
        sum(when(col("aqi_category_calculated").contains("Unhealthy"), 1).otherwise(0))
            .alias("unhealthy_air_days"),
    )

df_water_county = df_water_deduped \
    .groupBy("state_std", "county") \
    .agg(
        sum(when(col("is_health_based") == True, 1).otherwise(0)).alias("health_water_violations"),
        sum("population_served").alias("affected_population"),
    )

# Composite environmental burden score
df_env_justice = df_ej \
    .join(df_aqi_county, (df_ej.state_std == df_aqi_county.state_code) &
          (df_ej.county == df_aqi_county.county_name), "left") \
    .join(df_water_county, ["state_std", "county"], "left") \
    .fillna(0) \
    .withColumn("env_burden_index",
        round(
            (col("tri_facilities") * 20 +
             coalesce(col("unhealthy_air_days"), lit(0)) * 30 +
             coalesce(col("health_water_violations"), lit(0)) * 25 +
             when(col("carcinogen_releases_lbs") > 0, lit(25)).otherwise(lit(0))),
            1
        )
    ) \
    .withColumn("env_burden_percentile",
        percent_rank().over(Window.orderBy(col("env_burden_index")))
    ) \
    .withColumn("ej_priority",
        when(col("env_burden_percentile") >= 0.80, lit("High Priority"))
        .when(col("env_burden_percentile") >= 0.50, lit("Moderate"))
        .otherwise(lit("Lower Burden"))
    )

df_env_justice.write.format("delta").mode("overwrite") \
    .saveAsTable("lh_gold.gold_epa_env_justice")

# Summary
for table in ["gold_epa_aqi_index", "gold_epa_facility_risk",
              "gold_epa_water_scorecard", "gold_epa_env_justice"]:
    count = spark.table(f"lh_gold.{table}").count()
    print(f"  {table}: {count:,} rows")

🛠️ Step 6: Power BI Dashboard

6.1 DAX Measures

// ===== EPA Environmental Measures =====

// Average AQI
Average AQI =
AVERAGE(gold_epa_aqi_index[avg_aqi])

// Unhealthy Air Days (%)
Unhealthy Air Days % =
DIVIDE(
    SUM(gold_epa_aqi_index[days_unhealthy]) + SUM(gold_epa_aqi_index[days_hazardous]),
    SUM(gold_epa_aqi_index[observation_count]),
    0
) * 100

// Total Toxic Releases (tons)
Total Toxic Releases Tons =
SUM(gold_epa_facility_risk[lifetime_releases_lbs]) / 2000

// Carcinogen Release Share (%)
Carcinogen Share % =
DIVIDE(
    SUM(gold_epa_facility_risk[carcinogen_releases_lbs]),
    SUM(gold_epa_facility_risk[lifetime_releases_lbs]),
    0
) * 100

// Critical Risk Facilities
Critical Risk Facilities =
CALCULATE(
    COUNTROWS(gold_epa_facility_risk),
    gold_epa_facility_risk[risk_tier] IN {"Critical", "High"}
)

// Water Health Violations
Total Health Violations =
SUM(gold_epa_water_scorecard[health_violations])

// Population Affected by Water Violations
Population Affected =
SUM(gold_epa_water_scorecard[total_population_affected])

// EJ High Priority Counties
EJ High Priority Counties =
CALCULATE(
    COUNTROWS(gold_epa_env_justice),
    gold_epa_env_justice[ej_priority] = "High Priority"
)

// AQI Trend (Year-over-Year)
AQI YoY Change =
VAR CurrentAvg = [Average AQI]
VAR PriorAvg =
    CALCULATE([Average AQI], DATEADD(gold_epa_aqi_index[measurement_date], -1, YEAR))
RETURN CurrentAvg - PriorAvg

6.2 Dashboard Layout

┌─────────────────────────────────────────────────────────────────────────────┐
│  🌿 EPA ENVIRONMENTAL ANALYTICS DASHBOARD                                   │
│  State: [Slicer] │ Pollutant: [Slicer] │ Year: [Slicer] │ Domain: [Slic] │
├───────────────────┬──────────────────┬──────────────────┬──────────────────┤
│  🌬️ Average AQI  │  ☢️ Toxic Release │  💧 Water Health │  ⚖️ EJ Priority │
│                   │  Facilities       │  Violations       │  Counties       │
│  42 (Good)        │  847 Critical     │  12,438           │  623            │
├───────────────────┴──────────────────┴──────────────────┴──────────────────┤
│                                                                             │
│  ┌─────────────────────────────────┐  ┌──────────────────────────────────┐ │
│  │  Air Quality Heat Map           │  │  TRI Facility Risk Map          │ │
│  │  [Filled Map: County-level AQI]│  │  [Bubble Map: Lat/Long]         │ │
│  │                                 │  │                                  │ │
│  │  Color: Average AQI             │  │  Size: Total releases           │ │
│  │  Tooltip: Days unhealthy, P95  │  │  Color: Risk tier               │ │
│  │  Drill: State -> County        │  │  Tooltip: Chemicals, carcinogens│ │
│  └─────────────────────────────────┘  └──────────────────────────────────┘ │
│                                                                             │
│  ┌─────────────────────────────────┐  ┌──────────────────────────────────┐ │
│  │  Water Violation Trends         │  │  Environmental Justice Index    │ │
│  │  [Stacked Bar: Year x Type]    │  │  [Scatter: EJ Index x Demog]   │ │
│  │                                 │  │                                  │ │
│  │  Health-Based ████ 28%         │  │  X: Environmental burden score  │ │
│  │  MCL ██████ 35%                │  │  Y: Demographic vulnerability   │ │
│  │  Monitoring ████████ 37%       │  │  Color: EJ priority             │ │
│  └─────────────────────────────────┘  └──────────────────────────────────┘ │
│                                                                             │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │  Environmental Health Scorecard by State                             │  │
│  │  [Matrix: State x (AQI, TRI Facilities, Water Violations, EJ Score)]│  │
│  │  [Conditional formatting: Red=bad, Green=good]                      │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

🛠️ Step 7: Data Quality & Governance

7.1 Environmental Data Quality Checks

# Data quality checks specific to EPA environmental data
from great_expectations.core import ExpectationSuite

epa_expectations = ExpectationSuite(expectation_suite_name="epa_env_suite")

# AQI range (0-500)
epa_expectations.add_expectation({
    "expectation_type": "expect_column_values_to_be_between",
    "kwargs": {"column": "aqi_calculated", "min_value": 0, "max_value": 500}
})

# PM2.5 concentration non-negative
epa_expectations.add_expectation({
    "expectation_type": "expect_column_values_to_be_between",
    "kwargs": {"column": "concentration", "min_value": 0, "mostly": 0.999}
})

# TRI releases non-negative
epa_expectations.add_expectation({
    "expectation_type": "expect_column_values_to_be_between",
    "kwargs": {"column": "total_releases_lbs", "min_value": 0, "mostly": 0.999}
})

# State code format
epa_expectations.add_expectation({
    "expectation_type": "expect_column_value_lengths_to_equal",
    "kwargs": {"column": "state_code", "value": 2}
})

# CAS number format (XXXXXX-XX-X pattern)
epa_expectations.add_expectation({
    "expectation_type": "expect_column_values_to_match_regex",
    "kwargs": {"column": "cas_number", "regex": r"^\d{2,7}-\d{2}-\d$", "mostly": 0.95}
})

print(f"EPA validation suite: {len(epa_expectations.expectations)} expectations")

7.2 EPA Governance Configuration

# Purview configuration for EPA data
purview_config = {
    "lh_bronze.bronze_epa_air_quality": {
        "sensitivity_label": "Public - Federal Environmental Data",
        "attribution": "U.S. Environmental Protection Agency - Air Quality System (AQS)",
        "regulatory_context": "40 CFR Part 58 - Ambient Air Quality Surveillance",
        "update_frequency": "Hourly (real-time) + daily batch",
    },
    "lh_bronze.bronze_epa_tri_releases": {
        "sensitivity_label": "Public - Federal Environmental Data",
        "attribution": "U.S. EPA - Toxic Release Inventory (TRI)",
        "regulatory_context": "EPCRA Section 313 - Toxic Chemical Release Reporting",
        "note": "Self-reported by facilities; EPA-verified but not guaranteed",
    },
    "lh_gold.gold_epa_env_justice": {
        "sensitivity_label": "Internal - Derived Analytics",
        "disclaimer": "Environmental justice index is a derived composite metric, not an official EPA product",
        "methodology": "Composite of TRI proximity, AQI, water violations (EPA EJScreen-inspired)",
    },
}

for table, config in purview_config.items():
    print(f"\n{table}:")
    for key, value in config.items():
        print(f"  {key}: {value}")

🔧 Troubleshooting

Symptom Likely Cause Solution
AQI calculation returns NULL Concentration value outside breakpoint table range Check for negative values or extreme outliers; verify parameter_name matches breakpoint keys
TRI facility coordinates are (0,0) Missing or invalid lat/long in TRI submission Filter facilities with NULL or zero coordinates; geocode from address using Azure Maps
AQS API returns 429 (rate limited) Exceeding 5 requests/second or 10K/day limit Implement exponential backoff; batch requests by county rather than individual monitors
Water violation counts seem too high Counting sub-violations or multiple compliance periods Deduplicate by violation_id; group by water system for per-system counts
Exceptional events skew AQI averages Wildfire smoke, dust storms inflate readings Filter on exceptional_event IS NULL for trend analysis; track exceptional days separately
TRI release quantities in different units Some chemicals reported in grams, others in pounds Check units field in raw TRI data; convert all to pounds in Silver
EJ index appears arbitrary Composite weights need calibration Calibrate against official EPA EJScreen percentiles; document weight rationale
Eventhouse data gaps AirNow API intermittent availability Add retry logic; track data freshness as a KQL metric
Power BI map shows wrong locations Lat/long swapped or in wrong projection Verify coordinate order; EPA data uses NAD83/WGS84 datum
GHG emissions data missing sectors GHGRP only covers facilities above 25K MT CO2e/year Document GHGRP reporting threshold; note that small emitters are excluded

📋 Best Practices

  1. Always check data validity flags. EPA air quality data includes data_validity flags (Valid, Suspect, Excluded). Filter to "Valid" observations for regulatory analysis. Include suspect data only for research or gap-filling purposes.

  2. Distinguish health-based from monitoring violations. Water compliance data contains both health-based violations (direct risk to public health) and monitoring violations (failure to test). Always separate these in analyses -- a monitoring violation indicates a data gap, not necessarily contamination.

  3. Handle exceptional events separately. Wildfire smoke, volcanic ash, and dust storms can drive AQI to 500+ in otherwise clean-air regions. Track exceptional events as a separate metric rather than blending them into trend analysis.

  4. Respect TRI self-reporting limitations. TRI data is self-reported by facilities under EPCRA Section 313. The EPA verifies submissions but does not independently measure releases. Build dashboards that communicate this limitation. Year-over-year changes may reflect reporting methodology changes, not actual emission changes.

  5. Use CAS numbers as the chemical identifier. Chemical names vary across facilities (abbreviations, trade names, etc.). The CAS Registry Number is the universal chemical identifier and should be the join key for chemical-level analysis.

  6. Calibrate environmental justice indices carefully. Composite EJ indices are powerful but subjective. Document the weights and methodology. Compare your results against the official EPA EJScreen tool to validate reasonableness.

  7. Partition air data by state, Z-Order by monitor and date. Air quality queries almost always filter by geography then time. This combination optimizes Direct Lake performance for map-based and trend-based visualizations.

  8. Configure Activator alerts for regulatory thresholds. AQI > 100 triggers "Unhealthy for Sensitive Groups" -- a meaningful threshold for public health notifications. Set up Fabric Activator to alert when real-time AQI crosses this boundary.


✅ Summary

Congratulations! You have built a comprehensive EPA environmental analytics pipeline in Microsoft Fabric spanning air quality, toxic releases, water compliance, and environmental justice.

What You Accomplished

  • Configured multi-domain EPA data ingestion from AQS, TRI, ECHO/SDWIS, and GHGRP sources (synthetic and real data paths)
  • Ingested environmental monitoring records into Bronze Delta tables with EPA registry identifiers and quality flags
  • Calculated Air Quality Index (AQI) from raw pollutant concentrations using EPA's official breakpoint formula in Silver
  • Normalized TRI toxic release quantities, classified chemical hazards, and validated water compliance records
  • Implemented real-time AQI monitoring using Fabric Eventhouse with KQL exceedance detection and Activator alerting
  • Built Gold layer AQI summaries, TRI facility risk profiles, water compliance scorecards, and environmental justice indicators
  • Developed DAX measures for Power BI dashboards with air quality heat maps, facility risk maps, and environmental health scorecards
  • Applied environmental data quality checks for detection limits, method codes, and regulatory thresholds
  • Configured EPA data attribution and environmental compliance governance in Microsoft Purview

Key Takeaways

Concept Key Point
Multi-Domain Integration Air, water, toxic releases, and emissions data from different EPA systems unified in a single governed lakehouse
AQI Calculation EPA's official breakpoint formula converts raw concentrations to a 0-500 index that drives public health decisions
Environmental Justice Correlating pollution burden with demographics reveals communities bearing disproportionate environmental risk
Facility Risk Profiling TRI data enables tracking chemical releases by facility over time, identifying persistent high-risk sources
Real-Time Monitoring Eventhouse + KQL enables production-grade AQI monitoring with threshold-based alerting
Regulatory Context EPA data powers compliance reporting; Purview governance ensures audit-ready lineage and attribution

🚀 Next Steps

Continue your learning journey:

Next Tutorial: Tutorial 36: DOI Natural Resources Analytics -- Build natural resource management pipelines with USGS earthquake data, NPS park visitation, water monitoring, and endangered species tracking.

Related Tutorials: - Tutorial 34: NOAA Weather & Climate Analytics -- Weather data directly impacts air quality; combine NOAA and EPA data for atmospheric analysis - Tutorial 04: Real-Time Analytics -- Foundational Eventstream and Eventhouse patterns used in real-time AQI monitoring - Tutorial 21: GeoAnalytics & ArcGIS -- Advanced geospatial visualization for facility locations and pollution patterns - Tutorial 07: Governance & Purview -- Data lineage and regulatory compliance governance for environmental data


📚 Resources

Resource Link
EPA AQS API aqs.epa.gov
AirNow API airnowapi.org
Toxic Release Inventory epa.gov/tri
ECHO Compliance echo.epa.gov
EJScreen epa.gov/ejscreen
GHGRP epa.gov/ghgreporting
Air Quality Schema data_generation/schemas/federal/epa_air_quality_schema.json
Water Quality Schema data_generation/schemas/federal/epa_water_quality_schema.json
Data Generator data_generation/generators/federal/epa_generator.py

Previous Up Next
⬅️ 34-NOAA Weather & Climate Analytics 📖 Tutorials Index 36-DOI Natural Resources Analytics ➡️

Questions or issues? Open an issue in the GitHub repository

Tutorial 35 of 36 in the Microsoft Fabric Casino POC Series