Tutorial: Migrate Dataflow Pipeline to ADF + dbt¶
A hands-on, step-by-step walkthrough for data engineers migrating Google Cloud Dataflow (Apache Beam) pipelines to Azure Data Factory with dbt transforms, and Event Hubs + Stream Analytics for streaming workloads.
- Estimated time: 3-5 hours per pipeline
- Difficulty: Intermediate to Advanced
- GCP experience assumed: Dataflow, Apache Beam SDK (Python or Java), Pub/Sub basics
Prerequisites¶
Before starting this tutorial, ensure you have the following:
| Requirement | Details |
|---|---|
| GCP project | With Dataflow jobs you intend to migrate; read access to Beam pipeline code |
| Apache Beam source code | Git repository or local copy of the pipeline code |
| Azure subscription | With permissions to create ADF, Storage Account, Event Hubs, and Stream Analytics |
| Azure Data Factory | Provisioned ADF instance |
| ADLS Gen2 storage account | With hierarchical namespace enabled |
| Azure Databricks workspace | With Unity Catalog enabled (for batch transforms) |
| dbt Core | pip install dbt-databricks (v1.7+) |
| Azure CLI | Authenticated with az login |
| Event Hubs namespace | Provisioned (for streaming pipelines only) |
GCP comparison: Dataflow is a managed service for Apache Beam pipelines that handles both batch and streaming. On Azure, batch data movement maps to ADF Copy Activities, batch transforms map to dbt models on Databricks, and streaming maps to Event Hubs + Stream Analytics (SQL-first) or Databricks Structured Streaming (code-first).
Scenario¶
You are migrating two Dataflow pipelines:
- Batch pipeline: Reads CSV files from GCS, cleans and joins customer/order data, writes aggregated results to BigQuery. Runs daily at 03:00 UTC via Cloud Scheduler.
- Streaming pipeline: Reads JSON events from Pub/Sub, applies windowed aggregations (5-minute tumbling windows), writes results to BigQuery for near-real-time dashboards.
By the end of this tutorial you will have equivalent pipelines on Azure: ADF + dbt for batch, Event Hubs + Stream Analytics for streaming.
Step 1: Document existing Dataflow pipeline¶
1.1 Inventory the batch pipeline¶
Review the Apache Beam code and document every stage:
# Example: existing Beam batch pipeline (Python)
# parse_customer_csv, parse_order_csv, join_customer_orders and
# compute_order_summary are helper functions defined elsewhere in the repo
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=acme-gov',
        '--region=us-east4',
        '--temp_location=gs://acme-gov-temp/dataflow/',
        '--staging_location=gs://acme-gov-staging/dataflow/',
    ])
    with beam.Pipeline(options=options) as p:
        # Stage 1: Read from GCS
        customers = (
            p
            | 'ReadCustomers' >> beam.io.ReadFromText('gs://acme-gov-raw/customers/*.csv')
            | 'ParseCustomers' >> beam.Map(parse_customer_csv)
            | 'FilterValidCustomers' >> beam.Filter(lambda r: r['email'] is not None)
            | 'KeyCustomersById' >> beam.Map(lambda r: (r['customer_id'], r))
        )
        orders = (
            p
            | 'ReadOrders' >> beam.io.ReadFromText('gs://acme-gov-raw/orders/*.csv')
            | 'ParseOrders' >> beam.Map(parse_order_csv)
            | 'FilterValidOrders' >> beam.Filter(lambda r: r['amount'] > 0)
            | 'KeyOrdersById' >> beam.Map(lambda r: (r['customer_id'], r))
        )
        # Stage 2: Join and aggregate (CoGroupByKey expects keyed PCollections)
        joined = (
            {'customers': customers, 'orders': orders}
            | 'CoGroupByCustomerId' >> beam.CoGroupByKey()
            | 'FlattenJoin' >> beam.FlatMap(join_customer_orders)
        )
        summary = (
            joined
            | 'KeyByCustomer' >> beam.Map(lambda r: (r['customer_id'], r))
            | 'GroupByCustomer' >> beam.GroupByKey()
            | 'Aggregate' >> beam.Map(compute_order_summary)
        )
        # Stage 3: Write to BigQuery
        summary | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            'acme-gov:analytics.order_summary',
            schema='customer_id:STRING,total_orders:INTEGER,total_revenue:FLOAT,avg_order:FLOAT',
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        )
1.2 Document the pipeline DAG¶
GCS (customers/*.csv) ──→ Parse ──→ Filter ──┐
├──→ CoGroupByKey ──→ Aggregate ──→ BigQuery
GCS (orders/*.csv) ──→ Parse ──→ Filter ──┘
1.3 Record the streaming pipeline¶
# Example: existing Beam streaming pipeline
# format_pageview_count is a helper function defined elsewhere in the repo
import json

def run_streaming():
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=acme-gov',
        '--streaming',
        '--region=us-east4',
    ])
    with beam.Pipeline(options=options) as p:
        events = (
            p
            | 'ReadPubSub' >> beam.io.ReadFromPubSub(topic='projects/acme-gov/topics/click-events')
            | 'ParseJSON' >> beam.Map(json.loads)
            | 'Window5Min' >> beam.WindowInto(beam.window.FixedWindows(300))
            | 'KeyByPage' >> beam.Map(lambda e: (e['page'], 1))  # Count.PerKey needs (key, value) pairs
            | 'CountByPage' >> beam.combiners.Count.PerKey()
            | 'FormatOutput' >> beam.Map(format_pageview_count)
            | 'WriteToBQ' >> beam.io.WriteToBigQuery(
                'acme-gov:analytics.pageview_counts_5min',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
1.4 Build migration inventory¶
| Pipeline | Type | Sources | Sinks | Transforms | Schedule | Priority |
|---|---|---|---|---|---|---|
| customer_orders_batch | Batch | GCS (CSV) | BigQuery | Parse, filter, join, aggregate | Daily 03:00 UTC | High |
| pageview_streaming | Streaming | Pub/Sub (JSON) | BigQuery | Parse, window (5m tumbling), count | Continuous | High |
Step 2: Identify transformation pattern¶
2.1 Decision matrix: batch vs. streaming¶
| Beam pipeline characteristic | Azure target | Rationale |
|---|---|---|
| Batch, file-based sources | ADF Copy Activity + dbt | ADF handles file movement; dbt handles SQL transforms |
| Batch, database sources | ADF Copy Activity + dbt | ADF has 100+ native connectors |
| Streaming, low-latency (< 1 min) | Event Hubs + Stream Analytics | SQL-first streaming for simple transforms |
| Streaming, complex stateful logic | Event Hubs + Databricks Structured Streaming | Code-first for complex windowing and state |
| Hybrid (batch + micro-batch) | ADF + Databricks Auto Loader | Auto Loader provides streaming-like ingestion from files |
2.2 Map Beam concepts to Azure¶
| Apache Beam concept | Azure equivalent | Notes |
|---|---|---|
| `Pipeline` | ADF Pipeline + dbt project | Top-level orchestration unit |
| `PCollection` | ADLS Gen2 files or Delta tables | Intermediate data lands in storage |
| `ReadFromText` / `ReadFromAvro` | ADF Copy Activity (source) | File-based source connectors |
| `ReadFromPubSub` | Event Hubs consumer | Kafka protocol or native SDK |
| `WriteToBigQuery` | ADF Copy Activity (sink) or dbt model | Write to Delta Lake / Fabric |
| `Map` / `FlatMap` | dbt SQL expression or ADF Data Flow mapping | SQL for declarative; Data Flow for visual |
| `Filter` | dbt WHERE clause | Direct SQL equivalent |
| `GroupByKey` / `CoGroupByKey` | dbt GROUP BY / JOIN | SQL joins replace Beam grouping |
| `Combine` (sum, count, avg) | dbt aggregate functions | SUM(), COUNT(), AVG() |
| `WindowInto` (fixed, sliding, session) | Stream Analytics windowing | TumblingWindow, HoppingWindow, SessionWindow |
| `ParDo` (custom DoFn) | dbt macro, ADF Data Flow script, or Databricks notebook | Depends on complexity |
| Side inputs | dbt ref() to lookup table | Join to a reference table |
| Triggers | Stream Analytics output policy | Watermark and late-arrival handling |
| Dead letter queue | Event Hubs secondary topic + ADF error handling | Separate error path |
Step 3: Create ADF pipeline for batch data movement¶
3.1 Set up linked services¶
Create linked services for GCS (source) and ADLS Gen2 (sink):
{
"name": "ls_gcs_acme",
"type": "GoogleCloudStorage",
"typeProperties": {
"accessKeyId": "<GCS_HMAC_ACCESS_KEY>",
"secretAccessKey": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "ls_keyvault",
"type": "LinkedServiceReference"
},
"secretName": "gcs-hmac-secret"
}
}
}
{
"name": "ls_adls_bronze",
"type": "AzureBlobFS",
"typeProperties": {
"url": "https://<STORAGE_ACCOUNT>.dfs.core.windows.net"
},
"connectVia": {
"type": "IntegrationRuntimeReference",
"referenceName": "AutoResolveIR"
}
}
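The GCS linked service above resolves its HMAC secret from Key Vault, so the secret must exist there before the first Copy Activity runs. A minimal sketch using the azure-keyvault-secrets SDK; the vault name kv-acme-data is a placeholder:

```python
# Store the GCS HMAC secret that ls_gcs_acme references through ls_keyvault.
# Assumes the caller has "Set" permission on secrets in the vault.
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

vault_url = "https://kv-acme-data.vault.azure.net"  # hypothetical vault name
client = SecretClient(vault_url=vault_url, credential=DefaultAzureCredential())

# The value is the secret half of the GCS HMAC key pair created for the migration
client.set_secret("gcs-hmac-secret", "<GCS_HMAC_SECRET_VALUE>")
```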
3.2 Create Copy Activities¶
Replace ReadFromText with ADF Copy Activities:
{
"name": "copy_customers_csv",
"type": "Copy",
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "GoogleCloudStorageReadSettings",
"recursive": true,
"wildcardFileName": "*.csv"
},
"formatSettings": { "type": "DelimitedTextReadSettings" }
},
"sink": {
"type": "ParquetSink",
"storeSettings": { "type": "AzureBlobFSWriteSettings" }
}
},
"inputs": [
{
"referenceName": "ds_gcs_customers_csv",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "ds_adls_customers_parquet",
"type": "DatasetReference"
}
]
}
3.3 Pipeline flow¶
ADF Pipeline: pl_customer_orders_daily
├── copy_customers_csv (GCS CSV → ADLS Parquet)
├── copy_orders_csv (GCS CSV → ADLS Parquet)
└── run_dbt_transforms (depends on both copies)
└── dbt run --select stg_customers stg_orders fact_order_summary
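ADF has no native dbt activity, so the run_dbt_transforms step is commonly implemented as a Databricks Notebook activity whose notebook invokes dbt Core programmatically. A minimal sketch, assuming the dbt project lives at the hypothetical workspace path /Workspace/Repos/data-platform/dbt_acme with profiles.yml checked in alongside it:

```python
# Databricks notebook called by the ADF run_dbt_transforms activity.
# Uses dbt Core's programmatic entry point (available in dbt >= 1.5).
from dbt.cli.main import dbtRunner

PROJECT_DIR = "/Workspace/Repos/data-platform/dbt_acme"  # hypothetical repo path

result = dbtRunner().invoke([
    "run",
    "--select", "+fact_order_summary",   # the model plus its upstream staging models
    "--project-dir", PROJECT_DIR,
    "--profiles-dir", PROJECT_DIR,       # assumes profiles.yml sits next to the project
])

# Fail the notebook, and therefore the ADF pipeline run, if any model fails
if not result.success:
    raise RuntimeError(f"dbt run failed: {result.exception}")
```

Other wiring options (an ADF Custom activity, or a dbt Cloud job API call) follow the same shape: ADF only orchestrates, while dbt executes the SQL on Databricks.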
Step 4: Convert Beam transforms to dbt SQL models¶
4.1 Map each Beam transform to a dbt model¶
`parse_customer_csv` + the email filter (`FilterValidCustomers`) become `stg_customers.sql`:
-- models/staging/stg_customers.sql
{{ config(materialized='view') }}
with source as (
select * from {{ source('bronze', 'customers_raw') }}
),
cleaned as (
select
customer_id,
TRIM(LOWER(email)) AS email,
customer_name,
phone,
created_at
from source
where email IS NOT NULL -- Beam Filter: email is not None
)
select * from cleaned
`parse_order_csv` + the amount filter (`FilterValidOrders`) become `stg_orders.sql`:
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}
with source as (
select * from {{ source('bronze', 'orders_raw') }}
),
cleaned as (
select
order_id,
customer_id,
order_date,
CAST(amount AS DECIMAL(18, 2)) AS amount,
order_status,
created_at
from source
where amount > 0 -- Beam Filter: amount > 0
)
select * from cleaned
`CoGroupByKey` + `Aggregate` become `fact_order_summary.sql`:
-- models/gold/fact_order_summary.sql
{{ config(materialized='table') }}
with customers as (
select * from {{ ref('stg_customers') }}
),
orders as (
select * from {{ ref('stg_orders') }}
)
-- Replaces Beam CoGroupByKey + GroupByKey + compute_order_summary
select
c.customer_id,
c.email,
c.customer_name,
COUNT(o.order_id) AS total_orders,
SUM(o.amount) AS total_revenue,
AVG(o.amount) AS avg_order_value,
MIN(o.order_date) AS first_order_date,
MAX(o.order_date) AS last_order_date
from customers c
left join orders o
on c.customer_id = o.customer_id
group by
c.customer_id,
c.email,
c.customer_name
4.2 Add schema tests¶
# models/gold/schema.yml
version: 2
models:
- name: fact_order_summary
description: "Customer order summary replacing Beam pipeline output"
columns:
- name: customer_id
tests:
- not_null
- unique
- name: total_orders
tests:
- not_null
- name: total_revenue
tests:
- not_null
4.3 Run and validate¶
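A minimal way to execute the new models together with the schema tests from 4.2 is dbt build, shown here via dbt's programmatic runner (dbt Core 1.5+); the local project path is an assumption:

```python
# Run models and schema tests in dependency order ("build" = run + test)
from dbt.cli.main import dbtRunner

res = dbtRunner().invoke([
    "build",
    "--select", "+fact_order_summary",   # fact model plus its upstream staging models
    "--project-dir", "./dbt_acme",       # hypothetical local project path
])

# One line per executed node (model or test) with its status
for node_result in res.result:
    print(node_result.node.name, node_result.status)

if not res.success:
    raise SystemExit("dbt build failed; inspect the failing models or tests above")
```

Once the models build cleanly, proceed to the output-parity checks in Step 7.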
Step 5: Set up Event Hubs + Stream Analytics (streaming only)¶
5.1 Migrate Pub/Sub to Event Hubs¶
Create an Event Hub matching the Pub/Sub topic:
# Create Event Hubs namespace
az eventhubs namespace create \
--resource-group rg-data-platform \
--name ehns-acme-streaming \
--sku Standard \
--capacity 2
# Create Event Hub (replaces Pub/Sub topic)
az eventhubs eventhub create \
--resource-group rg-data-platform \
--namespace-name ehns-acme-streaming \
--name eh-click-events \
--partition-count 8 \
--message-retention 7
5.2 Migrate event producers¶
Update producers to send to Event Hubs instead of Pub/Sub. Event Hubs supports the Kafka protocol, so Kafka-based producers only need a config change:
# Python producer using azure-eventhub SDK
from azure.eventhub import EventHubProducerClient, EventData
import json
producer = EventHubProducerClient.from_connection_string(
conn_str="Endpoint=sb://ehns-acme-streaming.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...",
eventhub_name="eh-click-events"
)
event_batch = producer.create_batch()
event_batch.add(EventData(json.dumps({"page": "/home", "ts": "2025-01-15T10:30:00Z"})))
producer.send_batch(event_batch)
producer.close()
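For producers that already speak Kafka, the move really is configuration-only: point the client at the namespace's Kafka endpoint on port 9093 and authenticate with SASL/PLAIN using the literal username $ConnectionString. A minimal sketch with confluent-kafka; the connection string placeholder would normally come from Key Vault:

```python
# Kafka-protocol producer targeting Event Hubs (only the config changes)
import json
from confluent_kafka import Producer

conf = {
    "bootstrap.servers": "ehns-acme-streaming.servicebus.windows.net:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "$ConnectionString",             # literal value required by Event Hubs
    "sasl.password": "<EVENT_HUBS_CONNECTION_STRING>",
}

producer = Producer(conf)
producer.produce("eh-click-events", json.dumps({"page": "/home", "ts": "2025-01-15T10:30:00Z"}))
producer.flush()
```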
5.3 Create Stream Analytics job (replaces Beam streaming pipeline)¶
-- Stream Analytics query replacing Beam WindowInto + CountByPage
-- Input: eh-click-events (Event Hub)
-- Output: adls-pageview-counts (ADLS Gen2 or Fabric lakehouse)
SELECT
page,
COUNT(*) AS view_count,
System.Timestamp() AS window_end
INTO [adls-pageview-counts]
FROM [eh-click-events]
TIMESTAMP BY ts
GROUP BY
page,
TumblingWindow(minute, 5)
5.4 Map Beam windowing to Stream Analytics¶
| Beam window type | Stream Analytics function | SQL syntax |
|---|---|---|
| `FixedWindows(300)` (5 min) | TumblingWindow | `TumblingWindow(minute, 5)` |
| `SlidingWindows(600, 60)` (10 min window, 1 min period) | HoppingWindow | `HoppingWindow(minute, 10, 1)` |
| `Sessions(600)` (10 min gap) | SessionWindow | `SessionWindow(minute, 10)` |
| `GlobalWindows()` | No window (full aggregate) | Omit the window function |
| Late data with `allowed_lateness` | Late arrival / out-of-order policy | Configured in the job's event ordering settings, not in the query |
5.5 Alternative: Databricks Structured Streaming¶
For complex stateful logic that Stream Analytics SQL cannot handle:
# Databricks notebook: streaming pageview counts
from pyspark.sql.functions import from_json, col, window, count
from pyspark.sql.types import StructType, StringType, TimestampType
schema = StructType() \
    .add("page", StringType()) \
    .add("ts", TimestampType())

# Read from Event Hubs.
# eh_conf is the Event Hubs Spark connector configuration (requires the
# azure-eventhubs-spark library on the cluster); at minimum it holds the
# encrypted connection string for eh-click-events.
df = (spark.readStream
    .format("eventhubs")
    .options(**eh_conf)
    .load()
    .select(from_json(col("body").cast("string"), schema).alias("data"))
    .select("data.*")
)

# 5-minute tumbling window (replaces Beam WindowInto)
windowed = (df
    .withWatermark("ts", "2 minutes")
    .groupBy(window("ts", "5 minutes"), "page")
    .agg(count("*").alias("view_count"))
)

# Write to Delta Lake
(windowed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/pageviews")
    .toTable("acme_gov.analytics.pageview_counts_5min")
)
Step 6: Configure triggers and scheduling¶
6.1 Batch pipeline: ADF schedule trigger¶
{
"name": "tr_daily_0300_utc",
"properties": {
"type": "ScheduleTrigger",
"typeProperties": {
"recurrence": {
"frequency": "Day",
"interval": 1,
"startTime": "2025-01-01T03:00:00Z",
"timeZone": "UTC"
}
},
"pipelines": [
{
"pipelineReference": {
"referenceName": "pl_customer_orders_daily",
"type": "PipelineReference"
}
}
]
}
}
6.2 Streaming pipeline: always-on¶
Stream Analytics jobs run continuously. Configure auto-start and monitoring:
# Start the Stream Analytics job
az stream-analytics job start \
--resource-group rg-data-platform \
--job-name asa-pageview-counts \
--output-start-mode JobStartTime
# Alert on job errors via Azure Monitor (pair with an action group runbook if automated restart is needed)
az monitor metrics alert create \
--resource-group rg-data-platform \
--name alert-asa-failure \
--scopes /subscriptions/{sub-id}/resourceGroups/rg-data-platform/providers/Microsoft.StreamAnalytics/streamingjobs/asa-pageview-counts \
--condition "total Errors > 0" \
--window-size 5m \
--action ag-data-team
6.3 Scheduling mapping¶
| Dataflow / GCP scheduling | Azure equivalent |
|---|---|
| Cloud Scheduler + Dataflow template launch | ADF Schedule Trigger |
| Cloud Composer (Airflow) DAG | ADF Pipeline with dependencies |
| Dataflow streaming (always-on) | Stream Analytics job (always-on) or Databricks streaming job |
| Pub/Sub trigger → Cloud Function → Dataflow | Event Grid → ADF event-based trigger |
| Manual `gcloud dataflow jobs run` | `az datafactory pipeline create-run` or ADF Studio "Trigger Now" |
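For ad-hoc runs from scripts (last row above), the management SDK starts a pipeline run the same way "Trigger Now" does; a minimal sketch with azure-mgmt-datafactory, with the subscription ID and factory name as placeholders:

```python
# Start an on-demand run of the batch pipeline (script equivalent of "Trigger Now")
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

client = DataFactoryManagementClient(DefaultAzureCredential(), "<SUBSCRIPTION_ID>")

run = client.pipelines.create_run(
    resource_group_name="rg-data-platform",
    factory_name="adf-acme-data",            # hypothetical factory name
    pipeline_name="pl_customer_orders_daily",
)
print("Started run:", run.run_id)
```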
Step 7: Validate output parity¶
7.1 Batch pipeline validation¶
Run both the Dataflow pipeline and the ADF + dbt pipeline on the same input data, then compare the outputs. The queries below run on their respective platforms (Databricks SQL for Azure, BigQuery for GCP); compare the numbers side by side:
-- Azure (Databricks SQL): row count and aggregates
SELECT
  COUNT(*) AS row_count,
  SUM(total_revenue) AS total_rev,
  AVG(avg_order_value) AS avg_order
FROM acme_gov.analytics.fact_order_summary;
-- GCP (BigQuery): same metrics for comparison
SELECT
  COUNT(*) AS row_count,
  SUM(total_revenue) AS total_rev,
  AVG(avg_order) AS avg_order
FROM `acme-gov.analytics.order_summary`;
7.2 Streaming pipeline validation¶
For streaming, compare windowed output over a test period:
- Send identical test events to both Pub/Sub and Event Hubs (a dual-publish sketch follows this list)
- Wait for 3-5 window cycles (15-25 minutes for 5-minute windows)
- Compare output row counts and aggregates per window
- Tolerance: < 1% variance (streaming systems may differ on window boundaries for late data)
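To produce the identical test events mentioned in the first item, publish the same payloads to both systems from one script; a minimal sketch using google-cloud-pubsub and azure-eventhub, with the connection string and event shape as placeholders:

```python
# Publish the same test events to Pub/Sub (GCP) and Event Hubs (Azure) for parity checks
import json
from google.cloud import pubsub_v1
from azure.eventhub import EventHubProducerClient, EventData

test_events = [
    {"page": "/home", "ts": "2025-01-15T10:30:00Z"},
    {"page": "/checkout", "ts": "2025-01-15T10:30:05Z"},
]

# GCP side: existing Pub/Sub topic
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("acme-gov", "click-events")
for event in test_events:
    publisher.publish(topic_path, json.dumps(event).encode("utf-8")).result()

# Azure side: new Event Hub
producer = EventHubProducerClient.from_connection_string(
    conn_str="<EVENT_HUBS_CONNECTION_STRING>", eventhub_name="eh-click-events"
)
batch = producer.create_batch()
for event in test_events:
    batch.add(EventData(json.dumps(event)))
producer.send_batch(batch)
producer.close()
```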
7.3 Reconciliation checklist¶
- Batch: row counts match exactly
- Batch: aggregate values match within 0.01% tolerance
- Batch: scheduling fires at correct time
- Streaming: windowed counts match within 1% tolerance
- Streaming: late-data handling behaves equivalently
- Streaming: throughput meets latency SLA (< 1 minute end-to-end)
- Error handling: failed records route to dead-letter path (see the sketch after this checklist)
- Monitoring: alerts fire on pipeline/job failure
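One simple shape for the dead-letter item above is a secondary Event Hub that receives anything the main path cannot parse. A minimal sketch of a validating consumer that forwards bad payloads; the eh-click-events-dlq hub and the connection strings are assumptions, not part of the pipelines built earlier:

```python
# Route unparseable events to a dead-letter Event Hub instead of silently dropping them
import json
from azure.eventhub import EventHubConsumerClient, EventHubProducerClient, EventData

dlq = EventHubProducerClient.from_connection_string(
    conn_str="<EVENT_HUBS_CONNECTION_STRING>", eventhub_name="eh-click-events-dlq"
)

def on_event(partition_context, event):
    try:
        json.loads(event.body_as_str())   # validation only; real processing happens downstream
    except ValueError:
        batch = dlq.create_batch()
        batch.add(EventData(event.body_as_str()))
        dlq.send_batch(batch)             # keep the bad payload for later inspection
    partition_context.update_checkpoint(event)

consumer = EventHubConsumerClient.from_connection_string(
    conn_str="<EVENT_HUBS_CONNECTION_STRING>",
    consumer_group="$Default",
    eventhub_name="eh-click-events",
)
with consumer:
    consumer.receive(on_event=on_event, starting_position="-1")
```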
Beam transform to dbt/ADF mapping reference¶
| Beam transform | Azure equivalent | Implementation |
|---|---|---|
| `ReadFromText(path)` | ADF Copy Activity | Copy from GCS/ADLS to bronze layer |
| `ReadFromAvro(path)` | ADF Copy Activity (Avro format) | Copy Avro files preserving schema |
| `ReadFromPubSub(topic)` | Event Hubs consumer | Kafka protocol or native SDK |
| `WriteToBigQuery(table)` | dbt model or ADF Copy (sink) | Write to Delta table |
| `WriteToText(path)` | ADF Copy Activity (sink) | Write to ADLS as CSV/JSON |
| `Map(fn)` | dbt SQL expression | SELECT fn_logic FROM ... |
| `FlatMap(fn)` | dbt LATERAL VIEW EXPLODE | Unnest arrays |
| `Filter(fn)` | dbt WHERE clause | WHERE condition |
| `GroupByKey()` | dbt GROUP BY | GROUP BY key_columns |
| `CoGroupByKey()` | dbt JOIN | LEFT JOIN / INNER JOIN on keys |
| `Combine.globally(sum)` | dbt SUM() | Aggregate without GROUP BY |
| `Combine.perKey(sum)` | dbt SUM() ... GROUP BY | Aggregate with GROUP BY |
| `Flatten()` | dbt UNION ALL | Combine multiple sources |
| `Partition(fn)` | dbt CASE WHEN + separate models | Route rows to different targets |
| `WindowInto(FixedWindows)` | Stream Analytics TumblingWindow | SQL windowing function |
| `WindowInto(SlidingWindows)` | Stream Analytics HoppingWindow | Sliding window aggregation |
| `WindowInto(Sessions)` | Stream Analytics SessionWindow | Session-based grouping |
| Side input (`AsDict`) | dbt ref() + JOIN | Join to reference table |
| `Reshuffle()` | No equivalent needed | Databricks handles parallelism automatically |
| Custom DoFn with state | Databricks Structured Streaming mapGroupsWithState | For stateful streaming logic |
Next steps¶
After completing this tutorial:
- Migrate remaining Dataflow pipelines. Apply the batch or streaming pattern to each pipeline in your inventory.
- Set up monitoring dashboards. Create Azure Monitor workbooks for ADF pipeline and Stream Analytics job health.
- Implement error handling. Configure dead-letter Event Hubs and ADF failure paths for production resilience.
- Review the playbook. See GCP to Azure Migration Playbook for the full phased plan covering Dataproc and other GCP services.
Last updated: 2026-04-30
Maintainers: CSA-in-a-Box core team
Related: Migration Playbook | BigQuery to Fabric Tutorial | Benchmarks | Best Practices