🎯 Event-Driven Data Pipelines - Automated ETL with Event Grid¶
Build event-driven Data Factory pipelines triggered by Event Grid for automated, serverless ETL workflows.
🎯 Overview¶
Create serverless, event-driven data pipelines: each file that arrives in the landing container raises an Event Grid blob event, which triggers a Data Factory pipeline to process it.
Architecture¶
graph TB
Upload[File Upload] --> Blob[Blob Storage<br/>Landing Zone]
Blob --> EventGrid[Event Grid<br/>Blob Events]
EventGrid --> DataFactory[Data Factory<br/>Event Trigger]
DataFactory --> Process[Data Processing<br/>Pipeline]
Process --> Transform[Transform Data]
Transform --> Destination[Destination<br/>Storage/DB]
Process --> Notify[Notifications<br/>Success/Failure]
🚀 Implementation¶
Step 1: Create Storage Accounts¶
RESOURCE_GROUP="rg-event-driven-etl"
LOCATION="eastus"
LANDING_STORAGE="landlanding$(openssl rand -hex 4)"
PROCESSED_STORAGE="landprocessed$(openssl rand -hex 4)"
# Create resource group
az group create --name $RESOURCE_GROUP --location $LOCATION
# Create landing zone storage
az storage account create \
--name $LANDING_STORAGE \
--resource-group $RESOURCE_GROUP \
--location $LOCATION \
--sku Standard_LRS \
--kind StorageV2 \
--enable-hierarchical-namespace true
# Create processed storage
az storage account create \
--name $PROCESSED_STORAGE \
--resource-group $RESOURCE_GROUP \
--location $LOCATION \
--sku Standard_LRS \
--kind StorageV2 \
--enable-hierarchical-namespace true
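# Create containers (--auth-mode login needs a data-plane role such as Storage Blob Data Contributor for the signed-in user)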
az storage container create \
--name "landing" \
--account-name $LANDING_STORAGE \
--auth-mode login
az storage container create \
--name "processed" \
--account-name $PROCESSED_STORAGE \
--auth-mode login
az storage container create \
--name "archive" \
--account-name $LANDING_STORAGE \
--auth-mode login
az storage container create \
--name "errors" \
--account-name $LANDING_STORAGE \
--auth-mode login
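The random hex suffix keeps the account names unique, but storage account names must also be 3-24 characters long and use only lowercase letters and digits. As a rough sketch of checking a generated name before calling az storage account create, something like the following could be used. The helper names is_valid_storage_name and make_storage_name are hypothetical and not part of any Azure SDK.

```python
import re
import secrets

# Storage account names: 3-24 characters, lowercase letters and digits only.
_NAME_RE = re.compile(r"[a-z0-9]{3,24}")


def is_valid_storage_name(name: str) -> bool:
    """Return True if `name` satisfies the storage account naming rules."""
    return _NAME_RE.fullmatch(name) is not None


def make_storage_name(prefix: str, suffix_bytes: int = 4) -> str:
    """Mimic prefix$(openssl rand -hex 4): prefix plus a random hex suffix."""
    name = prefix + secrets.token_hex(suffix_bytes)
    if not is_valid_storage_name(name):
        raise ValueError(f"invalid storage account name {name!r} ({len(name)} chars)")
    return name


if __name__ == "__main__":
    print(make_storage_name("landlanding"))
```

The check only covers the format; global uniqueness is still decided by Azure when the account is created.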
Step 2: Create Data Factory¶
DATA_FACTORY="adf-event-driven-$(openssl rand -hex 4)"
# Create Data Factory (requires the CLI extension: az extension add --name datafactory)
az datafactory create \
--name $DATA_FACTORY \
--resource-group $RESOURCE_GROUP \
--location $LOCATION
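# The datafactory CLI extension creates the factory with a system-assigned
# managed identity, so no separate update is needed; confirm it is present
az datafactory show \
--name $DATA_FACTORY \
--resource-group $RESOURCE_GROUP \
--query identity.type \
--output tsv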
# Get managed identity principal ID
PRINCIPAL_ID=$(az datafactory show \
--name $DATA_FACTORY \
--resource-group $RESOURCE_GROUP \
--query identity.principalId \
--output tsv)
# Grant storage access to Data Factory
LANDING_STORAGE_ID=$(az storage account show \
--name $LANDING_STORAGE \
--resource-group $RESOURCE_GROUP \
--query id \
--output tsv)
PROCESSED_STORAGE_ID=$(az storage account show \
--name $PROCESSED_STORAGE \
--resource-group $RESOURCE_GROUP \
--query id \
--output tsv)
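# Pass the object ID and principal type explicitly so the assignment does not
# depend on a directory lookup of the newly created identity
az role assignment create \
--assignee-object-id $PRINCIPAL_ID \
--assignee-principal-type ServicePrincipal \
--role "Storage Blob Data Contributor" \
--scope $LANDING_STORAGE_ID
az role assignment create \
--assignee-object-id $PRINCIPAL_ID \
--assignee-principal-type ServicePrincipal \
--role "Storage Blob Data Contributor" \
--scope $PROCESSED_STORAGE_ID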
Step 3: Create Event Grid System Topic¶
EVENT_GRID_TOPIC="egt-blob-events-$(openssl rand -hex 4)"
# Create Event Grid system topic for blob events
az eventgrid system-topic create \
--name $EVENT_GRID_TOPIC \
--resource-group $RESOURCE_GROUP \
--location $LOCATION \
--topic-type Microsoft.Storage.StorageAccounts \
--source $LANDING_STORAGE_ID
Step 4: Create Data Factory Pipeline¶
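Save the pipeline definition as event-driven-pipeline.json. It references a LandingDataset dataset, parameterized by fileName and folderPath, which must already exist in the factory. The Copy activities are abbreviated; each also needs inputs and outputs dataset references.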
{
"name": "BlobEventPipeline",
"properties": {
"activities": [
{
"name": "ValidateFile",
"type": "Validation",
"typeProperties": {
"dataset": {
"referenceName": "LandingDataset",
"type": "DatasetReference",
"parameters": {
"fileName": "@pipeline().parameters.fileName",
"folderPath": "@pipeline().parameters.folderPath"
}
},
"timeout": "0.00:05:00",
"sleep": 10,
"minimumSize": 1
}
},
{
"name": "CheckFileType",
"type": "Switch",
"dependsOn": [
{
"activity": "ValidateFile",
"dependencyConditions": ["Succeeded"]
}
],
"typeProperties": {
"on": "@toLower(last(split(pipeline().parameters.fileName, '.')))",
"cases": [
{
"value": "csv",
"activities": [
{
"name": "ProcessCSV",
"type": "Copy",
"typeProperties": {
"source": {
"type": "DelimitedTextSource"
},
"sink": {
"type": "ParquetSink"
}
}
}
]
},
{
"value": "json",
"activities": [
{
"name": "ProcessJSON",
"type": "Copy",
"typeProperties": {
"source": {
"type": "JsonSource"
},
"sink": {
"type": "ParquetSink"
}
}
}
]
}
],
"defaultActivities": [
{
"name": "MoveToErrors",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BinarySource"
},
"sink": {
"type": "BinarySink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings",
"copyBehavior": "PreserveHierarchy"
}
}
}
}
]
}
},
{
"name": "ArchiveFile",
"type": "Copy",
"dependsOn": [
{
"activity": "CheckFileType",
"dependencyConditions": ["Succeeded"]
}
],
"typeProperties": {
"source": {
"type": "BinarySource",
"storeSettings": {
"type": "AzureBlobFSReadSettings"
}
},
"sink": {
"type": "BinarySink",
"storeSettings": {
"type": "AzureBlobFSWriteSettings"
}
}
}
},
{
"name": "DeleteOriginalFile",
"type": "Delete",
"dependsOn": [
{
"activity": "ArchiveFile",
"dependencyConditions": ["Succeeded"]
}
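],
"typeProperties": {
"dataset": {
"referenceName": "LandingDataset",
"type": "DatasetReference",
"parameters": {
"fileName": "@pipeline().parameters.fileName",
"folderPath": "@pipeline().parameters.folderPath"
}
}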
}
}
],
"parameters": {
"fileName": {
"type": "String"
},
"folderPath": {
"type": "String"
}
}
}
}
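The CheckFileType Switch routes each file by its extension and falls back to MoveToErrors for anything it does not recognize. A minimal sketch of that routing logic, useful for reasoning about edge cases such as names without a dot, might look like this. The function names and the lower-casing of the extension are assumptions made for illustration, not part of the pipeline definition.

```python
def file_extension(file_name: str) -> str:
    """Return the lower-cased text after the last '.', or '' if there is no dot."""
    _, dot, ext = file_name.rpartition(".")
    return ext.lower() if dot else ""


# Switch case value -> activity name, taken from the pipeline JSON above.
ROUTES = {"csv": "ProcessCSV", "json": "ProcessJSON"}


def route(file_name: str) -> str:
    """Pick the activity the Switch would run; unknown types go to MoveToErrors."""
    return ROUTES.get(file_extension(file_name), "MoveToErrors")


if __name__ == "__main__":
    for name in ("test-data.csv", "events.JSON", "README", "table.parquet"):
        print(name, "->", route(name))
```

Taking everything after the last dot avoids assuming a fixed extension length, which matters for short extensions at the end of a file name.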
Step 5: Create Event-Based Trigger¶
{
"name": "BlobCreatedTrigger",
"properties": {
"type": "BlobEventsTrigger",
"typeProperties": {
"blobPathBeginsWith": "/landing/blobs/",
"blobPathEndsWith": ".csv",
"ignoreEmptyBlobs": true,
"scope": "[resourceId('Microsoft.Storage/storageAccounts', parameters('landingStorageAccount'))]",
"events": [
"Microsoft.Storage.BlobCreated"
]
},
"pipelines": [
{
"pipelineReference": {
"referenceName": "BlobEventPipeline",
"type": "PipelineReference"
},
"parameters": {
"fileName": "@triggerBody().fileName",
"folderPath": "@triggerBody().folderPath"
}
}
]
}
}
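The trigger only fires when the blob path passes both blobPathBeginsWith and blobPathEndsWith, and it then hands fileName and folderPath to the pipeline. With blobPathEndsWith set to .csv, only CSV uploads start the pipeline, so the json case in the Switch is reached only if that filter is widened. The sketch below approximates the matching step against an Event Grid subject. The function name match_trigger is hypothetical, and the assumption that folderPath starts with the container name should be verified against a real trigger run.

```python
from typing import Optional

# Event Grid subjects for blob events start with this prefix.
SUBJECT_PREFIX = "/blobServices/default/containers"


def match_trigger(subject: str, begins_with: str, ends_with: str) -> Optional[dict]:
    """Return pipeline parameters if the subject passes the path filters, else None."""
    if not subject.startswith(SUBJECT_PREFIX):
        return None
    path = subject[len(SUBJECT_PREFIX):]  # e.g. "/landing/blobs/2024/test-data.csv"
    if not (path.startswith(begins_with) and path.endswith(ends_with)):
        return None
    container, _, blob_path = path.lstrip("/").partition("/blobs/")
    folder, _, file_name = blob_path.rpartition("/")
    # Assumption: folderPath is the container name plus any virtual folders.
    folder_path = f"{container}/{folder}" if folder else container
    return {"folderPath": folder_path, "fileName": file_name}


if __name__ == "__main__":
    subject = SUBJECT_PREFIX + "/landing/blobs/2024/test-data.csv"
    print(match_trigger(subject, "/landing/blobs/", ".csv"))
```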
🎨 Advanced Patterns¶
Pattern 1: Multi-Stage Processing¶
{
"activities": [
{
"name": "Stage1_Validation",
"type": "Validation"
},
{
"name": "Stage2_Transform",
"type": "Copy",
"dependsOn": [{"activity": "Stage1_Validation", "dependencyConditions": ["Succeeded"]}]
},
{
"name": "Stage3_Enrich",
"type": "Lookup",
"dependsOn": [{"activity": "Stage2_Transform", "dependencyConditions": ["Succeeded"]}]
},
{
"name": "Stage4_Load",
"type": "SqlServerStoredProcedure",
"dependsOn": [{"activity": "Stage3_Enrich", "dependencyConditions": ["Succeeded"]}]
},
{
"name": "Stage5_Cleanup",
"type": "Delete",
"dependsOn": [{"activity": "Stage4_Load", "dependencyConditions": ["Succeeded"]}]
}
]
}
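The dependsOn entries in this pattern form a dependency graph, and the run order follows from it rather than from the order of the activities array. As an illustration only, the graph can be modelled and ordered with a topological sort. The DEPENDS_ON mapping below is a hand-copied stand-in for the JSON, and execution_order is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Activity name -> activities it depends on, mirroring the dependsOn entries above.
DEPENDS_ON = {
    "Stage1_Validation": [],
    "Stage2_Transform": ["Stage1_Validation"],
    "Stage3_Enrich": ["Stage2_Transform"],
    "Stage4_Load": ["Stage3_Enrich"],
    "Stage5_Cleanup": ["Stage4_Load"],
}


def execution_order(depends_on: dict) -> list:
    """Return one valid run order; raises graphlib.CycleError on circular dependencies."""
    return list(TopologicalSorter(depends_on).static_order())


if __name__ == "__main__":
    print(" -> ".join(execution_order(DEPENDS_ON)))
```

Because each stage depends on exactly one predecessor, the order here is a single chain; a cycle in the mapping would raise an error instead of producing an order.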
Pattern 2: Parallel Processing¶
{
"activities": [
{
"name": "ValidateFile",
"type": "Validation"
},
{
"name": "ProcessDataFlow1",
"type": "ExecuteDataFlow",
"dependsOn": [{"activity": "ValidateFile", "dependencyConditions": ["Succeeded"]}]
},
{
"name": "ProcessDataFlow2",
"type": "ExecuteDataFlow",
"dependsOn": [{"activity": "ValidateFile", "dependencyConditions": ["Succeeded"]}]
},
{
"name": "WaitForCompletion",
"type": "Wait",
"dependsOn": [
{"activity": "ProcessDataFlow1", "dependencyConditions": ["Succeeded"]},
{"activity": "ProcessDataFlow2", "dependencyConditions": ["Succeeded"]}
],
"typeProperties": {
"waitTimeInSeconds": 1
}
}
]
}
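In this pattern both data flows depend only on ValidateFile, so they can run at the same time, and WaitForCompletion acts as the join point. The same fan-out and fan-in shape can be sketched locally with a thread pool. The functions below are placeholders that stand in for the activities and do no real processing.

```python
from concurrent.futures import ThreadPoolExecutor


def validate_file(name: str) -> str:
    """Stand-in for ValidateFile: reject empty names."""
    if not name:
        raise ValueError("empty file name")
    return name


def data_flow_1(name: str) -> str:
    """Stand-in for ProcessDataFlow1."""
    return f"flow1:{name}"


def data_flow_2(name: str) -> str:
    """Stand-in for ProcessDataFlow2."""
    return f"flow2:{name}"


def run_parallel(name: str) -> list:
    """Validate once, run both flows concurrently, return after both finish."""
    validated = validate_file(name)
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(flow, validated) for flow in (data_flow_1, data_flow_2)]
        # result() blocks until the branch is done and re-raises its exception, if any.
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(run_parallel("test-data.csv"))
```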
Pattern 3: Error Handling¶
{
"activities": [
{
"name": "TryProcessFile",
"type": "Copy"
},
{
"name": "HandleSuccess",
"type": "Copy",
"dependsOn": [
{
"activity": "TryProcessFile",
"dependencyConditions": ["Succeeded"]
}
]
},
{
"name": "HandleFailure",
"type": "Copy",
"dependsOn": [
{
"activity": "TryProcessFile",
"dependencyConditions": ["Failed"]
}
],
"typeProperties": {
"source": {
"type": "BinarySource"
},
"sink": {
"type": "BinarySink",
"storeSettings": {
"folderPath": "errors"
}
}
}
},
{
"name": "SendErrorNotification",
"type": "WebActivity",
"dependsOn": [
{
"activity": "HandleFailure",
"dependencyConditions": ["Completed"]
}
],
"typeProperties": {
"url": "https://logic-app-url",
"method": "POST",
"body": {
"fileName": "@pipeline().parameters.fileName",
"error": "@activity('TryProcessFile').error.message"
}
}
}
]
}
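The dependency conditions decide which handler runs: Succeeded and Failed match one outcome each, while Completed matches either outcome but not a skipped activity. A small sketch of that evaluation, applied to the four activities above, is shown below. The helper names are hypothetical, and the sketch assumes the error copy in HandleFailure itself succeeds.

```python
def should_run(condition: str, upstream_status: str) -> bool:
    """Decide whether a dependent activity runs given the upstream outcome.

    condition: "Succeeded", "Failed", "Completed", or "Skipped".
    upstream_status: "Succeeded", "Failed", or "Skipped".
    """
    if condition == "Completed":
        return upstream_status in ("Succeeded", "Failed")
    return condition == upstream_status


def plan(try_status: str) -> list:
    """List the handlers that would run for a given TryProcessFile outcome."""
    ran = []
    if should_run("Succeeded", try_status):
        ran.append("HandleSuccess")
    handle_failure_status = "Skipped"
    if should_run("Failed", try_status):
        ran.append("HandleFailure")
        handle_failure_status = "Succeeded"  # assumption: the error copy works
    if should_run("Completed", handle_failure_status):
        ran.append("SendErrorNotification")
    return ran


if __name__ == "__main__":
    for status in ("Succeeded", "Failed"):
        print(status, "->", plan(status))
```

Note that when TryProcessFile succeeds, HandleFailure is skipped, so the notification that depends on it is skipped as well.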
🔔 Event Grid Filters¶
Filter by File Extension¶
{
"filter": {
"subjectBeginsWith": "/blobServices/default/containers/landing/blobs/",
"subjectEndsWith": ".csv",
"includedEventTypes": [
"Microsoft.Storage.BlobCreated"
]
}
}
Advanced Filtering¶
{
"filter": {
"advancedFilters": [
{
"operatorType": "StringEndsWith",
"key": "subject",
"values": [".csv", ".json", ".parquet"]
},
{
"operatorType": "NumberGreaterThan",
"key": "data.contentLength",
"value": 1024
},
{
"operatorType": "StringContains",
"key": "data.url",
"values": ["/landing/"]
}
]
}
}
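Advanced filters combine with AND across the filter entries and with OR across the values inside one entry. To make that concrete, here is a minimal local evaluator for the three operators used above. It is a sketch rather than Event Grid's implementation: the case-insensitive string comparison is an assumption, and the storage URL in the sample event is a made-up placeholder.

```python
from typing import Any


def get_key(event: dict, dotted_key: str) -> Any:
    """Resolve keys such as 'data.contentLength' against a nested event dict."""
    value: Any = event
    for part in dotted_key.split("."):
        value = value[part]
    return value


def matches(event: dict, advanced_filters: list) -> bool:
    """All filters must pass (AND); within one filter any value may match (OR)."""
    for flt in advanced_filters:
        actual = get_key(event, flt["key"])
        op = flt["operatorType"]
        if op == "StringEndsWith":
            ok = any(actual.lower().endswith(v.lower()) for v in flt["values"])
        elif op == "StringContains":
            ok = any(v.lower() in actual.lower() for v in flt["values"])
        elif op == "NumberGreaterThan":
            ok = actual > flt["value"]
        else:
            raise ValueError(f"operator not covered by this sketch: {op}")
        if not ok:
            return False
    return True


FILTERS = [
    {"operatorType": "StringEndsWith", "key": "subject", "values": [".csv", ".json", ".parquet"]},
    {"operatorType": "NumberGreaterThan", "key": "data.contentLength", "value": 1024},
    {"operatorType": "StringContains", "key": "data.url", "values": ["/landing/"]},
]

if __name__ == "__main__":
    sample = {
        "subject": "/blobServices/default/containers/landing/blobs/big.csv",
        # Hypothetical account name, for illustration only.
        "data": {"contentLength": 2048, "url": "https://example.blob.core.windows.net/landing/big.csv"},
    }
    print(matches(sample, FILTERS))
```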
📊 Complete Bicep Template¶
param location string = resourceGroup().location
param namePrefix string = 'eventdriven'
// Landing storage account
resource landingStorage 'Microsoft.Storage/storageAccounts@2023-01-01' = {
// Storage account names are capped at 24 characters, so truncate the generated name
name: take('${namePrefix}land${uniqueString(resourceGroup().id)}', 24)
location: location
sku: {
name: 'Standard_LRS'
}
kind: 'StorageV2'
properties: {
isHnsEnabled: true
}
}
// Containers
resource landingContainer 'Microsoft.Storage/storageAccounts/blobServices/containers@2023-01-01' = {
name: '${landingStorage.name}/default/landing'
properties: {
publicAccess: 'None'
}
}
resource processedContainer 'Microsoft.Storage/storageAccounts/blobServices/containers@2023-01-01' = {
name: '${landingStorage.name}/default/processed'
properties: {
publicAccess: 'None'
}
}
resource archiveContainer 'Microsoft.Storage/storageAccounts/blobServices/containers@2023-01-01' = {
name: '${landingStorage.name}/default/archive'
properties: {
publicAccess: 'None'
}
}
resource errorContainer 'Microsoft.Storage/storageAccounts/blobServices/containers@2023-01-01' = {
name: '${landingStorage.name}/default/errors'
properties: {
publicAccess: 'None'
}
}
// Data Factory
resource dataFactory 'Microsoft.DataFactory/factories@2018-06-01' = {
name: '${namePrefix}-adf-${uniqueString(resourceGroup().id)}'
location: location
identity: {
type: 'SystemAssigned'
}
}
// Event Grid System Topic
resource eventGridTopic 'Microsoft.EventGrid/systemTopics@2023-12-15-preview' = {
name: '${namePrefix}-egt-${uniqueString(resourceGroup().id)}'
location: location
properties: {
source: landingStorage.id
topicType: 'Microsoft.Storage.StorageAccounts'
}
}
// Role assignment for Data Factory
resource roleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(landingStorage.id, dataFactory.id, 'StorageBlobDataContributor')
scope: landingStorage
properties: {
roleDefinitionId: subscriptionResourceId('Microsoft.Authorization/roleDefinitions', 'ba92f5b4-2d11-453d-a403-e96b0029c9fe')
principalId: dataFactory.identity.principalId
principalType: 'ServicePrincipal'
}
}
output dataFactoryName string = dataFactory.name
output landingStorageName string = landingStorage.name
output eventGridTopicName string = eventGridTopic.name
🧪 Testing the Pipeline¶
Upload Test File¶
# Create test CSV file
cat > test-data.csv << EOF
id,name,value,timestamp
1,device-001,25.5,2024-01-28T10:00:00Z
2,device-002,26.3,2024-01-28T10:01:00Z
3,device-003,24.8,2024-01-28T10:02:00Z
EOF
# Upload to landing zone
az storage blob upload \
--account-name $LANDING_STORAGE \
--container-name landing \
--name "test-data.csv" \
--file test-data.csv \
--auth-mode login
# Monitor pipeline execution (set the window to cover the upload time, in UTC)
az datafactory pipeline-run query-by-factory \
--factory-name $DATA_FACTORY \
--resource-group $RESOURCE_GROUP \
--last-updated-after "2024-01-28T00:00:00Z" \
--last-updated-before "2024-01-28T23:59:59Z"
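The run query needs a UTC time window that brackets the upload. As a small sketch, the two timestamps could be computed relative to the current time and then passed to --last-updated-after and --last-updated-before. The helper name query_window and the one-hour default are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def query_window(hours_back: int = 1, now: Optional[datetime] = None) -> Tuple[str, str]:
    """Return (after, before) UTC timestamps in ISO 8601 form with a 'Z' suffix."""
    end = now or datetime.now(timezone.utc)
    start = end - timedelta(hours=hours_back)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return start.strftime(fmt), end.strftime(fmt)


if __name__ == "__main__":
    after, before = query_window()
    print(f'--last-updated-after "{after}" --last-updated-before "{before}"')
```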
Monitor Event Grid Events¶
# Create event subscription for monitoring
az eventgrid system-topic event-subscription create \
--name "monitor-subscription" \
--system-topic-name $EVENT_GRID_TOPIC \
--resource-group $RESOURCE_GROUP \
--endpoint-type webhook \
--endpoint "https://your-webhook-url"
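A webhook endpoint has to complete Event Grid's validation handshake before the subscription becomes active: the first request carries a SubscriptionValidationEvent, and the endpoint answers with the validation code it received. The handler below sketches that logic for the Event Grid event schema as a plain function, independent of any web framework. The function name handle_webhook and the choice to print regular events are assumptions.

```python
import json
from typing import Tuple

VALIDATION_EVENT = "Microsoft.EventGrid.SubscriptionValidationEvent"


def handle_webhook(body: str) -> Tuple[int, dict]:
    """Process an Event Grid POST body; return (status_code, response_json)."""
    events = json.loads(body)
    for event in events:
        if event.get("eventType") == VALIDATION_EVENT:
            # Echo the code back so Event Grid activates the subscription.
            return 200, {"validationResponse": event["data"]["validationCode"]}
    # Regular delivery: log each event and acknowledge the batch.
    for event in events:
        print(event.get("eventType"), event.get("subject"))
    return 200, {}


if __name__ == "__main__":
    handshake = json.dumps([{"eventType": VALIDATION_EVENT, "data": {"validationCode": "abc123"}}])
    print(handle_webhook(handshake))
```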
📈 Monitoring and Alerting¶
Data Factory Monitoring¶
# Get pipeline runs (both time bounds are required)
az datafactory pipeline-run query-by-factory \
--factory-name $DATA_FACTORY \
--resource-group $RESOURCE_GROUP \
--filters operand="PipelineName" operator="Equals" values="BlobEventPipeline" \
--last-updated-after "2024-01-28T00:00:00Z" \
--last-updated-before "2024-01-28T23:59:59Z"
# Get activity runs
az datafactory activity-run query-by-pipeline-run \
--factory-name $DATA_FACTORY \
--resource-group $RESOURCE_GROUP \
--run-id "<pipeline-run-id>" \
--last-updated-after "2024-01-28T00:00:00Z" \
--last-updated-before "2024-01-28T23:59:59Z"
Create Alerts¶
# Alert on pipeline failures
az monitor metrics alert create \
--name "PipelineFailureAlert" \
--resource-group $RESOURCE_GROUP \
--scopes $(az datafactory show --name $DATA_FACTORY --resource-group $RESOURCE_GROUP --query id --output tsv) \
--condition "total PipelineFailedRuns > 0" \
--window-size 5m \
--evaluation-frequency 1m \
--action "<action-group-id>"
💰 Cost Optimization¶
Event Grid¶
- Use advanced filters to reduce unnecessary events
- Configure dead-lettering to a storage container so undeliverable events are retained instead of dropped
- Use system topics instead of custom topics where possible
Data Factory¶
- Use integration runtime auto-resolve for optimal performance
- Implement pipeline caching for repeated operations
- Minimize activity count by combining operations
- Use tumbling window triggers instead of schedule triggers for batch workloads
🔒 Security Best Practices¶
Network Security¶
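# Create a private endpoint for the landing storage account (the VNet and subnet must already exist;
# access through the dfs endpoint needs a second private endpoint with --group-id dfs)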
az network private-endpoint create \
--name "pe-storage-landing" \
--resource-group $RESOURCE_GROUP \
--vnet-name "vnet-etl" \
--subnet "subnet-endpoints" \
--private-connection-resource-id $LANDING_STORAGE_ID \
--group-id blob \
--connection-name "storage-connection"
Data Protection¶
- Enable soft delete on storage accounts
- Use versioning for blob storage
- Implement lifecycle policies for data retention
- Enable encryption with customer-managed keys
📚 Next Steps¶
- Add Logic Apps for complex orchestration
- Implement data quality validation
- Build data lineage tracking
Last Updated: 2025-01-28 | Complexity: Intermediate | Duration: 50 minutes