ETL Migration: Dataflow, Composer, and Pub/Sub to ADF, dbt, and Event Hubs¶
A hands-on guide for data engineers migrating GCP ETL and orchestration services to Azure Data Factory, dbt, Databricks Workflows, Event Hubs, and Azure Functions.
Scope¶
This guide covers:
- Dataflow (Apache Beam) to ADF + Databricks / Stream Analytics
- Cloud Composer (Airflow) to ADF pipelines or Databricks Workflows
- Dataform to dbt
- Pub/Sub to Event Hubs / Event Grid
- Cloud Functions to Azure Functions
For compute migration (BigQuery, Dataproc), see Compute Migration.
Architecture overview¶
flowchart LR
subgraph GCP["GCP ETL Stack"]
DF[Dataflow - Apache Beam]
CC[Cloud Composer - Airflow]
DFM[Dataform]
PS[Pub/Sub]
CF[Cloud Functions]
end
subgraph Azure["Azure ETL Stack"]
ADF[Azure Data Factory]
DBT[dbt on Databricks]
DW[Databricks Workflows]
EH[Event Hubs]
EG[Event Grid]
AF[Azure Functions]
ASA[Stream Analytics]
end
DF -->|batch| ADF
DF -->|streaming| ASA
CC --> ADF
CC --> DW
DFM --> DBT
PS --> EH
PS -->|notifications| EG
CF --> AF
Dataflow (Apache Beam) to Azure¶
Dataflow is Google's managed runner for Apache Beam pipelines. The migration path depends on whether the pipeline is batch or streaming.
Batch Dataflow pipelines¶
Batch Beam pipelines typically read from GCS or BigQuery, apply transforms, and write to BigQuery or GCS. The Azure equivalent is ADF pipelines + Databricks notebooks (or Fabric notebooks).
| Beam concept | Azure equivalent | Notes |
|---|---|---|
| Pipeline | ADF pipeline | Orchestration container |
| PCollection | DataFrame / Delta table | Data abstraction |
| ParDo / DoFn | Databricks notebook cell / UDF | Custom transform logic |
| GroupByKey | SparkSQL GROUP BY / dbt model | Aggregation |
| CoGroupByKey | SparkSQL JOIN | Multi-input join |
| Flatten | UNION ALL | Combine PCollections |
| Side inputs | Broadcast variables / temp views | Small lookup datasets |
| Beam IO (BigQuery) | ADF BigQuery connector | Source/sink connectors |
| Beam IO (GCS) | ADF GCS connector / ADLS | Source/sink connectors |
Migration approach:
- Decompose the Beam pipeline into logical stages (read, transform, write)
- Map reads to ADF copy activities or Databricks source reads
- Map transforms to dbt models (for SQL-expressible logic) or Databricks notebooks (for Python/Java logic)
- Map writes to ADF sink activities or Delta table writes
- Orchestrate the stages in an ADF pipeline (one Databricks stage is sketched below)
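To make the decomposition concrete, the sketch below shows the shape of one batch stage (read, transform, write) as a Databricks notebook cell writing Delta. The storage account, container, paths, and table names are illustrative, not part of the original pipeline:
# Sketch: one decomposed Beam batch stage as a Databricks notebook cell.
# Storage account, container, and table names are illustrative.
from pyspark.sql.functions import col, to_date

# Read: raw files landed in ADLS bronze (e.g. by an ADF copy activity)
orders = (spark.read
          .format("parquet")
          .load("abfss://bronze@acmedata.dfs.core.windows.net/sales/orders/"))

# Transform: logic that lived in ParDo/DoFn becomes DataFrame operations
daily = (orders
         .withColumn("order_date", to_date(col("order_ts")))
         .filter(col("status") == "COMPLETE")
         .groupBy("order_date", "region")
         .count())

# Write: the Beam sink becomes a Delta table write (silver/gold)
(daily.write
 .format("delta")
 .mode("overwrite")
 .saveAsTable("analytics.daily_order_counts"))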
Streaming Dataflow pipelines¶
Streaming Beam pipelines read from Pub/Sub, apply windowed transforms, and write to BigQuery or Pub/Sub. The Azure equivalent depends on complexity:
| Complexity | Azure path | When to use |
|---|---|---|
| Simple aggregation/filtering | Azure Stream Analytics (ASA) | SQL-first, low-code |
| Complex event processing | Databricks Structured Streaming | Code-first, stateful |
| Real-time analytics | Fabric Real-Time Intelligence | Integrated with Fabric |
Stream Analytics migration example:
Beam pipeline that counts events per minute:
# Beam (Dataflow): count clicks per type in 1-minute fixed windows
import json
import apache_beam as beam
from apache_beam.transforms import window

(pipeline
 | beam.io.ReadFromPubSub(topic='projects/acme/topics/clicks')
 | beam.Map(lambda msg: (json.loads(msg)['click_type'], 1))  # assumes JSON payloads with a click_type field
 | beam.WindowInto(window.FixedWindows(60))
 | beam.CombinePerKey(sum)
 | beam.Map(lambda kv: {'click_type': kv[0], 'click_count': kv[1]})
 | beam.io.WriteToBigQuery('acme.analytics.click_counts'))
Azure Stream Analytics equivalent:
-- ASA query (clicks_input and clicks_output are the input/output aliases configured on the job)
SELECT
    click_type,
    COUNT(*) AS click_count,
    System.Timestamp() AS window_end
INTO clicks_output
FROM clicks_input TIMESTAMP BY event_time
GROUP BY
    click_type,
    TumblingWindow(minute, 1)
Databricks Structured Streaming equivalent:
# Databricks notebook
from pyspark.sql.functions import window, count, col, from_json
from pyspark.sql.types import StructType, StringType, TimestampType

# Expected JSON payload in the Event Hubs message body
click_schema = (StructType()
                .add("click_type", StringType())
                .add("event_time", TimestampType()))

raw = (spark.readStream
       .format("eventhubs")
       .options(**eh_conf)   # Event Hubs connection config (connection string, consumer group)
       .load())

# Event Hubs delivers the payload as a binary `body` column; parse it into columns
clicks = (raw
          .select(from_json(col("body").cast("string"), click_schema).alias("e"))
          .select("e.*"))

counts = (clicks
          .withWatermark("event_time", "1 minute")
          .groupBy(
              window("event_time", "1 minute"),
              "click_type")
          .agg(count("*").alias("click_count")))

(counts.writeStream
 .format("delta")
 .outputMode("append")
 .option("checkpointLocation", "/checkpoints/click_counts")
 .table("analytics.click_counts"))
Cloud Composer (Airflow) to ADF and Databricks Workflows¶
Cloud Composer is managed Apache Airflow. The migration path depends on DAG complexity.
DAG classification and target mapping¶
| DAG pattern | Azure target | Rationale |
|---|---|---|
| Simple schedule + SQL transforms | dbt job on Databricks Workflow | dbt manages dependencies natively |
| Multi-step with GCP operators | ADF pipeline | ADF has 100+ connectors |
| Python-heavy custom operators | Databricks notebook workflow | Full Python environment |
| Cross-system orchestration | ADF pipeline calling Databricks + other services | ADF is the orchestration hub |
| Sensor-based (file arrival) | ADF event trigger + Databricks Auto Loader | Event-driven instead of polling |
Airflow operator to ADF activity mapping¶
| Airflow operator | ADF activity | Notes |
|---|---|---|
| `BigQueryOperator` | ADF Copy Activity (BigQuery source) | During migration; post-migration use Databricks SQL |
| `DataprocSubmitJobOperator` | ADF Databricks Notebook activity | Submit Spark jobs to Databricks |
| `GCSToGCSOperator` | ADF Copy Activity (GCS to ADLS) | File copy between storage |
| `PythonOperator` | ADF Azure Function activity / Databricks notebook | Custom Python logic |
| `BashOperator` | ADF custom activity / Azure Batch | Shell commands |
| `EmailOperator` | Logic Apps / Power Automate | Email notifications |
| `SlackWebhookOperator` | Logic Apps Slack connector | Chat notifications |
| `FileSensor` | ADF storage event trigger | Event-driven, not polling |
| `ExternalTaskSensor` | ADF pipeline dependency | Cross-pipeline dependencies |
| `BranchPythonOperator` | ADF If Condition activity | Conditional branching |
Worked example: Airflow DAG to ADF pipeline¶
Airflow DAG:
# dag_daily_etl.py (Cloud Composer)
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

with DAG('daily_etl', schedule_interval='0 2 * * *') as dag:
    extract = BigQueryInsertJobOperator(
        task_id='extract_orders',
        configuration={"query": {"query": "SELECT * FROM sales.orders WHERE ...", "destinationTable": {...}}}
    )
    transform = DataprocSubmitJobOperator(
        task_id='transform_spark',
        job={"sparkJob": {"mainPythonFileUri": "gs://scripts/transform.py"}}
    )
    load = BigQueryInsertJobOperator(
        task_id='load_gold',
        configuration={"query": {"query": "INSERT INTO finance.gold_orders SELECT ..."}}
    )
    extract >> transform >> load
ADF pipeline equivalent:
- Extract: ADF Copy Activity reading from source to ADLS bronze
- Transform: ADF Databricks Notebook activity running the Spark transform
- Load: dbt model materializing the gold table, triggered by ADF
The ADF pipeline uses a schedule trigger set to `0 2 * * *` (the same cron expression). Dependencies are expressed as activity success conditions.
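Where the DAG is Python-heavy and targets Databricks Workflows instead of ADF (per the classification table above), the same dependency chain can be defined as a multi-task job. A minimal sketch using the Databricks Python SDK; the job name and notebook paths are illustrative, and cluster configuration is omitted:
# Sketch: the same DAG as a Databricks multi-task Workflow (databricks-sdk).
# Notebook paths and the job name are hypothetical; cluster config is omitted for brevity.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

w.jobs.create(
    name="daily_etl",
    tasks=[
        jobs.Task(task_key="extract",
                  notebook_task=jobs.NotebookTask(notebook_path="/etl/extract_orders")),
        jobs.Task(task_key="transform",
                  depends_on=[jobs.TaskDependency(task_key="extract")],
                  notebook_task=jobs.NotebookTask(notebook_path="/etl/transform_spark")),
        jobs.Task(task_key="load",
                  depends_on=[jobs.TaskDependency(task_key="transform")],
                  notebook_task=jobs.NotebookTask(notebook_path="/etl/load_gold")),
    ],
    # Airflow cron 0 2 * * * becomes a Quartz expression in Databricks
    schedule=jobs.CronSchedule(quartz_cron_expression="0 0 2 * * ?", timezone_id="UTC"),
)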
Dataform to dbt¶
Dataform and dbt are conceptually very close. Both are SQL-first transformation frameworks with dependency management, testing, and documentation.
Mapping table¶
| Dataform concept | dbt equivalent | Notes |
|---|---|---|
| SQLX file | SQL model file | Nearly identical syntax |
| `config { type: "table" }` | `{{ config(materialized='table') }}` | Jinja config block |
| `config { type: "incremental" }` | `{{ config(materialized='incremental') }}` | Same concept |
| `config { type: "view" }` | `{{ config(materialized='view') }}` | Same concept |
| `${ref("table_name")}` | `{{ ref('table_name') }}` | Reference syntax |
| `${self()}` | `{{ this }}` | Self-reference |
| `${when(incremental(), "...")}` | `{% if is_incremental() %}...{% endif %}` | Incremental filter |
| Assertions | dbt tests | Testing framework |
| `dataform.json` | `dbt_project.yml` | Project configuration |
| JavaScript transforms | dbt Python models | For non-SQL logic |
| Compilation | dbt compile | SQL compilation step |
Migration steps¶
- Copy SQLX files to the dbt `models/` directory
- Replace `${ref("x")}` with `{{ ref('x') }}`
- Replace `config { type: "..." }` blocks with dbt `{{ config() }}` blocks
- Replace `${when(incremental(), ...)}` with `{% if is_incremental() %}...{% endif %}` (the first four steps are mechanical; see the scripted sketch after this list)
- Convert assertions to dbt tests in YAML schema files
- Run `dbt compile` to verify syntax
- Run `dbt run` to materialize models
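Because the first four steps are text substitutions, they can be scripted. A minimal sketch in Python that handles only the simplest patterns from the mapping table; it skips `${when(...)}`, JavaScript blocks, and assertions, assumes the standard Dataform `definitions/` layout, and its output should be reviewed before committing:
# Sketch: rewrite simple Dataform SQLX syntax into dbt Jinja.
# Handles only the basic patterns from the mapping table above.
import re
from pathlib import Path

def convert_sqlx(text: str) -> str:
    # config { type: "table" }  ->  {{ config(materialized='table') }}
    text = re.sub(
        r'config\s*\{\s*type:\s*"(\w+)"\s*\}',
        lambda m: "{{ config(materialized='" + m.group(1) + "') }}",
        text,
    )
    # ${ref("x")}  ->  {{ ref('x') }}
    text = re.sub(r'\$\{ref\("([^"]+)"\)\}', r"{{ ref('\1') }}", text)
    # ${self()}  ->  {{ this }}
    return text.replace("${self()}", "{{ this }}")

for sqlx in Path("definitions").glob("**/*.sqlx"):
    out = Path("models") / sqlx.relative_to("definitions").with_suffix(".sql")
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(convert_sqlx(sqlx.read_text()))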
The migration is typically straightforward because Dataform was designed with similar principles to dbt.
Pub/Sub to Event Hubs and Event Grid¶
Pub/Sub to Event Hubs¶
Pub/Sub is Google's managed messaging service. Event Hubs is the closest Azure equivalent and adds Kafka protocol support.
| Pub/Sub concept | Event Hubs equivalent | Notes |
|---|---|---|
| Topic | Event Hub (within a namespace) | Message channel |
| Subscription | Consumer group | Message consumption |
| Push subscription | N/A (use Azure Functions trigger) | Event Hubs is pull-based; Functions provide push semantics |
| Pull subscription | Consumer group + consumer client | Standard Kafka consumer pattern |
| Message ordering | Partition key ordering | Per-partition ordering guaranteed |
| Dead letter topic | Dead letter queue (capture) | Failed message handling |
| Message retention | 1-7 days (Standard), up to 90 days (Premium/Dedicated) | Configurable retention |
| Exactly-once delivery | At-least-once (consumer manages) | Use idempotent consumers |
| Schema validation | Schema Registry | Avro/JSON schema enforcement |
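For the push-subscription pattern, an Event Hub-triggered Azure Function restores push semantics. A minimal sketch using the Python v2 programming model; the hub name and connection setting name are illustrative:
# Sketch: replacing a Pub/Sub push subscription with an Event Hub-triggered
# Azure Function (Python v2 programming model). Hub name and connection
# setting name are hypothetical.
import azure.functions as func

app = func.FunctionApp()

@app.event_hub_message_trigger(arg_name="event",
                               event_hub_name="clicks",
                               connection="EVENTHUB_CONNECTION")
def handle_click(event: func.EventHubEvent):
    payload = event.get_body().decode("utf-8")
    # Process the message as the Pub/Sub push handler did...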
Kafka protocol support: Event Hubs exposes a Kafka endpoint. Existing Pub/Sub consumers that use the Kafka dialect (common for Dataflow/Flink consumers) can connect to Event Hubs by changing the bootstrap server address and authentication.
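For example, an existing Kafka-dialect consumer can usually be repointed at Event Hubs with configuration changes only. A minimal sketch using confluent-kafka; the namespace, hub, and consumer group names are illustrative:
# Sketch: consuming from Event Hubs over the Kafka protocol (confluent-kafka).
# The namespace, topic, and consumer group names are hypothetical.
import os
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "acme-events.servicebus.windows.net:9093",  # Kafka endpoint on port 9093
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "$ConnectionString",                       # literal string required by Event Hubs
    "sasl.password": os.environ["EVENTHUB_CONNECTION_STRING"],  # namespace connection string
    "group.id": "clicks-consumer",
    "auto.offset.reset": "earliest",
})

consumer.subscribe(["clicks"])  # the Event Hub name acts as the Kafka topic
while True:
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        print(msg.value().decode("utf-8"))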
Pub/Sub notifications to Event Grid¶
GCS Pub/Sub notifications (triggered on object create/delete) map to Azure Event Grid blob events.
| GCS notification | Event Grid equivalent | Notes |
|---|---|---|
| `OBJECT_FINALIZE` | `Microsoft.Storage.BlobCreated` | New blob created |
| `OBJECT_DELETE` | `Microsoft.Storage.BlobDeleted` | Blob deleted |
| `OBJECT_ARCHIVE` | No direct equivalent | Fires when a live object version becomes noncurrent (versioned buckets) |
| Notification to Cloud Function | Event Grid subscription to Azure Function | Same pattern |
Cloud Functions to Azure Functions¶
Cloud Functions and Azure Functions are both serverless, event-driven compute platforms with similar trigger models.
| Cloud Functions trigger | Azure Functions trigger | Notes |
|---|---|---|
| HTTP trigger | HTTP trigger | Direct replacement |
| Pub/Sub trigger | Event Hub trigger / Service Bus trigger | Message-driven |
| Cloud Storage trigger | Blob trigger / Event Grid trigger | File-arrival driven |
| Firestore trigger | Cosmos DB trigger | Document change driven |
| Scheduler trigger | Timer trigger | Cron-based |
Migration steps¶
- Identify all Cloud Functions and their triggers
- Map GCP triggers to Azure triggers (table above)
- Port function code -- most Python/Node.js logic transfers directly
- Replace GCP SDK calls (`google.cloud.storage`, `google.cloud.bigquery`) with Azure SDK calls (`azure.storage.blob`, `azure.identity`)
- Deploy using Azure Functions Core Tools or GitHub Actions
- Test with representative event payloads
Example: GCS trigger to Blob trigger
Cloud Function (Python):
# GCP Cloud Function
from google.cloud import storage

def process_file(event, context):
    bucket = event['bucket']
    name = event['name']
    client = storage.Client()
    blob = client.bucket(bucket).blob(name)
    content = blob.download_as_text()
    # Process content...
Azure Function (Python):
# Azure Function (blob trigger)
import azure.functions as func

def main(myblob: func.InputStream):
    content = myblob.read().decode('utf-8')
    blob_name = myblob.name
    # Process content...
Orchestration decision tree¶
For each GCP pipeline, decide on the Azure orchestration pattern:
| Pipeline shape | Azure pattern |
|---|---|
| Pure SQL transforms with dependencies | dbt on Databricks Workflow |
| SQL transforms + file copies + notifications | ADF pipeline calling dbt + copy activities |
| Spark jobs with dependencies | Databricks multi-task Workflow |
| Event-driven (file arrival, message) | Event Grid/Hubs trigger + Azure Function + ADF |
| Complex cross-system (APIs, databases, files) | ADF pipeline as orchestrator |
| Real-time streaming | Event Hubs + ASA or Structured Streaming |
Validation checklist¶
After migrating ETL workloads:
- All Airflow DAGs have equivalent ADF pipelines or Databricks Workflows
- DAG schedules match original Composer schedules
- Dataform models converted to dbt and compile/run successfully
- Pub/Sub topics migrated to Event Hubs with schema compatibility
- Cloud Functions ported to Azure Functions with matching trigger types
- Dataflow batch pipelines replaced by ADF + Databricks pipelines
- Dataflow streaming pipelines replaced by ASA or Structured Streaming
- End-to-end data pipeline produces matching output
- Alerting and monitoring configured for all new pipelines
Last updated: 2026-04-30
Maintainers: CSA-in-a-Box core team
Related: Compute Migration | Storage Migration | Analytics Migration | Migration Playbook