# Data Engineering Best Practices

## Overview
CSA-in-a-Box uses a modern data engineering stack built on three pillars:
| Component | Role | Key Strength |
|---|---|---|
| dbt | Transformation layer | SQL-based modeling, testing, documentation |
| Azure Data Factory (ADF) | Orchestration & ingestion | Cloud-native ETL, 90+ connectors |
| Apache Spark | Large-scale processing | Distributed compute for big data workloads |
Together these tools implement the medallion architecture — ingesting raw data, transforming it through bronze → silver → gold layers, and serving analytics-ready datasets.
flowchart LR
Sources["Data Sources"] -->|ADF Ingestion| Bronze
Bronze -->|dbt transform| Silver
Silver -->|dbt transform| Gold
Gold --> Analytics["BI / Analytics"]
Gold --> ML["ML Workloads"]
subgraph "Spark Cluster"
Bronze["Bronze\n(Raw)"]
Silver["Silver\n(Cleansed)"]
Gold["Gold\n(Business)"]
end

## dbt Best Practices

### Project Structure
Organize dbt projects by domain with a shared layer for cross-cutting models:
dbt_project/
├── dbt_project.yml
├── packages.yml
├── macros/
│ ├── generic_tests/
│ │ └── test_not_negative.sql
│ ├── utils/
│ │ └── generate_surrogate_key.sql
│ └── materializations/
├── seeds/
│ ├── dim_country_codes.csv
│ └── schema.yml
├── models/
│ ├── bronze/
│ │ ├── finance/
│ │ │ ├── brz_finance__transactions.sql
│ │ │ └── schema.yml
│ │ └── hr/
│ │ ├── brz_hr__employees.sql
│ │ └── schema.yml
│ ├── silver/
│ │ ├── finance/
│ │ │ ├── slv_finance__transactions_cleansed.sql
│ │ │ └── schema.yml
│ │ └── hr/
│ │ ├── slv_hr__employees_cleansed.sql
│ │ └── schema.yml
│ ├── gold/
│ │ ├── finance/
│ │ │ ├── gld_finance__monthly_revenue.sql
│ │ │ └── schema.yml
│ │ └── shared/
│ │ ├── gld_shared__dim_date.sql
│ │ └── schema.yml
│ └── staging/
│ └── _sources.yml
└── tests/
└── data_tests/
└── assert_revenue_positive.sql
### Model Naming Conventions
| Layer | Prefix | Example | Materialization |
|---|---|---|---|
| Bronze | brz_ | brz_finance__transactions | Incremental |
| Silver | slv_ | slv_finance__transactions_cleansed | Incremental |
| Gold | gld_ | gld_finance__monthly_revenue | Table |
| Staging | stg_ | stg_finance__raw_transactions | Ephemeral / View |
**Double-underscore convention.** Use `{prefix}_{domain}__{entity}` — the double underscore separates the domain from the entity name, making lineage easier to parse.
### Incremental Models
Use incremental models when:
- The source table is large (>1M rows)
- Data arrives in append-only or slowly-changing patterns
- Full refreshes are too expensive or slow
-- models/silver/finance/slv_finance__transactions_cleansed.sql
{{
config(
materialized='incremental',
unique_key='transaction_id',
incremental_strategy='merge',
on_schema_change='sync_all_columns'
)
}}
with source as (
select * from {{ ref('brz_finance__transactions') }}
{% if is_incremental() %}
where _loaded_at > (select max(_loaded_at) from {{ this }})
{% endif %}
),
cleansed as (
select
transaction_id,
customer_id,
cast(amount as decimal(18, 2)) as amount,
upper(trim(currency_code)) as currency_code,
cast(transaction_date as date) as transaction_date,
current_timestamp() as _transformed_at
from source
where transaction_id is not null
and amount is not null
)
select * from cleansed
!!! warning "Incremental pitfalls" - Always define unique_key for merge strategies to avoid duplicates. - Test with dbt run --full-refresh periodically to ensure correctness. - Late-arriving data can be missed — consider a lookback window: sql where _loaded_at > dateadd(day, -3, (select max(_loaded_at) from {{ this }}))
### Materialization Decision Tree
flowchart TD
A["New Model"] --> B{"Referenced by\nother models only?"}
B -->|Yes| C["Ephemeral"]
B -->|No| D{"Large dataset\nor expensive query?"}
D -->|No| E["View"]
D -->|Yes| F{"Append-only or\nincremental source?"}
F -->|Yes| G["Incremental"]
F -->|No| H["Table"] | Materialization | Use When | Avoid When |
|---|---|---|
| View | Small datasets, real-time freshness needed | Query is expensive or slow |
| Table | Moderate size, full refresh is acceptable | Data is append-only and huge |
| Incremental | Large tables with identifiable new rows | Logic is complex and hard to partition |
| Ephemeral | Intermediate CTEs shared across models | End users query it directly |
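These per-layer defaults can also be declared once in `dbt_project.yml` instead of in every model's `config()` block. A minimal sketch assuming the folder layout shown earlier (the project key `csa_analytics` is a placeholder):

```yaml
# dbt_project.yml (excerpt): layer-level materialization defaults
models:
  csa_analytics:            # hypothetical project name
    bronze:
      +materialized: incremental
    silver:
      +materialized: incremental
    gold:
      +materialized: table
    staging:
      +materialized: ephemeral
```

Individual models can still override the default in their own `config()` where needed.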
### Testing

#### Schema Tests
# models/silver/finance/schema.yml
version: 2
models:
- name: slv_finance__transactions_cleansed
description: "Cleansed financial transactions with validated types"
columns:
- name: transaction_id
description: "Primary key"
tests:
- unique
- not_null
- name: amount
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 1000000
- name: currency_code
tests:
- accepted_values:
values: ["USD", "EUR", "GBP", "CAD", "AUD"]
#### Custom Generic Tests
-- macros/generic_tests/test_not_negative.sql
{% test not_negative(model, column_name) %}
select {{ column_name }}
from {{ model }}
where {{ column_name }} < 0
{% endtest %}
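Once defined, the generic test can be attached to any column in a `schema.yml`, alongside the built-in tests. A short usage sketch (column choice is illustrative):

```yaml
# models/silver/finance/schema.yml (excerpt)
models:
  - name: slv_finance__transactions_cleansed
    columns:
      - name: amount
        tests:
          - not_negative   # custom generic test defined above
```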
#### Freshness Checks
# models/staging/_sources.yml
version: 2
sources:
- name: raw_finance
database: raw_db
schema: finance
freshness:
warn_after: { count: 12, period: hour }
error_after: { count: 24, period: hour }
loaded_at_field: _loaded_at
tables:
- name: transactions
- name: accounts
Run freshness checks: `dbt source freshness`

### Documentation
-- models/gold/finance/gld_finance__monthly_revenue.sql
{{
config(
materialized='table',
description='Monthly revenue aggregation by business unit'
)
}}
# models/gold/finance/schema.yml
version: 2
models:
- name: gld_finance__monthly_revenue
description: >
{{ doc("monthly_revenue_description") }}
columns:
- name: revenue_month
description: "First day of the month (date grain)"
- name: business_unit
description: "Business unit identifier from the HR domain"
- name: total_revenue
description: "Sum of all transaction amounts for the period"
<!-- models/gold/finance/docs.md -->
{% docs monthly_revenue_description %}
Monthly revenue by business unit, sourced from cleansed transactions.
Used by the executive dashboard and finance reporting.
**Owner:** Finance Analytics Team
**SLA:** Refreshed daily by 06:00 UTC
{% enddocs %}
### Macros and Packages
# packages.yml
packages:
- package: dbt-labs/dbt_utils
version: [">=1.0.0", "<2.0.0"]
- package: calogica/dbt_expectations
version: [">=0.10.0", "<1.0.0"]
Example reusable macro:
-- macros/utils/cents_to_dollars.sql
{% macro cents_to_dollars(column_name) %}
cast({{ column_name }} as decimal(18, 2)) / 100.0
{% endmacro %}
-- Usage in a model:
-- select {{ cents_to_dollars('amount_cents') }} as amount_dollars
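The packages above also ship ready-made macros; for example, `dbt_utils.generate_surrogate_key` can stand in for the hand-rolled surrogate-key macro listed in the project tree. A sketch under assumed names (the upstream model `slv_shared__calendar` is hypothetical):

```sql
-- models/gold/shared/gld_shared__dim_date.sql (excerpt, illustrative)
select
    {{ dbt_utils.generate_surrogate_key(['date_day']) }} as date_key,
    date_day
from {{ ref('slv_shared__calendar') }}  -- hypothetical upstream model
```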
### Seeds

**Seeds: when to use.**

- **Use for:** small, static lookup/reference tables (<1,000 rows) — country codes, status enums, currency mappings.
- **Don't use for:** large datasets, frequently changing data, or anything over a few thousand rows.
-- seeds/dim_country_codes.csv
country_code,country_name,region
US,United States,North America
GB,United Kingdom,Europe
CA,Canada,North America
# seeds/schema.yml
version: 2
seeds:
- name: dim_country_codes
description: "ISO country code lookup"
config:
column_types:
country_code: varchar(3)
country_name: varchar(100)
region: varchar(50)
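Seeds are referenced like any other model via `ref()`. A short sketch joining the country lookup onto a silver model (it assumes the transactions model carries a `country_code` column, which is not shown in the earlier example):

```sql
-- Enrich transactions with region from the seed (illustrative)
select
    t.transaction_id,
    t.amount,
    c.region
from {{ ref('slv_finance__transactions_cleansed') }} as t
left join {{ ref('dim_country_codes') }} as c
    on t.country_code = c.country_code   -- assumed join column
```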
## ADF Pipeline Best Practices

### Naming Conventions
| Resource | Pattern | Example |
|---|---|---|
| Pipeline | pl_{domain}_{source}_{frequency} | pl_finance_sap_daily |
| Dataset | ds_{format}_{system}_{entity} | ds_parquet_datalake_transactions |
| Linked Service | ls_{type}_{environment} | ls_adls_prod |
| Trigger | tr_{type}_{schedule} | tr_schedule_daily_0600 |
| Activity | act_{verb}_{entity} | act_copy_transactions |
| Data Flow | df_{domain}_{purpose} | df_finance_cleanse_transactions |
### Parameterized Pipelines
Design pipelines for multi-environment promotion — dev → staging → prod without code changes:
{
"name": "pl_finance_sap_daily",
"properties": {
"parameters": {
"environment": { "type": "string", "defaultValue": "dev" },
"source_schema": { "type": "string" },
"sink_container": { "type": "string", "defaultValue": "bronze" },
"load_date": {
"type": "string",
"defaultValue": "@utcnow('yyyy-MM-dd')"
}
}
}
}
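Activities consume these parameters through dynamic-content expressions. A trimmed sketch of a copy activity passing `sink_container` and `load_date` into a dataset (the activity and dataset names come from the naming table above; the dataset parameters `container` and `folder` are assumptions):

```json
{
  "name": "act_copy_transactions",
  "type": "Copy",
  "outputs": [
    {
      "referenceName": "ds_parquet_datalake_transactions",
      "type": "DatasetReference",
      "parameters": {
        "container": {
          "value": "@pipeline().parameters.sink_container",
          "type": "Expression"
        },
        "folder": {
          "value": "@concat('finance/transactions/', pipeline().parameters.load_date)",
          "type": "Expression"
        }
      }
    }
  ]
}
```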
**Environment promotion.** Use ARM template parameters or Terraform variables to swap linked service endpoints between environments. Never hard-code connection strings.

### Error Handling
flowchart TD
A["Pipeline Start"] --> B["Copy Activity"]
B -->|Success| C["Transform"]
B -->|Failure| D{"Retry\n(3x, 30s)"}
D -->|Exhausted| E["Log to Dead Letter"]
E --> F["Send Alert"]
C -->|Success| G["Pipeline Complete"]
C -->|Failure| D2{"Retry\n(2x, 60s)"}
D2 -->|Exhausted| E

- Set retry policies on every activity: `"retry": 3, "retryIntervalInSeconds": 30` (see the sketch after this list)
- Use *Upon Failure* dependencies to route to dead-letter and alerting activities
- Log failed records to a dead-letter table/container for reprocessing
- Send alerts via Logic Apps or Azure Monitor Action Groups
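In the pipeline JSON, the retry settings from the first bullet live in each activity's `policy` block. A minimal sketch (the timeout value is an example, not a recommendation):

```json
{
  "name": "act_copy_transactions",
  "type": "Copy",
  "policy": {
    "retry": 3,
    "retryIntervalInSeconds": 30,
    "timeout": "0.01:00:00"
  }
}
```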
### Trigger Types
| Trigger Type | Use When | Example |
|---|---|---|
| Schedule | Fixed cadence (daily, hourly) | Daily 6 AM batch load |
| Tumbling Window | Need backfill, exactly-once, dependency chains | Hourly partitioned loads |
| Event-based | React to file arrival (Blob/ADLS) | Process file on landing |
**Prefer tumbling window over schedule.** Tumbling window triggers support backfill, concurrency control, and dependency chaining — schedule triggers do not.
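A sketch of a tumbling window trigger that runs a pipeline hourly and hands the window start to the pipeline. The trigger name and parameter wiring are illustrative, and the pipeline name from earlier is reused purely for the example:

```json
{
  "name": "tr_tumbling_hourly",
  "properties": {
    "type": "TumblingWindowTrigger",
    "typeProperties": {
      "frequency": "Hour",
      "interval": 1,
      "startTime": "2024-01-01T00:00:00Z",
      "maxConcurrency": 2,
      "retryPolicy": { "count": 3, "intervalInSeconds": 60 }
    },
    "pipeline": {
      "pipelineReference": { "referenceName": "pl_finance_sap_daily", "type": "PipelineReference" },
      "parameters": {
        "load_date": "@formatDateTime(trigger().outputs.windowStartTime, 'yyyy-MM-dd')"
      }
    }
  }
}
```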
### Do / Don't
| ✅ Do | ❌ Don't |
|---|---|
| Use Managed Identity for linked services | Use connection strings or keys |
| Parameterize environment-specific values | Hard-code server names or paths |
| Set retry and timeout on every activity | Leave default (no retry) settings |
| Use tumbling window for partitioned loads | Use schedule when you need backfill |
| Monitor with Azure Monitor alerts | Rely on manual pipeline run checks |
| Version control ARM templates / Terraform | Make changes only through the portal |
| Use metadata-driven frameworks for similar pipelines | Copy-paste pipelines with minor differences |
| Log pipeline metrics to Log Analytics | Ignore execution statistics |
## Spark Optimization

### Cluster Sizing
!!! tip "Right-size your clusters" - Start small, scale up based on metrics — don't over-provision. - Use auto-scaling with min/max workers defined. - Match worker VM size to workload: memory-optimized for joins, compute-optimized for transformations.
# Databricks cluster config example
{
"autoscale": {
"min_workers": 2,
"max_workers": 8
},
"spark_conf": {
"spark.sql.shuffle.partitions": "200",
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true"
}
}
### Partition Management
# ✅ GOOD — reduce partitions for writing small outputs
df_small = df.coalesce(4) # Avoids shuffle, just merges partitions
df_small.write.parquet("/output/small_table")
# ✅ GOOD — repartition by key for downstream joins
df_partitioned = df.repartition(200, "customer_id")
# ❌ BAD — repartition when coalesce would suffice
df_small = df.repartition(4) # Unnecessary full shuffle
| Operation | Shuffle? | Use When |
|---|---|---|
| `coalesce(n)` | No | Reducing partitions (writing output) |
| `repartition(n)` | Yes | Even distribution needed (before joins) |
| `repartition(n, col)` | Yes | Partition by key for co-located joins |
### Broadcast Joins
from pyspark.sql.functions import broadcast
# ✅ GOOD — small dimension table broadcast to all workers
result = large_fact_df.join(
broadcast(small_dim_df),
on="dim_key",
how="left"
)
# Rule of thumb: broadcast tables < 100MB
# Raise the auto-broadcast threshold (default 10MB) if needed:
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)
### Caching Strategies
from pyspark import StorageLevel
# ✅ cache() — keep in memory (default MEMORY_AND_DISK)
df_reused = df.filter(df.status == "active").cache()
df_reused.count() # Materialize the cache
# ... use df_reused multiple times ...
df_reused.unpersist() # Release when done
# ✅ persist() — control storage level
df_large = df.persist(StorageLevel.DISK_ONLY) # When memory is constrained
!!! warning "Cache management" - Always call .unpersist() when done — leaked caches waste cluster memory. - Only cache DataFrames used multiple times in the same job. - Don't cache DataFrames that are only read once.
### Avoiding Shuffles
# ✅ Salting to fix data skew
from pyspark.sql.functions import lit, rand, explode, array
SALT_BUCKETS = 10
# Salt the skewed key with a random bucket id (0 .. SALT_BUCKETS - 1)
skewed_df = skewed_df.withColumn("salt", (rand() * SALT_BUCKETS).cast("int"))
# Explode the small table so every row carries every salt value
small_df = small_df.withColumn(
    "salt", explode(array([lit(i) for i in range(SALT_BUCKETS)]))
)
result = skewed_df.join(small_df, on=["join_key", "salt"], how="inner").drop("salt")
# ✅ Bucketing for repeated joins on the same key
df.write.bucketBy(256, "customer_id").sortBy("customer_id").saveAsTable("bucketed_customers")
### UDF Anti-Patterns
| ✅ Do | ❌ Don't |
|---|---|
| `F.upper(col("name"))` | `udf(lambda x: x.upper())` |
| `F.when(cond, val).otherwise(other)` | `udf(lambda x: val if cond else other)` |
| `F.regexp_replace(col, pattern, repl)` | `udf(lambda x: re.sub(pattern, repl, x))` |
| `F.from_json(col, schema)` | `udf(lambda x: json.loads(x))` |
# ❌ BAD — Python UDF (serialization overhead, no Catalyst optimization)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
@udf(returnType=StringType())
def clean_name(name):
return name.strip().upper() if name else None
df = df.withColumn("clean_name", clean_name(df.name))
# ✅ GOOD — built-in functions (runs in JVM, Catalyst-optimized)
from pyspark.sql import functions as F
df = df.withColumn("clean_name", F.upper(F.trim(F.col("name"))))
## Data Contracts

### What Are Data Contracts?

A data contract is a formal agreement between a data producer and its consumers that defines the schema, quality guarantees, SLAs, and ownership of a dataset. Contracts prevent breaking changes from silently propagating downstream.

### Contract YAML Structure
# contracts/finance/transactions.yml
contract:
name: finance_transactions
version: "2.1"
owner: finance-data-team
contact: finance-data@company.com
description: >
Cleansed financial transactions from SAP.
Updated daily by 06:00 UTC.
sla:
freshness: "24 hours"
availability: "99.5%"
update_frequency: "daily"
schema:
- name: transaction_id
type: string
required: true
primary_key: true
description: "Unique transaction identifier"
- name: amount
type: decimal(18,2)
required: true
constraints:
- type: range
min: 0
max: 10000000
- name: currency_code
type: string
required: true
constraints:
- type: enum
values: [USD, EUR, GBP, CAD, AUD]
- name: transaction_date
type: date
required: true
constraints:
- type: not_future
tests:
- unique: [transaction_id]
- not_null: [transaction_id, amount, currency_code, transaction_date]
- row_count_min: 1000
- freshness:
column: _loaded_at
warn_after: 12h
error_after: 24h
### Enforcement via dbt
Generate dbt schema tests from contract definitions:
# Generated from contract: contracts/finance/transactions.yml
# models/silver/finance/schema.yml
version: 2
models:
- name: slv_finance__transactions_cleansed
description: "Cleansed financial transactions — contract v2.1"
config:
contract:
enforced: true
columns:
- name: transaction_id
data_type: varchar
constraints:
- type: not_null
- type: primary_key
tests:
- unique
- not_null
- name: amount
data_type: decimal(18,2)
constraints:
- type: not_null
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 10000000
- name: currency_code
data_type: varchar
constraints:
- type: not_null
tests:
- accepted_values:
values: ["USD", "EUR", "GBP", "CAD", "AUD"]
### Breaking Change Management
| Change Type | Breaking? | Action Required |
|---|---|---|
| Add nullable column | No | Update contract version (minor) |
| Remove column | Yes | Deprecation notice → 2 sprint grace period → remove |
| Change data type | Yes | New contract version (major), notify consumers |
| Tighten constraint | Yes | Communicate and validate downstream impact |
| Loosen constraint | No | Update contract version (minor) |
| Rename column | Yes | Add alias, deprecate old name, remove after grace period |
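For the column-rename row above, the alias pattern keeps both names available during the grace period. A sketch assuming `business_unit` is being renamed to `business_unit_code` (the new name is illustrative):

```sql
-- models/gold/finance/gld_finance__monthly_revenue.sql (excerpt, illustrative)
select
    revenue_month,
    business_unit_code,
    -- Deprecated: alias kept for the 2-sprint grace period; remove once consumers migrate
    business_unit_code as business_unit,
    total_revenue
from final
```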
## Testing Strategy

### Testing Pyramid
graph TB
E2E["E2E / Pipeline Tests\n(Full pipeline run, integration)"]
Integration["Integration Tests\n(dbt compile + run, Spark job tests)"]
Unit["Unit Tests\n(dbt unit tests, PySpark unit tests)"]
E2E --> Integration --> Unit
style Unit fill:#4caf50,color:#fff
style Integration fill:#ff9800,color:#fff
style E2E fill:#f44336,color:#fff

### dbt Unit Tests
# models/silver/finance/schema.yml
unit_tests:
- name: test_transaction_cleansing
model: slv_finance__transactions_cleansed
given:
- input: ref('brz_finance__transactions')
rows:
- {
transaction_id: "T001",
amount: 1500,
currency_code: "usd",
transaction_date: "2024-01-15",
}
- {
transaction_id: "T002",
amount: null,
currency_code: "EUR",
transaction_date: "2024-01-15",
}
expect:
rows:
- {
transaction_id: "T001",
amount: 1500.00,
currency_code: "USD",
}
# T002 excluded due to null amount filter
### Data Quality with dbt-expectations
models:
- name: gld_finance__monthly_revenue
tests:
- dbt_expectations.expect_table_row_count_to_be_between:
min_value: 1
max_value: 1000000
- dbt_expectations.expect_column_values_to_match_regex:
column_name: business_unit
regex: "^BU-[A-Z]{3,5}$"
- dbt_expectations.expect_column_pair_values_A_to_be_greater_than_B:
column_A: total_revenue
column_B: total_cost
or_equal: true
### CI Pipeline Testing
# .azure-pipelines/dbt-ci.yml
stages:
- stage: dbt_test
jobs:
- job: lint_and_test
steps:
- script: |
pip install dbt-core dbt-sqlserver
dbt deps
dbt compile --target ci
dbt test --target ci --select state:modified+
displayName: "dbt compile and test modified models"
- script: |
dbt source freshness --target ci
displayName: "Check source freshness"
**Slim CI.** Use `state:modified+` to test only the models changed in the PR, plus their downstream dependents. This keeps CI fast. Note that state-based selection needs the artifacts from a previous production run, supplied via `--state` (or the `DBT_STATE` environment variable).

## Anti-Patterns
Never `SELECT *` in production models
Always explicitly list columns. `SELECT *` breaks data contracts when upstream schemas change and hides column-level lineage.
```sql
-- ❌ BAD
select * from {{ ref('brz_finance__transactions') }}
-- ✅ GOOD
select transaction_id, amount, currency_code, transaction_date
from {{ ref('brz_finance__transactions') }}
```
Never skip testing in CI
Every PR that modifies a dbt model must run dbt test on modified models and their dependents. Untested changes will eventually cause silent data corruption.
Never hard-code environment values in ADF
Hard-coded server names, storage accounts, or paths make promotion impossible and cause production incidents when dev resources are referenced.
Never use Python UDFs for simple transformations in Spark
Python UDFs serialize data from JVM → Python → JVM, adding 10-100x overhead. Use built-in Spark SQL functions for any operation that has a native equivalent.
Never ignore data skew in Spark joins
A single skewed key can cause one task to process 90% of the data while others sit idle. Monitor stage metrics and apply salting or broadcast joins when skew is detected.
Never deploy dbt models without freshness checks on sources
Without freshness monitoring, stale source data flows silently through the pipeline, producing outdated reports that stakeholders trust as current.
## Checklist: Data Engineering Readiness
- dbt project follows `brz_` / `slv_` / `gld_` naming conventions
- All models have schema tests (`unique`, `not_null` at minimum)
- Incremental models define `unique_key` and handle late-arriving data
- Source freshness checks are configured and monitored
- Data contracts exist for all gold-layer models
- ADF pipelines are parameterized for multi-environment promotion
- ADF linked services use Managed Identity
- ADF activities have retry policies and failure alerting
- Spark jobs use built-in functions instead of Python UDFs
- Spark cluster auto-scaling is configured with appropriate min/max
- CI pipeline runs `dbt test --select state:modified+` on PRs
- Documentation exists for all gold-layer models (`doc()` blocks)
- Breaking changes follow the deprecation process (2-sprint grace period)
## Cross-References
- Medallion Architecture — Bronze/Silver/Gold layer design
- Data Governance — Classification, access control, lineage
- Monitoring & Observability — Pipeline alerting, data quality dashboards
- Security Best Practices — Managed Identity, key vault integration