⚙️ Basic Pipeline Activities¶
Learn the fundamental pipeline activities (Copy, Lookup, Get Metadata, Delete, and ForEach) that most data workflows are built from.
📋 Table of Contents¶
- Activity Types Overview
- Copy Activity
- Lookup Activity
- Get Metadata Activity
- Delete Activity
- ForEach Activity
- Hands-On Examples
- Best Practices
- Next Steps
📊 Activity Types Overview¶
| Activity Type | Purpose | Common Use Cases |
|---|---|---|
| Copy | Move data between stores | ETL/ELT, data migration |
| Lookup | Retrieve configuration | Dynamic pipelines, metadata-driven |
| Get Metadata | Get file/folder info | File validation, conditional logic |
| Delete | Remove files/folders | Cleanup, archival |
| ForEach | Iterate collections | Process multiple files/tables |
| If Condition | Conditional branching | Error handling, validation |
| Wait | Add delays | Rate limiting, scheduling |
📥 Copy Activity¶
Copy is the core data-movement activity: it reads from a source dataset and writes to a sink dataset.
Basic Copy Activity¶
{
  "name": "CopyBlobToSQL",
  "type": "Copy",
  "inputs": [
    {
      "referenceName": "SourceBlobDataset",
      "type": "DatasetReference"
    }
  ],
  "outputs": [
    {
      "referenceName": "SinkSqlDataset",
      "type": "DatasetReference"
    }
  ],
  "typeProperties": {
    "source": {
      "type": "DelimitedTextSource",
      "storeSettings": {
        "type": "AzureBlobStorageReadSettings",
        "recursive": true
      },
      "formatSettings": {
        "type": "DelimitedTextReadSettings",
        "skipLineCount": 0
      }
    },
    "sink": {
      "type": "AzureSqlSink",
      "writeBehavior": "insert",
      "sqlWriterUseTableLock": false,
      "tableOption": "autoCreate"
    },
    "enableStaging": false,
    "translator": {
      "type": "TabularTranslator",
      "typeConversion": true,
      "typeConversionSettings": {
        "allowDataTruncation": true,
        "treatBooleanAsNumber": false
      }
    }
  }
}
Copy Activity with Column Mapping¶
{
  "typeProperties": {
    "translator": {
      "type": "TabularTranslator",
      "mappings": [
        {
          "source": {"name": "CustomerID"},
          "sink": {"name": "customer_id"}
        },
        {
          "source": {"name": "CustomerName"},
          "sink": {"name": "customer_name"}
        },
        {
          "source": {"name": "OrderDate"},
          "sink": {"name": "order_date"}
        }
      ]
    }
  }
}
Copy with Performance Optimization¶
{
  "typeProperties": {
    "source": {
      "type": "AzureSqlSource",
      "sqlReaderQuery": "SELECT * FROM Sales WHERE OrderDate >= '@{pipeline().parameters.StartDate}'",
      "queryTimeout": "02:00:00",
      "partitionOption": "PhysicalPartitionsOfTable"
    },
    "sink": {
      "type": "ParquetSink",
      "storeSettings": {
        "type": "AzureBlobFSWriteSettings",
        "maxConcurrentConnections": 5,
        "copyBehavior": "PreserveHierarchy"
      }
    },
    "enableStaging": true,
    "stagingSettings": {
      "linkedServiceName": {
        "referenceName": "AzureBlobStorage_Staging",
        "type": "LinkedServiceReference"
      },
      "path": "staging"
    },
    "parallelCopies": 32,
    "dataIntegrationUnits": 16
  }
}
🔍 Lookup Activity¶
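Retrieve a configuration value or a small result set to drive pipeline logic. Lookup returns at most 5,000 rows and 4 MB of output, so it is not a substitute for Copy.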
Single Row Lookup¶
{
  "name": "LookupMaxDate",
  "type": "Lookup",
  "typeProperties": {
    "source": {
      "type": "AzureSqlSource",
      "sqlReaderQuery": "SELECT MAX(ModifiedDate) as MaxDate FROM Sales",
      "queryTimeout": "02:00:00"
    },
    "dataset": {
      "referenceName": "AzureSqlTable_Dataset",
      "type": "DatasetReference"
    },
    "firstRowOnly": true
  }
}
Multiple Rows Lookup¶
{
  "name": "LookupTableList",
  "type": "Lookup",
  "typeProperties": {
    "source": {
      "type": "AzureSqlSource",
      "sqlReaderQuery": "SELECT TableName, SchemaName FROM ConfigTables WHERE IsActive = 1"
    },
    "dataset": {
      "referenceName": "ConfigDatabase_Dataset",
      "type": "DatasetReference"
    },
    "firstRowOnly": false
  }
}
Using Lookup Results¶
{
  "name": "CopyUsingLookup",
  "type": "Copy",
  "dependsOn": [
    {
      "activity": "LookupMaxDate",
      "dependencyConditions": ["Succeeded"]
    }
  ],
  "typeProperties": {
    "source": {
      "type": "AzureSqlSource",
      "sqlReaderQuery": "SELECT * FROM Sales WHERE ModifiedDate > '@{activity('LookupMaxDate').output.firstRow.MaxDate}'"
    }
  }
}
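As a rough guide to reading Lookup results, the sketch below shows what the activity output looks like in each mode. The wrapper keys firstRowOnlyTrue and firstRowOnlyFalse are labels added for this illustration and are not part of the real output; the values are made up, and run-time bookkeeping fields are omitted.

```json
{
  "firstRowOnlyTrue": {
    "firstRow": {
      "MaxDate": "2025-01-15T08:30:00Z"
    }
  },
  "firstRowOnlyFalse": {
    "count": 2,
    "value": [
      {"TableName": "Customers", "SchemaName": "dbo"},
      {"TableName": "Orders", "SchemaName": "dbo"}
    ]
  }
}
```

This is why a single-row result is read as output.firstRow.MaxDate, while a multi-row result is handed to a loop as output.value.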
📁 Get Metadata Activity¶
Retrieve properties of files or folders.
Get File Properties¶
{
  "name": "GetFileMetadata",
  "type": "GetMetadata",
  "typeProperties": {
    "dataset": {
      "referenceName": "BlobFile_Dataset",
      "type": "DatasetReference",
      "parameters": {
        "containerName": "input",
        "fileName": "data.csv"
      }
    },
    "fieldList": [
      "exists",
      "itemName",
      "lastModified",
      "size",
      "structure"
    ],
    "storeSettings": {
      "type": "AzureBlobStorageReadSettings",
      "recursive": true,
      "enablePartitionDiscovery": false
    }
  }
}
Get Folder Contents¶
{
  "name": "GetFolderMetadata",
  "type": "GetMetadata",
  "typeProperties": {
    "dataset": {
      "referenceName": "BlobFolder_Dataset",
      "type": "DatasetReference",
      "parameters": {
        "containerName": "input",
        "folderPath": "daily"
      }
    },
    "fieldList": [
      "childItems",
      "exists",
      "itemName"
    ]
  }
}
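The childItems field comes back as an array of name/type pairs covering only the immediate children of the folder. A sketch of the output for the activity above, with made-up file and folder names and run-time bookkeeping fields omitted:

```json
{
  "exists": true,
  "itemName": "daily",
  "childItems": [
    {"name": "sales_2025-01-01.csv", "type": "File"},
    {"name": "sales_2025-01-02.csv", "type": "File"},
    {"name": "archive", "type": "Folder"}
  ]
}
```

Subfolders appear as entries with type Folder, so a loop that expects only files should check item().type before copying.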
Conditional Logic Based on Metadata¶
{
  "name": "CheckFileExists",
  "type": "IfCondition",
  "dependsOn": [
    {
      "activity": "GetFileMetadata",
      "dependencyConditions": ["Succeeded"]
    }
  ],
  "typeProperties": {
    "expression": {
      "value": "@activity('GetFileMetadata').output.exists",
      "type": "Expression"
    },
    "ifTrueActivities": [
      {
        "name": "CopyFile",
        "type": "Copy"
      }
    ],
    "ifFalseActivities": [
      {
        "name": "LogMissingFile",
        "type": "WebActivity"
      }
    ]
  }
}
🗑️ Delete Activity¶
Remove files or folders from a storage location after processing.
Delete File¶
{
  "name": "DeleteProcessedFile",
  "type": "Delete",
  "typeProperties": {
    "dataset": {
      "referenceName": "BlobFile_Dataset",
      "type": "DatasetReference",
      "parameters": {
        "containerName": "processed",
        "fileName": "@{pipeline().parameters.FileName}"
      }
    },
    "enableLogging": true,
    "logStorageSettings": {
      "linkedServiceName": {
        "referenceName": "AzureBlobStorage_Logging",
        "type": "LinkedServiceReference"
      },
      "path": "deletion-logs"
    }
  }
}
Delete Folder¶
{
  "name": "DeleteOldFiles",
  "type": "Delete",
  "typeProperties": {
    "dataset": {
      "referenceName": "BlobFolder_Dataset",
      "type": "DatasetReference",
      "parameters": {
        "folderPath": "archive/2023"
      }
    },
    "recursive": true,
    "maxConcurrentConnections": 10
  }
}
🔁 ForEach Activity¶
Iterate over collections to process multiple items.
Basic ForEach¶
{
  "name": "ProcessFiles",
  "type": "ForEach",
  "dependsOn": [
    {
      "activity": "GetFolderMetadata",
      "dependencyConditions": ["Succeeded"]
    }
  ],
  "typeProperties": {
    "items": {
      "value": "@activity('GetFolderMetadata').output.childItems",
      "type": "Expression"
    },
    "isSequential": false,
    "batchCount": 20,
    "activities": [
      {
        "name": "CopyEachFile",
        "type": "Copy",
        "inputs": [
          {
            "referenceName": "SourceFile_Dataset",
            "type": "DatasetReference",
            "parameters": {
              "fileName": "@item().name"
            }
          }
        ],
        "outputs": [
          {
            "referenceName": "DestinationFile_Dataset",
            "type": "DatasetReference",
            "parameters": {
              "fileName": "@item().name"
            }
          }
        ]
      }
    ]
  }
}
ForEach with Table List¶
{
  "name": "CopyMultipleTables",
  "type": "ForEach",
  "dependsOn": [
    {
      "activity": "LookupTableList",
      "dependencyConditions": ["Succeeded"]
    }
  ],
  "typeProperties": {
    "items": {
      "value": "@activity('LookupTableList').output.value",
      "type": "Expression"
    },
    "isSequential": true,
    "activities": [
      {
        "name": "CopyTable",
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT * FROM @{item().SchemaName}.@{item().TableName}"
          }
        }
      }
    ]
  }
}
🎯 Hands-On Examples¶
Example 1: File Processing Pipeline¶
A skeleton pipeline that lists the files in a folder, copies each one, and deletes the source file only after its copy succeeds. The Copy and Delete bodies are abbreviated.
{
  "name": "FileProcessingPipeline",
  "properties": {
    "activities": [
      {
        "name": "GetFileList",
        "type": "GetMetadata",
        "typeProperties": {
          "dataset": {
            "referenceName": "InputFolder",
            "type": "DatasetReference"
          },
          "fieldList": ["childItems"]
        }
      },
      {
        "name": "ProcessEachFile",
        "type": "ForEach",
        "dependsOn": [
          {"activity": "GetFileList", "dependencyConditions": ["Succeeded"]}
        ],
        "typeProperties": {
          "items": {
            "value": "@activity('GetFileList').output.childItems",
            "type": "Expression"
          },
          "activities": [
            {
              "name": "CopyFile",
              "type": "Copy"
            },
            {
              "name": "DeleteSourceFile",
              "type": "Delete",
              "dependsOn": [
                {"activity": "CopyFile", "dependencyConditions": ["Succeeded"]}
              ]
            }
          ]
        }
      }
    ]
  }
}
Example 2: Metadata-Driven Copy¶
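The configuration table supplies one source query per row. The source and sink types are written as literals because a type value is fixed at authoring time and cannot be set by an expression; AzureSqlSource and AzureSqlSink are placeholders for whichever types match your datasets.
{
  "name": "MetadataDrivenCopy",
  "properties": {
    "parameters": {
      "ConfigTableName": {"type": "string"}
    },
    "activities": [
      {
        "name": "GetCopyConfig",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT SourceQuery FROM @{pipeline().parameters.ConfigTableName}"
          },
          "dataset": {
            "referenceName": "ConfigDatabase_Dataset",
            "type": "DatasetReference"
          },
          "firstRowOnly": false
        }
      },
      {
        "name": "CopyEachTable",
        "type": "ForEach",
        "dependsOn": [
          {"activity": "GetCopyConfig", "dependencyConditions": ["Succeeded"]}
        ],
        "typeProperties": {
          "items": {
            "value": "@activity('GetCopyConfig').output.value",
            "type": "Expression"
          },
          "activities": [
            {
              "name": "CopyData",
              "type": "Copy",
              "typeProperties": {
                "source": {
                  "type": "AzureSqlSource",
                  "sqlReaderQuery": "@{item().SourceQuery}"
                },
                "sink": {
                  "type": "AzureSqlSink"
                }
              }
            }
          ]
        }
      }
    ]
  }
}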
✅ Best Practices¶
Copy Activity¶
- Enable Staging for Large Data Transfers
  - Improves reliability
  - Better performance for cross-region copies
- Use Partitioning
  - Parallel processing
  - Faster data movement
- Configure Appropriate DIU
  - Start with 4-8 DIUs
  - Scale based on data volume
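One way to decide whether to scale is to look at what a finished Copy run reports. The fragment below is a sketch of the relevant part of a Copy activity's output; the numbers are invented for illustration (sizes in bytes, duration in seconds, throughput in KB per second) and other fields are left out.

```json
{
  "dataRead": 1073741824,
  "dataWritten": 1073741824,
  "rowsRead": 5000000,
  "rowsCopied": 5000000,
  "copyDuration": 120,
  "throughput": 8738.13,
  "usedDataIntegrationUnits": 4,
  "usedParallelCopies": 4
}
```

Comparing throughput across runs after changing dataIntegrationUnits or parallelCopies shows whether the extra capacity actually helped.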
Lookup Activity¶
- Minimize Data Retrieved
  - Use specific SELECT statements
  - Only retrieve necessary columns
- Use First Row Only When Possible
  - Better performance
  - Lower memory usage
ForEach Activity¶
- Use Batch Processing
  - Set appropriate batchCount
  - Balance parallelism and resource usage
- Consider Sequential vs Parallel
  - Sequential for ordered processing
  - Parallel for independent items
🚀 Next Steps¶
Basic activities mastered! Proceed to:
→ 08. Advanced Orchestration - Complex pipeline patterns
Module Progress: 7 of 18 complete
Tutorial Version: 1.0 | Last Updated: January 2025