
Source: examples/epa/README.md — this page is rendered live from that file.

CIPSEA awareness

The data in this example may be subject to CIPSEA (the Confidential Information Protection and Statistical Efficiency Act, 44 U.S.C. §§ 3561–3583) when collected from respondents under a pledge of confidentiality for exclusively statistical purposes.

Knowing and willful disclosure of identifiable CIPSEA data is a Class E felony (§ 3572) attaching to individual officers, employees, or designated agents — including cloud-operator personnel where applicable.

The architecture below is starting-point reference guidance only. Validate the specific compliance posture for your workload with your designating statistical agency and Confidentiality Officer before production use:

EPA Environmental Monitoring Analytics Platform

Examples > EPA

[!TIP] TL;DR — Environmental monitoring platform with real-time AQI streaming from 4,000+ stations, water safety tracking for 25,000+ systems, toxic release analysis, and environmental justice scoring with ML-based air quality prediction.



A comprehensive environmental monitoring analytics platform built on Azure Cloud Scale Analytics (CSA), providing insights into air quality, water safety, toxic releases, and environmental justice using official EPA data sources — including real-time AQI sensor streaming for near-real-time air quality dashboards.


📋 Overview

The Environmental Protection Agency monitors air quality at 4,000+ stations, tracks 25,000+ drinking water systems, catalogs toxic releases from 20,000+ facilities, and manages 1,300+ Superfund sites. This platform ingests, processes, and analyzes EPA data to enable air quality prediction, environmental justice analysis, and emissions compliance monitoring. The streaming pipeline demonstrates real-time AQI sensor data flowing through Azure Event Hub for sub-minute air quality alerting.

✨ Key Features

  • Real-Time Air Quality Monitoring: Live AQI data via Event Hub with ML-based prediction
  • Drinking Water Safety: SDWIS violation tracking with risk-based prioritization
  • Toxic Release Tracking: TRI facility emissions with trend analysis and community impact
  • Superfund Site Management: Cleanup progress tracking and community exposure assessment
  • Environmental Justice Analysis: Overlay pollution data with socioeconomic indicators
  • Regulatory Compliance Dashboards: Facility-level NESHAP, NPDES, and RCRA compliance

🗄️ Data Sources

| Source | Description | URL |
|--------|-------------|-----|
| AirNow API | Real-time and forecast AQI for 500+ metro areas | https://docs.airnowapi.org/ |
| AQS (Air Quality System) | Historical ambient air quality data from monitors | https://aqs.epa.gov/aqsweb/documents/data_api.html |
| SDWIS | Safe Drinking Water Information System — violations & compliance | https://www.epa.gov/enviro/sdwis-search |
| TRI (Toxics Release Inventory) | Chemical releases from industrial facilities | https://www.epa.gov/toxics-release-inventory-tri-program/tri-data-and-tools |
| ECHO | Enforcement and Compliance History Online | https://echo.epa.gov/tools/web-services |
| Envirofacts API | Multi-program environmental data gateway | https://www.epa.gov/enviro/envirofacts-data-service-api |
| EJScreen | Environmental justice screening and mapping | https://www.epa.gov/ejscreen |
| Superfund / CERCLIS | NPL site locations, contaminants, and cleanup status | https://www.epa.gov/superfund/superfund-data-and-reports |
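As a concrete starting point, the AirNow source above can be polled with a small request builder like the sketch below. The endpoint path and parameter names reflect the public AirNow current-observations API, but verify them against https://docs.airnowapi.org/ before relying on them; the coordinates in the example are illustrative.

```python
from urllib.parse import urlencode

# Current-observations-by-coordinates endpoint (check the AirNow docs
# for the authoritative path and parameter list).
AIRNOW_BASE = "https://www.airnowapi.org/aq/observation/latLong/current/"

def build_airnow_request(latitude: float, longitude: float, api_key: str,
                         distance_miles: int = 25) -> str:
    """Build a current-observations request URL for the AirNow API."""
    params = {
        "format": "application/json",
        "latitude": f"{latitude:.4f}",
        "longitude": f"{longitude:.4f}",
        "distance": distance_miles,   # search radius around the point
        "API_KEY": api_key,
    }
    return f"{AIRNOW_BASE}?{urlencode(params)}"

# Example: downtown Los Angeles with a placeholder key
url = build_airnow_request(34.0522, -118.2437, "demo-key")
```

Fetching the URL with any HTTP client returns a JSON array of per-pollutant observations, which the bronze layer can land as-is.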

🏗️ Architecture Overview

graph TD
    subgraph "Data Sources"
        A1[AirNow API<br/>Real-Time AQI]
        A2[AQS API<br/>Historical Air Quality]
        A3[SDWIS<br/>Water System Violations]
        A4[TRI<br/>Toxic Releases]
        A5[ECHO<br/>Compliance History]
        A6[EJScreen<br/>Environmental Justice]
        A7[Superfund<br/>NPL Sites]
    end

    subgraph "Streaming Pipeline"
        SEN[AQI Sensor<br/>Network]
        EH[Azure Event Hub<br/>aqi-sensor-events]
        ADX[Azure Data Explorer<br/>Hot Analytics]
        ALERT[Alert Engine<br/>AQI Threshold Triggers]
        RT[Real-Time AQI<br/>Dashboard]
    end

    subgraph "Batch Ingestion"
        I1[ADF Pipeline<br/>Scheduled Loads]
        I2[REST Connectors<br/>API Polling]
    end

    subgraph "Bronze Layer — Raw"
        B1[brz_aqi_observations]
        B2[brz_water_violations]
        B3[brz_toxic_releases]
        B4[brz_compliance_records]
        B5[brz_ejscreen_indicators]
        B6[brz_superfund_sites]
    end

    subgraph "Silver Layer — Cleansed"
        S1[slv_air_quality]
        S2[slv_water_safety]
        S3[slv_facility_emissions]
        S4[slv_compliance_status]
        S5[slv_ej_demographics]
    end

    subgraph "Gold Layer — Analytics"
        G1[gld_aqi_prediction]
        G2[gld_ej_analysis]
        G3[gld_emissions_compliance]
        G4[gld_water_risk_index]
        G5[gld_environmental_dashboard]
    end

    subgraph "Consumption"
        C1[AQI Dashboard]
        C2[EJ Mapping Tool]
        C3[Compliance Reports]
        C4[Public Health APIs]
    end

    SEN --> EH
    EH --> ADX
    ADX --> ALERT
    ADX --> RT

    A1 --> I2
    A2 --> I1
    A3 --> I1
    A4 --> I1
    A5 --> I2
    A6 --> I1
    A7 --> I1

    I1 --> B1
    I1 --> B2
    I1 --> B3
    I2 --> B4
    I1 --> B5
    I1 --> B6

    B1 --> S1
    B2 --> S2
    B3 --> S3
    B4 --> S4
    B5 --> S5

    S1 --> G1
    S1 --> G2
    S3 --> G2
    S5 --> G2
    S3 --> G3
    S4 --> G3
    S2 --> G4
    S1 --> G5
    S2 --> G5
    S3 --> G5

    G1 --> C1
    G2 --> C2
    G3 --> C3
    G5 --> C4

⚡ Real-Time AQI Streaming Architecture

This example includes a streaming pipeline for near-real-time air quality index monitoring:

graph LR
    subgraph "Sensor Network"
        S1[PM2.5 Monitor<br/>Site 060371103]
        S2[Ozone Monitor<br/>Site 170314201]
        S3[NO2 Monitor<br/>Site 360610079]
    end

    subgraph "Ingestion"
        SIM[AQI Sensor Simulator<br/>Python Script]
        EH[Azure Event Hub<br/>aqi-sensor-events]
    end

    subgraph "Hot Path — Real-Time"
        ADX[Azure Data Explorer]
        KQL[KQL Continuous Queries]
        ALERT[Azure Monitor Alerts<br/>AQI > 150 Trigger]
        DASH[Power BI Real-Time<br/>AQI Heatmap]
    end

    subgraph "Warm Path — Batch"
        SA[Stream Analytics<br/>5-min Aggregations]
        ADLS[ADLS Gen2<br/>Bronze Parquet]
    end

    S1 --> SIM
    S2 --> SIM
    S3 --> SIM
    SIM --> EH
    EH --> ADX
    ADX --> KQL
    KQL --> ALERT
    KQL --> DASH
    EH --> SA
    SA --> ADLS

🚀 Streaming Quick Start

# Start the AQI sensor simulator
python streaming/aqi_sensor_simulator.py \
  --event-hub-connection "$EVENTHUB_CONNECTION_STRING" \
  --sites "060371103,170314201,360610079" \
  --pollutants "PM2.5,O3,NO2,CO" \
  --interval-seconds 60

# Deploy ADX table and ingestion mapping
az kusto script create \
  --cluster-name epa-adx \
  --database-name airquality \
  --resource-group rg-epa-analytics \
  --script-content @streaming/adx/create_tables.kql
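The simulator above emits an aqi_value with each reading. If you instead need to derive AQI from raw concentrations, EPA uses piecewise-linear interpolation between category breakpoints. The sketch below uses the pre-2024 PM2.5 breakpoints; EPA revised them in 2024, so confirm against the current AQI technical assistance document before using this in production.

```python
# EPA AQI interpolation: AQI = (Ihi - Ilo) / (BPhi - BPlo) * (C - BPlo) + Ilo
# PM2.5 breakpoints (ug/m3, 24-hr avg) -- pre-2024 values, for illustration.
PM25_BREAKPOINTS = [
    (0.0, 12.0, 0, 50),        # Good
    (12.1, 35.4, 51, 100),     # Moderate
    (35.5, 55.4, 101, 150),    # Unhealthy for Sensitive Groups
    (55.5, 150.4, 151, 200),   # Unhealthy
    (150.5, 250.4, 201, 300),  # Very Unhealthy
    (250.5, 500.4, 301, 500),  # Hazardous
]

def pm25_to_aqi(concentration: float) -> int:
    # EPA truncates PM2.5 to 0.1 ug/m3; rounding is used here for brevity.
    c = round(concentration, 1)
    for bp_lo, bp_hi, i_lo, i_hi in PM25_BREAKPOINTS:
        if bp_lo <= c <= bp_hi:
            return round((i_hi - i_lo) / (bp_hi - bp_lo) * (c - bp_lo) + i_lo)
    raise ValueError(f"PM2.5 concentration out of breakpoint range: {c}")
```

Each pollutant has its own breakpoint table; the overall AQI for a site is the maximum sub-index across pollutants.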

⚡ Sample KQL — Real-Time AQI Alerts

// Locations exceeding "Unhealthy" AQI in the last 30 minutes
AqiSensorEvents
| where ingestion_time() > ago(30m)
| summarize avg_aqi = avg(aqi_value),
            max_aqi = max(aqi_value),
            readings = count()
    by site_id, pollutant, bin(event_time, 5m)
| where avg_aqi > 150
| extend severity = case(
    avg_aqi > 300, "Hazardous",
    avg_aqi > 200, "Very Unhealthy",
    "Unhealthy")  // default branch; the filter above guarantees avg_aqi > 150
| order by avg_aqi desc

📎 Prerequisites

Azure Resources

  • Azure subscription with contributor access
  • Azure Data Factory or Synapse Analytics
  • Azure Data Lake Storage Gen2
  • Azure Data Explorer cluster (for AQI streaming)
  • Azure Event Hub namespace (for AQI streaming)
  • Azure Machine Learning workspace (optional, for AQI prediction)
  • Azure Key Vault for API credentials

Tools Required

  • Azure CLI (2.55.0 or later)
  • dbt CLI (1.7.0 or later)
  • Python 3.9+
  • Git

API Access

  • AirNow API key (free at https://docs.airnowapi.org/account/request/)
  • EPA AQS API credentials (email/key at https://aqs.epa.gov/aqsweb/documents/data_api.html)
  • ECHO API (no key required — open access)
  • Envirofacts (no key required — open access)

🚀 Quick Start

1. Environment Setup

# Clone the repository
git clone <repository-url>
cd csa-inabox/examples/epa

# Install Python dependencies
pip install -r requirements.txt

# Install dbt packages
cd domains/dbt
dbt deps

2. Configure API Keys

# Add to Azure Key Vault or local environment
export AIRNOW_API_KEY="your-airnow-api-key"
export AQS_EMAIL="your-email@example.com"
export AQS_KEY="your-aqs-api-key"
export EVENTHUB_CONNECTION_STRING="your-eventhub-connection"  # For streaming

3. Generate Sample Data

# Generate synthetic environmental data
python data/generators/generate_epa_data.py --output-dir domains/dbt/seeds

# Or fetch real data from APIs
python data/open-data/fetch_airnow.py --bbox "-125,24,-66,50" --parameters "PM2.5,O3"
python data/open-data/fetch_tri.py --states "TX,LA,CA" --years "2020,2021,2022"
python data/open-data/fetch_sdwis.py --states "MI,OH,PA"
python data/open-data/fetch_echo.py --program "CAA" --state "CA"

4. Deploy Infrastructure

# Configure parameters
cp deploy/params.dev.json deploy/params.local.json
# Edit params.local.json with your values

# Deploy using Azure CLI
az deployment group create \
  --resource-group rg-epa-analytics \
  --template-file ../../deploy/bicep/DLZ/main.bicep \
  --parameters @deploy/params.local.json

5. Run dbt Models

cd domains/dbt

# Test connections
dbt debug

# Load seed data
dbt seed

# Run models
dbt run

# Run tests
dbt test

# Generate documentation
dbt docs generate
dbt docs serve

💡 Sample Analytics Scenarios

1. Air Quality Prediction with ML

Use historical AQI data combined with meteorological features to predict next-day air quality for proactive health advisories.

-- AQI prediction performance by metro area
SELECT
    metro_area,
    pollutant,
    prediction_date,
    predicted_aqi,
    actual_aqi,
    prediction_error,
    model_confidence,
    contributing_factors,
    health_advisory_level
FROM gold.gld_aqi_prediction
WHERE prediction_date >= CURRENT_DATE - INTERVAL '30 days'
    AND pollutant = 'PM2.5'
ORDER BY metro_area, prediction_date;
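Given the predicted_aqi, actual_aqi, and prediction_error columns, forecast skill can be summarized downstream. The sketch below assumes prediction_error = predicted - actual (verify against the model's column semantics) and treats 50 AQI points as a rough one-category tolerance, even though AQI categories are not uniformly 50 wide.

```python
from statistics import mean

def prediction_metrics(rows):
    """Summarize forecast skill from (predicted_aqi, actual_aqi) pairs."""
    errors = [predicted - actual for predicted, actual in rows]
    return {
        "mae": mean(abs(e) for e in errors),       # mean absolute error
        "bias": mean(errors),                      # > 0 means over-forecasting
        # Fraction of forecasts within a rough one-category tolerance.
        "within_one_category": mean(1 if abs(e) <= 50 else 0 for e in errors),
    }
```

Tracking bias separately from MAE matters for health advisories: systematic under-forecasting is worse than symmetric noise.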

2. Environmental Justice Analysis

Overlay TRI toxic release data and AQI readings with Census socioeconomic indicators to identify disproportionately burdened communities.

-- Communities with highest environmental burden
SELECT
    census_tract,
    state,
    county,
    population,
    pct_minority,
    pct_low_income,
    avg_aqi,
    tri_facilities_within_3mi,
    total_chemical_releases_lbs,
    superfund_sites_within_5mi,
    ej_burden_score,
    ej_percentile
FROM gold.gld_ej_analysis
WHERE ej_percentile >= 90
ORDER BY ej_burden_score DESC
LIMIT 50;
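The ej_burden_score and ej_percentile columns imply a percentile-based composite over tract indicators. The sketch below shows one illustrative weighting; it is not EPA's EJScreen methodology (EJScreen combines an environmental indicator percentile with a demographic index), so treat the weights and field names as placeholders and consult the EJScreen technical documentation.

```python
def percentile_rank(values, x):
    """Fraction of observations <= x, scaled to 0-100."""
    return 100.0 * sum(v <= x for v in values) / len(values)

def ej_burden_score(tract, all_tracts, weights=None):
    """Illustrative weighted sum of indicator percentiles for one tract.

    Field names and weights are placeholders, not EJScreen's method.
    """
    weights = weights or {
        "avg_aqi": 0.4,         # air quality burden
        "releases_lbs": 0.4,    # TRI chemical releases
        "pct_low_income": 0.2,  # demographic indicator
    }
    return sum(
        w * percentile_rank([t[k] for t in all_tracts], tract[k])
        for k, w in weights.items()
    )
```

Because every component is a percentile, the composite stays on a 0-100 scale and the >= 90th percentile filter in the query above maps directly onto it.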

📊 3. Emissions Compliance Monitoring

Track facility-level compliance with Clean Air Act (CAA), Clean Water Act (CWA), and RCRA regulations, identifying repeat violators and enforcement gaps.

-- Facilities with ongoing violations
SELECT
    facility_id,
    facility_name,
    city,
    state,
    program_area,
    violation_type,
    violation_start_date,
    days_in_violation,
    penalties_assessed_usd,
    penalties_collected_usd,
    inspection_count_3yr,
    compliance_status
FROM gold.gld_emissions_compliance
WHERE compliance_status = 'SIGNIFICANT_VIOLATION'
    AND days_in_violation > 180
ORDER BY days_in_violation DESC;

✨ Data Products

AQI Prediction (aqi-prediction)

  • Description: Next-day AQI forecasts with confidence intervals by metro area
  • Freshness: Daily model runs, real-time streaming for current conditions
  • Coverage: 500+ metro areas, 6 criteria pollutants
  • API: /api/v1/aqi-prediction

Environmental Justice (ej-analysis)

  • Description: Census tract-level environmental burden scoring with demographic overlays
  • Freshness: Quarterly updates (aligned with EJScreen releases)
  • Coverage: All U.S. Census tracts (~74,000)
  • API: /api/v1/ej-analysis

Emissions Compliance (emissions-compliance)

  • Description: Facility-level regulatory compliance status across CAA, CWA, RCRA
  • Freshness: Monthly updates from ECHO
  • Coverage: 800,000+ regulated facilities
  • API: /api/v1/emissions-compliance

⚙️ Configuration

⚙️ dbt Profiles

Add to your ~/.dbt/profiles.yml:

epa_analytics:
  target: dev
  outputs:
    dev:
      type: databricks
      host: "{{ env_var('DBT_HOST') }}"
      http_path: "{{ env_var('DBT_HTTP_PATH') }}"
      token: "{{ env_var('DBT_TOKEN') }}"
      schema: epa_dev
      catalog: dev
    prod:
      type: databricks
      host: "{{ env_var('DBT_HOST_PROD') }}"
      http_path: "{{ env_var('DBT_HTTP_PATH_PROD') }}"
      token: "{{ env_var('DBT_TOKEN_PROD') }}"
      schema: epa
      catalog: prod

⚙️ Environment Variables

# Required for data fetching
AIRNOW_API_KEY=your-airnow-api-key
AQS_EMAIL=your-email@example.com
AQS_KEY=your-aqs-api-key
EVENTHUB_CONNECTION_STRING=your-eventhub-connection

# Required for dbt
DBT_HOST=your-databricks-host
DBT_HTTP_PATH=your-sql-warehouse-path
DBT_TOKEN=your-access-token

# Optional
EPA_LOG_LEVEL=INFO
EPA_BATCH_SIZE=5000
ADX_CLUSTER_URI=https://epa-adx.region.kusto.windows.net
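A small settings loader can fail fast when any of the required variables above are unset; the helper name and structure below are illustrative, not part of the repo.

```python
import os

REQUIRED = ["AIRNOW_API_KEY", "AQS_EMAIL", "AQS_KEY",
            "DBT_HOST", "DBT_HTTP_PATH", "DBT_TOKEN"]
OPTIONAL_DEFAULTS = {"EPA_LOG_LEVEL": "INFO", "EPA_BATCH_SIZE": "5000"}

def load_settings(environ=os.environ):
    """Validate required variables and apply defaults for optional ones."""
    missing = [name for name in REQUIRED if not environ.get(name)]
    if missing:
        raise EnvironmentError(f"Missing required variables: {', '.join(missing)}")
    settings = {name: environ[name] for name in REQUIRED}
    for name, default in OPTIONAL_DEFAULTS.items():
        settings[name] = environ.get(name, default)
    return settings
```

Calling this at pipeline startup surfaces a single clear error instead of a mid-run API authentication failure.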

🔒 Azure Government Notes

This example is compatible with Azure Government (US) regions. When deploying to Azure Government:

  • Use usgovvirginia or usgovarizona as your Azure region
  • Update ARM/Bicep endpoint references to .usgovcloudapi.net
  • ADX and Event Hub are available in Azure Government
  • AirNow and AQS APIs are publicly accessible from government networks
  • ECHO data may contain enforcement-sensitive details — confirm classification with your ISSO
  • EJScreen data is public and unrestricted

📊 Monitoring & Alerts

  • Streaming Health: Event Hub throughput, ADX ingestion latency, alert trigger rates
  • AQI Thresholds: Automated alerts when AQI exceeds 100 (Unhealthy for Sensitive Groups)
  • Data Freshness: Alerts when AirNow feeds or TRI annual submissions are overdue
  • Data Quality: Automated tests on pollutant ranges, geographic bounds, and completeness
  • Cost Management: ADX auto-scale monitoring with spend guardrails
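The pollutant-range and geographic-bounds checks can be expressed as a simple row validator. The numeric bounds below are illustrative placeholders to tune for your network (the bounding box matches the --bbox used in the Quick Start fetch commands).

```python
# Plausible physical bounds per pollutant -- illustrative values only.
POLLUTANT_BOUNDS = {
    "PM2.5": (0.0, 1000.0),  # ug/m3
    "O3": (0.0, 0.6),        # ppm
    "NO2": (0.0, 2.0),       # ppm
    "CO": (0.0, 60.0),       # ppm
}
# lon_min, lat_min, lon_max, lat_max -- continental US, as in the fetch step
CONUS_BBOX = (-125.0, 24.0, -66.0, 50.0)

def validate_reading(reading: dict) -> list:
    """Return a list of failure reasons; an empty list means the row passes."""
    failures = []
    lo, hi = POLLUTANT_BOUNDS.get(reading["pollutant"], (None, None))
    if lo is None:
        failures.append(f"unknown pollutant {reading['pollutant']}")
    elif not lo <= reading["value"] <= hi:
        failures.append(f"value {reading['value']} outside [{lo}, {hi}]")
    lon_min, lat_min, lon_max, lat_max = CONUS_BBOX
    if not (lon_min <= reading["longitude"] <= lon_max
            and lat_min <= reading["latitude"] <= lat_max):
        failures.append("coordinates outside CONUS bounding box")
    return failures
```

Wiring this into the bronze-to-silver step lets quarantined rows carry their failure reasons into a data-quality report.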

🔧 Troubleshooting

🔧 Common Issues

  1. AirNow API Rate Limits: Limited to 500 requests/hour per key. Use the --delay parameter and cache responses.
  2. AQS Data Lag: Quality-assured AQS data lags 6–12 months behind real-time AirNow data. Use AirNow for current conditions, AQS for historical analysis.
  3. TRI Reporting Year Lag: TRI data is reported annually with an 18-month delay. The most recent year available is typically two years prior.
  4. ECHO Pagination: ECHO API returns max 10,000 records per query. Use --state-filter and --program-filter to partition requests.
  5. Sensor Simulator Memory: For large-scale simulations (100+ sites), increase the --batch-size parameter to reduce event hub calls.
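For the ECHO pagination issue, partitioning requests can be as simple as enumerating (state, program) slices that each stay under the record cap. The dict keys below are illustrative and should be mapped onto the actual fetch_echo.py flags.

```python
from itertools import product

def echo_partitions(states, programs, max_rows=10_000):
    """Yield one request slice per (state, program) pair.

    Each slice is bounded by ECHO's per-query record cap; split further
    (e.g. by county) if a single slice still exceeds max_rows.
    """
    for state, program in product(sorted(states), sorted(programs)):
        yield {"state": state, "program": program, "limit": max_rows}
```

Sorting the inputs makes the partition order deterministic, which keeps retries and resumed runs idempotent.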

🔗 Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/new-data-source)
  3. Make changes and add tests
  4. Run quality checks (make lint test)
  5. Submit a pull request

🔗 License

This project is licensed under the MIT License. See LICENSE file for details.


🔗 Acknowledgments

  • EPA for comprehensive environmental monitoring data and open APIs
  • AirNow for real-time air quality data infrastructure
  • Azure Cloud Scale Analytics team for the foundational platform
  • Contributors and the open-source community


Prerequisites / Cost / Teardown

[!IMPORTANT] Cost-safety: this vertical deploys real Azure resources. Always run teardown.sh when you are done. A forgotten workshop environment can run $150-250/day.

Prerequisites

  • Azure CLI 2.55+ logged in (az login), subscription selected (az account set --subscription <id>)
  • jq installed (used by teardown enumeration)
  • Bicep CLI 0.25+ (az bicep version)
  • Contributor + User Access Administrator on target subscription (or a pre-created RG with equivalent RBAC)
  • bash scripts/deploy/validate-prerequisites.sh passes

Cost estimate (rough, East US 2)

  • While running: ~$150-250/day (services: Synapse, Databricks, Event Hub, Stream Analytics, ADX, Storage, Key Vault)
  • Idle overnight: roughly half if you stop compute (Databricks autostop + Synapse pause)
  • Storage + Key Vault residual: <$5/month if you skip teardown

Numbers are indicative for a small demo dataset; production workloads vary significantly. Use az consumption usage list or Cost Management for live numbers.

Runtime

  • Deploy: ~35-50 minutes (first run; cold Bicep)
  • Teardown: ~10-15 minutes (async RG delete completes in the background)

Teardown

When finished, run the per-example teardown script. It enforces a typed DESTROY-epa confirmation, logs every step to reports/teardown/epa-<timestamp>.log, and deletes the resource group rg-epa-analytics along with any matching subscription-scope deployments.

# Interactive (recommended)
bash examples/epa/deploy/teardown.sh

# Dry run (enumerate only)
bash examples/epa/deploy/teardown.sh --dry-run

# From the repo root via Makefile
make teardown-example VERTICAL=epa
make teardown-example VERTICAL=epa DRYRUN=1

# CI automation (no prompt — only for ephemeral environments)
bash examples/epa/deploy/teardown.sh --yes

See docs/QUICKSTART.md#teardown for the platform-wide teardown flow.

Directory Structure

epa/
├── contracts/                # Data product contracts (schemas, SLOs, owners)
│   ├── air-quality-analytics.yaml
│   ├── toxic-releases.yaml
│   └── water-systems.yaml
├── data/                     # Sample data + synthetic generators
│   ├── generators/
│   └── open-data/
├── deploy/                   # Deployment parameters / Bicep templates
│   ├── params.dev.json
│   ├── params.gov.json
│   └── teardown.sh
├── domains/                  # dbt models (bronze / silver / gold) and seeds
│   └── dbt/
├── notebooks/                # Synapse / Fabric / Databricks notebooks
│   ├── air_quality_forecasting.py
│   └── environmental_justice_analysis.py
├── reports/                  # Power BI report templates and pbix sources
├── ARCHITECTURE.md           # Mermaid + prose architecture diagrams
└── README.md                 # This file

Expected Results

After running the medallion pipeline against the bundled seed data, the Gold layer should populate the following tables. Row counts vary with the seed-data generator parameters; the figures below are the approximate scale you should see on a default run.

| Gold Table | Approximate Rows | Notes |
|------------|------------------|-------|
| gld_aqi_forecast | TODO: capture after first run | Populated from Silver via dbt run --select tag:gold |
| gld_compliance_dashboard | TODO: capture after first run | Populated from Silver via dbt run --select tag:gold |
| gld_environmental_justice | TODO: capture after first run | Populated from Silver via dbt run --select tag:gold |

TODO: capture exact counts after the next end-to-end seed run. These are bounded by the seed-data generator parameters in data/generators/.