🌬️ Apache Airflow Job - Managed Orchestration in Fabric¶
Enterprise-Grade DAG Orchestration Without Infrastructure Management
Last Updated: 2026-04-27 | Version: 1.0.0
Table of Contents¶
- Overview
- Architecture
- When to Use Airflow vs Alternatives
- Environment Setup
- DAG Authoring Patterns
- Fabric-Specific Operators
- Monitoring and Logging
- CI/CD for DAGs
- Casino Implementation
- Federal Agency Implementation
- Limitations
- References
Overview¶
Apache Airflow Job in Microsoft Fabric provides a fully managed Airflow 2.x environment within the Fabric platform. It lets data engineers author DAGs (Directed Acyclic Graphs) using the familiar Airflow Python SDK while benefiting from Fabric's integrated security, capacity management, and monitoring.
This is the right choice when your orchestration needs exceed what Fabric Pipelines can express (complex branching logic, dynamic task generation, cross-system dependencies) or when your team already has Airflow expertise.
Key Capabilities¶
| Capability | Description |
|---|---|
| Managed infrastructure | No VMs, no Kubernetes, no Airflow metadata DB to manage |
| Airflow 2.9+ | Current Airflow version with TaskFlow API support |
| Fabric operators | Native operators for Notebooks, Pipelines, Lakehouses, Eventhouses |
| Git sync | DAGs sync from Git repo (GitHub, Azure DevOps) |
| Python packages | Install custom packages via requirements.txt |
| Workspace Identity | Authenticate to Fabric services automatically |
| CU-based billing | Compute charged against Fabric capacity |
Architecture¶
graph TB
subgraph "Apache Airflow Job (Managed)"
SCHED[Scheduler]
WEB[Airflow UI]
EXEC[Executor Pool]
META[(Metadata DB)]
SCHED --> EXEC
SCHED --> META
WEB --> META
end
subgraph "DAG Sources"
GIT[Git Repository]
UI[Fabric Portal Editor]
end
GIT --> SCHED
UI --> SCHED
subgraph "Fabric Services"
NB[Notebooks]
PIPE[Pipelines]
LH[Lakehouse]
EH[Eventhouse]
WH[Warehouse]
end
EXEC --> NB
EXEC --> PIPE
EXEC --> LH
EXEC --> EH
EXEC --> WH
subgraph "External"
REST[REST APIs]
BLOB[Azure Blob]
SQL[Azure SQL]
end
EXEC --> REST
EXEC --> BLOB
EXEC --> SQL
When to Use Airflow vs Alternatives¶
Decision Matrix¶
| Criterion | Fabric Pipeline | Airflow Job | Spark Job Definition |
|---|---|---|---|
| Simple ETL | Excellent | Overkill | Overkill |
| Complex branching | Limited | Excellent | N/A |
| Dynamic task generation | No | Yes (expand) | No |
| External system deps | Some connectors | Full Python ecosystem | No |
| Team expertise | Low barrier | Airflow knowledge needed | Spark knowledge needed |
| Retry/backfill | Basic | Advanced (catchup, backfill) | Basic |
| Cross-workspace | Via REST | Native operators | Single workspace |
| Scheduling | Cron + trigger | Cron + sensors + datasets | Cron only |
| Monitoring | Pipeline runs view | Airflow UI + Gantt charts | Spark UI |
| CI/CD | fabric-cicd | Git sync + DAG tests | fabric-cicd |
| Cost | CU per activity | CU for scheduler + workers | CU per job |
Decision Flowchart¶
flowchart TD
A[Orchestration need?] --> B{Complex dependencies or branching?}
B -->|No| C{Simple copy/transform?}
C -->|Yes| D[Fabric Pipeline]
C -->|No| E{Pure Spark workload?}
E -->|Yes| F[Spark Job Definition]
E -->|No| D
B -->|Yes| G{Team knows Airflow?}
G -->|Yes| H[Airflow Job]
G -->|No| I{Dynamic task count?}
I -->|Yes| H
I -->|No| J{External system sensors?}
J -->|Yes| H
J -->|No| D
Environment Setup¶
Creating an Airflow Job¶
- In your Fabric workspace, select + New > Apache Airflow Job
- Name the job (e.g., casino-medallion-orchestrator)
- Configure Git sync (recommended) or use the built-in editor
Git Sync Configuration¶
# .airflow/config.yaml
git_sync:
repository: https://github.com/contoso/fabric-dags.git
branch: main
dags_folder: dags/
sync_interval_seconds: 60
credentials:
type: pat # or ssh
secret_name: github-pat # from Key Vault
Python Dependencies¶
# requirements.txt
apache-airflow-providers-microsoft-fabric==1.2.0
great-expectations==0.18.0
delta-spark==3.1.0
requests==2.31.0
pydantic==2.5.0
Environment Variables¶
# Set in Airflow Job configuration, NOT in DAG code
# These are available to all DAGs in the job
# Fabric workspace settings
FABRIC_WORKSPACE_ID = "your-workspace-id"
FABRIC_LAKEHOUSE_NAME = "lh_bronze"
# Business configuration
CTR_THRESHOLD = "10000"
RETENTION_DAYS = "365"
ENVIRONMENT = "prod"
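Inside a DAG or task, these values can be read with the Python standard library; a minimal sketch (the fallback defaults are illustrative, not platform defaults):
import os

# Read job-level environment variables inside DAG or task code
WORKSPACE_ID = os.environ["FABRIC_WORKSPACE_ID"]            # fail fast if missing
LAKEHOUSE = os.getenv("FABRIC_LAKEHOUSE_NAME", "lh_bronze")
CTR_THRESHOLD = int(os.getenv("CTR_THRESHOLD", "10000"))
ENVIRONMENT = os.getenv("ENVIRONMENT", "dev")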
DAG Authoring Patterns¶
Pattern 1: Bronze-Silver-Gold Medallion Pipeline¶
"""
Casino Medallion Pipeline
Orchestrates the full bronze → silver → gold data flow for casino data.
"""
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.models.baseoperator import cross_downstream
from airflow.providers.microsoft.fabric.operators.notebook import FabricNotebookOperator
from airflow.providers.microsoft.fabric.operators.pipeline import FabricPipelineOperator
from airflow.providers.microsoft.fabric.sensors.lakehouse import FabricLakehouseSensor
default_args = {
"owner": "data-engineering",
"depends_on_past": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
"email_on_failure": True,
"email": ["data-team@contoso.com"],
}
@dag(
dag_id="casino_medallion_pipeline",
description="Daily medallion pipeline for casino data",
schedule="0 6 * * *", # 6 AM daily
start_date=datetime(2026, 1, 1),
catchup=False,
default_args=default_args,
tags=["casino", "medallion", "daily"],
)
def casino_medallion():
# ---- BRONZE LAYER ----
ingest_slot_telemetry = FabricNotebookOperator(
task_id="bronze_slot_telemetry",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="01_bronze_slot_telemetry",
parameters={"date": "{{ ds }}", "env": "{{ var.value.environment }}"},
)
ingest_table_games = FabricNotebookOperator(
task_id="bronze_table_games",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="02_bronze_table_game_results",
parameters={"date": "{{ ds }}"},
)
ingest_player_tracking = FabricNotebookOperator(
task_id="bronze_player_tracking",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="03_bronze_player_tracking",
parameters={"date": "{{ ds }}"},
)
# ---- DATA QUALITY GATE ----
@task
def validate_bronze(ds=None):
"""Run Great Expectations on bronze tables."""
import great_expectations as gx
context = gx.get_context()
results = context.run_checkpoint(checkpoint_name="bronze_checkpoint")
if not results.success:
raise ValueError(f"Bronze quality check failed for {ds}")
return {"status": "passed", "date": ds}
bronze_quality = validate_bronze()
# ---- SILVER LAYER ----
cleanse_slots = FabricNotebookOperator(
task_id="silver_slot_cleansed",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="01_silver_slot_cleansed",
parameters={"date": "{{ ds }}"},
)
cleanse_table_games = FabricNotebookOperator(
task_id="silver_table_games_cleansed",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="02_silver_table_games_cleansed",
parameters={"date": "{{ ds }}"},
)
cleanse_players = FabricNotebookOperator(
task_id="silver_player_profiles",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="03_silver_player_profiles",
parameters={"date": "{{ ds }}"},
)
# ---- GOLD LAYER ----
gold_slot_kpis = FabricNotebookOperator(
task_id="gold_slot_performance",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="01_gold_slot_performance",
parameters={"date": "{{ ds }}"},
)
gold_compliance = FabricNotebookOperator(
task_id="gold_compliance_report",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="05_gold_compliance_monitoring",
parameters={"date": "{{ ds }}"},
)
gold_revenue = FabricNotebookOperator(
task_id="gold_revenue_dashboard",
workspace_id="{{ var.value.workspace_id }}",
notebook_name="03_gold_revenue_analysis",
parameters={"date": "{{ ds }}"},
)
# ---- DEPENDENCIES ----
# Bronze tasks run in parallel
[ingest_slot_telemetry, ingest_table_games, ingest_player_tracking] >> bronze_quality
# Silver depends on bronze quality gate
bronze_quality >> [cleanse_slots, cleanse_table_games, cleanse_players]
    # Gold depends on all silver (list >> list isn't supported; use cross_downstream)
    cross_downstream(
        [cleanse_slots, cleanse_table_games, cleanse_players],
        [gold_slot_kpis, gold_compliance, gold_revenue],
    )
casino_medallion()
Pattern 2: Dynamic Task Generation¶
"""
Federal Agency Dynamic Pipeline
Generates tasks dynamically based on configured agencies.
"""
from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.microsoft.fabric.operators.notebook import FabricNotebookOperator
AGENCIES = ["usda", "sba", "noaa", "epa", "doi", "doj"]
LAYERS = ["bronze", "silver", "gold"]
@dag(
dag_id="federal_multi_agency_pipeline",
schedule="0 4 * * 1", # Weekly on Monday at 4 AM
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["federal", "multi-agency"],
)
def federal_pipeline():
@task
def check_data_freshness(agency: str) -> dict:
"""Check if source data has been updated since last run."""
from fabric.onelake import OneLakeClient
client = OneLakeClient()
stats = client.get_table_stats(f"lh_bronze.Lakehouse/Tables/{agency}")
return {"agency": agency, "needs_refresh": stats["last_modified_hours_ago"] < 168}
for agency in AGENCIES:
freshness = check_data_freshness(agency)
bronze = FabricNotebookOperator(
task_id=f"bronze_{agency}",
notebook_name=f"10_bronze_{agency}_ingest",
parameters={"agency": agency, "date": "{{ ds }}"},
)
silver = FabricNotebookOperator(
task_id=f"silver_{agency}",
notebook_name=f"10_silver_{agency}_cleansed",
parameters={"agency": agency},
)
gold = FabricNotebookOperator(
task_id=f"gold_{agency}",
notebook_name=f"10_gold_{agency}_analytics",
parameters={"agency": agency},
)
freshness >> bronze >> silver >> gold
federal_pipeline()
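The decision matrix above also notes Airflow's dynamic task mapping (expand). When the agency list is only known at runtime, a mapped variant is possible; a minimal sketch, assuming the same FabricNotebookOperator and notebook naming used above:
from airflow.decorators import task

@task
def agency_parameters() -> list:
    # In practice this could be read from a config table; static here for illustration
    return [{"agency": a} for a in ["usda", "sba", "noaa", "epa", "doi", "doj"]]

# .partial() fixes the shared arguments; .expand() creates one mapped task
# instance per list element at runtime.
bronze = FabricNotebookOperator.partial(
    task_id="bronze_agency_ingest",
    notebook_name="10_bronze_agency_ingest",
).expand(parameters=agency_parameters())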
Pattern 3: Sensor-Based Orchestration¶
from airflow.providers.microsoft.fabric.sensors.lakehouse import FabricLakehouseSensor
# Wait for upstream data to arrive before processing
wait_for_slot_data = FabricLakehouseSensor(
task_id="wait_for_slot_data",
workspace_id="{{ var.value.workspace_id }}",
lakehouse_name="lh_bronze",
table_name="slot_telemetry",
partition_filter={"date": "{{ ds }}"},
min_row_count=1000,
poke_interval=300, # Check every 5 minutes
timeout=3600, # Give up after 1 hour
mode="reschedule", # Free up worker while waiting
)
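Downstream work is then gated on the sensor in the usual way; for example, reusing the silver cleansing task names from Pattern 1:
process_slot_data = FabricNotebookOperator(
    task_id="silver_slot_cleansed",
    workspace_id="{{ var.value.workspace_id }}",
    notebook_name="01_silver_slot_cleansed",
    parameters={"date": "{{ ds }}"},
)

# Processing starts only once the partition has landed with enough rows
wait_for_slot_data >> process_slot_data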
Fabric-Specific Operators¶
Available Operators¶
| Operator | Purpose | Key Parameters |
|---|---|---|
| FabricNotebookOperator | Run a Fabric notebook | notebook_name, parameters, timeout |
| FabricPipelineOperator | Trigger a Fabric pipeline | pipeline_name, parameters |
| FabricLakehouseSensor | Wait for data in Lakehouse | table_name, min_row_count |
| FabricSqlOperator | Execute SQL on warehouse/SQL DB | sql, warehouse_name |
| FabricRestOperator | Call Fabric REST API | endpoint, method, body |
| FabricSparkJobOperator | Run a Spark Job Definition | sjd_name, arguments |
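The pipeline operator imported in Pattern 1 follows the same calling convention as the notebook operator; a minimal sketch (the pipeline name is illustrative):
from airflow.providers.microsoft.fabric.operators.pipeline import FabricPipelineOperator

run_copy_pipeline = FabricPipelineOperator(
    task_id="run_copy_pipeline",
    workspace_id="{{ var.value.workspace_id }}",
    pipeline_name="pl_copy_slot_telemetry",
    parameters={"date": "{{ ds }}"},
)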
FabricRestOperator Example¶
from airflow.providers.microsoft.fabric.operators.rest import FabricRestOperator

# workspace_id and model_id are assumed to be defined earlier in the DAG file
refresh_dataset = FabricRestOperator(
task_id="refresh_semantic_model",
method="POST",
endpoint=f"/v1/workspaces/{workspace_id}/semanticModels/{model_id}/refresh",
json_body={"type": "full"},
expected_status_code=202,
)
Monitoring and Logging¶
Airflow UI¶
The managed Airflow job provides the full Airflow web UI:
- Graph view: DAG dependency structure and run history
- Grid view: Run status across dates
- Gantt chart: Task duration analysis
- Task logs: Stdout/stderr for each task execution
Integration with Fabric Monitoring¶
# In your DAG, emit custom metrics to Fabric
@task
def emit_metrics(run_stats: dict):
"""Push DAG run metrics to Eventhouse for unified monitoring."""
    from datetime import datetime
    from fabric.eventhouse import EventhouseClient
    client = EventhouseClient(database="platform_monitoring")
client.ingest({
"dag_id": "casino_medallion_pipeline",
"run_date": run_stats["date"],
"duration_seconds": run_stats["duration"],
"task_count": run_stats["tasks"],
"failed_tasks": run_stats["failures"],
"timestamp": datetime.utcnow().isoformat()
}, table="airflow_runs")
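The run_stats dict itself can be assembled from Airflow's runtime context; a minimal sketch, assuming the collector runs as the final task of the DAG:
from airflow.decorators import task

@task
def collect_run_stats(**context) -> dict:
    """Summarize the current DAG run using Airflow's runtime context."""
    dag_run = context["dag_run"]
    task_instances = dag_run.get_task_instances()
    return {
        "date": context["ds"],
        "duration": sum((ti.duration or 0) for ti in task_instances),
        "tasks": len(task_instances),
        "failures": sum(1 for ti in task_instances if ti.state == "failed"),
    }

# Wire as the last step so the summary covers the rest of the run
emit_metrics(collect_run_stats())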
KQL Queries for Airflow Monitoring¶
// DAG execution trends
airflow_runs
| where timestamp > ago(30d)
| summarize
AvgDuration = avg(duration_seconds),
FailureRate = countif(failed_tasks > 0) * 100.0 / count(),
TotalRuns = count()
by dag_id, bin(timestamp, 1d)
| render timechart
// Slowest tasks across all DAGs
airflow_task_metrics
| where timestamp > ago(7d)
| summarize P95Duration = percentile(duration_seconds, 95) by task_id, dag_id
| top 20 by P95Duration desc
CI/CD for DAGs¶
Repository Structure¶
airflow-dags/
├── dags/
│ ├── casino_medallion.py
│ ├── federal_pipeline.py
│ ├── data_quality.py
│ └── utils/
│ ├── __init__.py
│ ├── fabric_helpers.py
│ └── quality_checks.py
├── tests/
│ ├── test_casino_medallion.py
│ ├── test_federal_pipeline.py
│ └── conftest.py
├── requirements.txt
├── .airflow/
│ └── config.yaml
└── .github/
└── workflows/
└── test-dags.yml
DAG Testing¶
# tests/test_casino_medallion.py
import pytest
from airflow.models import DagBag
@pytest.fixture
def dagbag():
return DagBag(dag_folder="dags/", include_examples=False)
def test_dag_loads(dagbag):
assert "casino_medallion_pipeline" in dagbag.dags
assert dagbag.import_errors == {}
def test_dag_structure(dagbag):
dag = dagbag.get_dag("casino_medallion_pipeline")
    assert len(dag.tasks) >= 10  # 3 bronze + 1 quality + 3 silver + 3 gold
# Verify bronze tasks have no upstream dependencies
bronze_tasks = [t for t in dag.tasks if t.task_id.startswith("bronze_")]
for task in bronze_tasks:
assert len(task.upstream_list) == 0
def test_no_cycles(dagbag):
dag = dagbag.get_dag("casino_medallion_pipeline")
# Airflow will raise if there's a cycle during parsing
assert dag is not None
GitHub Actions Workflow¶
# .github/workflows/test-dags.yml
name: Test Airflow DAGs
on:
pull_request:
paths: ['dags/**', 'tests/**']
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- run: |
pip install apache-airflow==2.9.0 pytest
pip install -r requirements.txt
- run: pytest tests/ -v
Casino Implementation¶
The full casino medallion DAG (shown above in Pattern 1) orchestrates:
- Bronze: Parallel ingestion of slot telemetry, table games, and player tracking
- Quality gate: Great Expectations validation before silver processing
- Silver: Cleansing, deduplication, PII hashing
- Gold: KPI computation, compliance reporting, revenue dashboards
Compliance-Specific Tasks¶
@task
def check_ctr_backlog(ds=None):
"""Verify all CTR-eligible transactions have been reported."""
from fabric.sqlendpoint import SqlClient
sql = SqlClient(lakehouse="lh_gold")
result = sql.query(f"""
SELECT COUNT(*) as unreported
FROM gold_compliance_monitoring
WHERE transaction_date = '{ds}'
AND ctr_required = true
AND ctr_filed = false
""")
unreported = result[0]["unreported"]
if unreported > 0:
raise ValueError(f"{unreported} unfiled CTRs for {ds}")
return {"status": "compliant", "date": ds}
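Within the medallion DAG from Pattern 1, this check would typically run after the compliance gold table is built:
# The backlog check runs only once the compliance gold table is refreshed
gold_compliance >> check_ctr_backlog()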
Federal Agency Implementation¶
The dynamic federal pipeline (shown in Pattern 2) processes all six agencies in parallel with per-agency freshness checks. Each agency follows the same medallion flow but with agency-specific notebooks.
Cross-Agency Quality Report¶
@task
def cross_agency_quality_report(agency_results: list):
"""Aggregate quality metrics across all federal agencies."""
report = {
"report_date": datetime.utcnow().isoformat(),
"agencies": {},
"overall_pass_rate": 0
}
passed = 0
for result in agency_results:
agency = result["agency"]
report["agencies"][agency] = {
"status": result["status"],
"row_count": result.get("row_count", 0),
"quality_score": result.get("quality_score", 0)
}
if result["status"] == "passed":
passed += 1
report["overall_pass_rate"] = passed / len(agency_results)
return report
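One way to feed it, sketched with TaskFlow and the AGENCIES list from Pattern 2 (the per-agency validation task and its result shape are assumptions):
from airflow.decorators import task

@task
def validate_agency(agency: str) -> dict:
    # Placeholder result shape; a real task would read per-agency quality output
    return {"agency": agency, "status": "passed", "row_count": 0, "quality_score": 1.0}

# Map over the agencies, then reduce the mapped results into one report task
agency_results = validate_agency.expand(agency=AGENCIES)
report = cross_agency_quality_report(agency_results)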
Limitations¶
| Limitation | Details | Workaround |
|---|---|---|
| Airflow version | Managed version, cannot pin arbitrary versions | Use compatible operator versions |
| No custom plugins | Cannot install Airflow plugins (only providers) | Use TaskFlow Python tasks instead |
| Worker scaling | Limited by capacity CU allocation | Plan capacity for peak DAG loads |
| No Celery/Kubernetes executor | Local executor only in managed mode | Design DAGs with appropriate parallelism |
| DAG size | Max 200 tasks per DAG recommended | Split into multiple DAGs chained with Datasets or TriggerDagRunOperator |
| Secrets | No native Airflow secrets backend | Use Fabric Variable Libraries + Key Vault |
| XCom size | 48 KB per XCom value | Use OneLake for large intermediate data |
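For the XCom size limit in particular, a common pattern is to write large intermediate results to OneLake and pass only the path between tasks; a minimal sketch (the OneLake path is illustrative):
from airflow.decorators import task

@task
def write_intermediate(ds=None) -> str:
    # Persist the large payload to OneLake; only the path (well under 48 KB) goes into XCom
    path = f"abfss://workspace@onelake.dfs.fabric.microsoft.com/lh_bronze.Lakehouse/Files/staging/{ds}/batch.parquet"
    # ... write the data to `path` here ...
    return path

@task
def read_intermediate(path: str) -> None:
    # The downstream task loads the data from the path it received via XCom
    print(f"processing {path}")

read_intermediate(write_intermediate())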