
📥 Tutorial 4: Batch Data Ingestion


Build robust batch data ingestion pipelines using Azure Synapse pipelines (built on the same integration engine as Azure Data Factory). Learn to ingest multiple data formats, handle schema changes, and implement error handling.

🎯 Learning Objectives

After completing this tutorial, you will be able to:

  • Create copy activities for ingesting data from various sources
  • Handle multiple data formats (CSV, JSON, Parquet, Avro)
  • Implement schema mapping and data type conversions
  • Configure error handling and data validation
  • Schedule and monitor pipeline executions

⏱️ Time Estimate: 40 minutes

  • Pipeline Creation: 15 minutes
  • Data Format Handling: 15 minutes
  • Error Handling & Testing: 10 minutes

📋 Prerequisites

Completed Tutorials

Required Resources

  • Synapse workspace configured
  • ADLS Gen2 storage with landing/raw/curated zones
  • Sample datasets prepared

Verify Prerequisites

# Load workspace configuration
$config = Get-Content "workspace-config.json" | ConvertFrom-Json

# Verify storage containers exist
az storage fs list --account-name $config.StorageAccount --auth-mode login --output table
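If you want an explicit pass/fail check for each required zone, a short sketch along these lines works (zone names follow the landing/raw/curated layout listed above):

# Check that each expected zone (filesystem) exists
$expectedZones = @("landing", "raw", "curated")
$existingZones = az storage fs list `
  --account-name $config.StorageAccount `
  --auth-mode login `
  --query "[].name" `
  --output tsv

foreach ($zone in $expectedZones) {
    if ($existingZones -contains $zone) {
        Write-Host "✅ Zone exists: $zone" -ForegroundColor Green
    } else {
        Write-Host "❌ Zone missing: $zone" -ForegroundColor Red
    }
}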

📊 Step 1: Prepare Sample Datasets

1.1 Create Sample CSV Data

# Create customer data CSV
$customerData = @"
CustomerID,FirstName,LastName,Email,Country,RegistrationDate
C001,John,Smith,john.smith@example.com,USA,2024-01-15
C002,Maria,Garcia,maria.garcia@example.com,Spain,2024-02-20
C003,Yuki,Tanaka,yuki.tanaka@example.com,Japan,2024-03-10
C004,Ahmed,Hassan,ahmed.hassan@example.com,Egypt,2024-03-15
C005,Emma,Johnson,emma.johnson@example.com,UK,2024-04-01
"@

$customerData | Out-File "customers.csv" -Encoding UTF8

# Upload to landing zone
az storage blob upload `
  --account-name $config.StorageAccount `
  --container-name "landing" `
  --name "customers/2024/customers.csv" `
  --file "customers.csv" `
  --auth-mode login

Write-Host "✅ Customer CSV uploaded to landing zone" -ForegroundColor Green

1.2 Create Sample JSON Data

# Create transaction data JSON
$transactions = @(
    @{TransactionID="T001"; CustomerID="C001"; Amount=125.50; ProductCategory="Electronics"; TransactionDate="2024-05-01T10:30:00Z"}
    @{TransactionID="T002"; CustomerID="C002"; Amount=45.99; ProductCategory="Books"; TransactionDate="2024-05-01T11:15:00Z"}
    @{TransactionID="T003"; CustomerID="C001"; Amount=299.99; ProductCategory="Clothing"; TransactionDate="2024-05-02T09:45:00Z"}
    @{TransactionID="T004"; CustomerID="C003"; Amount=89.00; ProductCategory="Electronics"; TransactionDate="2024-05-02T14:20:00Z"}
    @{TransactionID="T005"; CustomerID="C004"; Amount=12.50; ProductCategory="Books"; TransactionDate="2024-05-03T16:00:00Z"}
) | ConvertTo-Json

$transactions | Out-File "transactions.json" -Encoding UTF8

# Upload to landing zone
az storage blob upload `
  --account-name $config.StorageAccount `
  --container-name "landing" `
  --name "transactions/2024-05/transactions.json" `
  --file "transactions.json" `
  --auth-mode login

Write-Host "✅ Transaction JSON uploaded to landing zone" -ForegroundColor Green

1.3 Create Sample Parquet Data

# Create sample_parquet_generator.py
import pandas as pd
from datetime import datetime, timedelta

# Generate product catalog data
products = {
    'ProductID': [f'P{str(i).zfill(4)}' for i in range(1, 101)],
    'ProductName': [f'Product {i}' for i in range(1, 101)],
    'Category': ['Electronics', 'Books', 'Clothing', 'Home', 'Sports'] * 20,
    'Price': [round(10 + (i * 5.5), 2) for i in range(1, 101)],
    'StockQuantity': [50 + (i * 10) for i in range(1, 101)],
    'LastUpdated': [datetime.now() - timedelta(days=i) for i in range(100)]
}

df = pd.DataFrame(products)
df.to_parquet('product_catalog.parquet', index=False, engine='pyarrow')
print("✅ Parquet file created: product_catalog.parquet")
# Run Python script to generate Parquet file
python sample_parquet_generator.py

# Upload to landing zone
az storage blob upload `
  --account-name $config.StorageAccount `
  --container-name "landing" `
  --name "products/product_catalog.parquet" `
  --file "product_catalog.parquet" `
  --auth-mode login

Write-Host "✅ Product Parquet uploaded to landing zone" -ForegroundColor Green

🔧 Step 2: Create Datasets in Synapse

2.1 Create Source Dataset for CSV

Via Synapse Studio:

1. Navigate to Data → Linked → Click on storage account
2. Right-click "landing" container → New dataset
3. Select format: "Delimited Text"
4. Configuration:
   - Name: "ds_customers_csv"
   - Linked service: WorkspaceStorage
   - File path: landing/customers/2024/
   - First row as header: Yes
   - Import schema: From connection/store
5. Click "OK" → Publish

Via Azure CLI (JSON Definition):

# Create dataset JSON
$csvDataset = @"
{
  "name": "ds_customers_csv",
  "properties": {
    "linkedServiceName": {
      "referenceName": "WorkspaceStorage",
      "type": "LinkedServiceReference"
    },
    "type": "DelimitedText",
    "typeProperties": {
      "location": {
        "type": "AzureBlobFSLocation",
        "fileName": "customers.csv",
        "folderPath": "customers/2024",
        "fileSystem": "landing"
      },
      "columnDelimiter": ",",
      "escapeChar": "\\",
      "firstRowAsHeader": true,
      "quoteChar": "\""
    },
    "schema": [
      {"name": "CustomerID", "type": "String"},
      {"name": "FirstName", "type": "String"},
      {"name": "LastName", "type": "String"},
      {"name": "Email", "type": "String"},
      {"name": "Country", "type": "String"},
      {"name": "RegistrationDate", "type": "String"}
    ]
  }
}
"@

$csvDataset | Out-File "ds_customers_csv.json"

az synapse dataset create `
  --workspace-name $config.WorkspaceName `
  --name "ds_customers_csv" `
  --file "ds_customers_csv.json"

Write-Host "✅ CSV dataset created" -ForegroundColor Green

2.2 Create Sink Dataset for Parquet

# Create sink dataset for raw zone
$sinkDataset = @"
{
  "name": "ds_customers_parquet",
  "properties": {
    "linkedServiceName": {
      "referenceName": "WorkspaceStorage",
      "type": "LinkedServiceReference"
    },
    "type": "Parquet",
    "typeProperties": {
      "location": {
        "type": "AzureBlobFSLocation",
        "folderPath": "customers",
        "fileSystem": "raw"
      },
      "compressionCodec": "snappy"
    }
  }
}
"@

$sinkDataset | Out-File "ds_customers_parquet.json"

az synapse dataset create `
  --workspace-name $config.WorkspaceName `
  --name "ds_customers_parquet" `
  --file "ds_customers_parquet.json"

Write-Host "✅ Parquet sink dataset created" -ForegroundColor Green

2.3 Create JSON Dataset

# Create JSON source dataset
$jsonDataset = @"
{
  "name": "ds_transactions_json",
  "properties": {
    "linkedServiceName": {
      "referenceName": "WorkspaceStorage",
      "type": "LinkedServiceReference"
    },
    "type": "Json",
    "typeProperties": {
      "location": {
        "type": "AzureBlobFSLocation",
        "fileName": "transactions.json",
        "folderPath": "transactions/2024-05",
        "fileSystem": "landing"
      }
    },
    "schema": {}
  }
}
"@

$jsonDataset | Out-File "ds_transactions_json.json"

az synapse dataset create `
  --workspace-name $config.WorkspaceName `
  --name "ds_transactions_json" `
  --file "ds_transactions_json.json"

Write-Host "✅ JSON dataset created" -ForegroundColor Green

🔄 Step 3: Build Copy Pipeline

3.1 Create Basic Copy Pipeline

Via Synapse Studio:

1. Navigate to Integrate → + New → Pipeline
2. Name: "pl_ingest_customers"
3. From Activities → Move & transform → Drag "Copy data" to canvas
4. Configure Copy activity:
   - General tab:
     - Name: "CopyCustomersCSVToParquet"
   - Source tab:
     - Source dataset: ds_customers_csv
   - Sink tab:
     - Sink dataset: ds_customers_parquet
     - Copy method: Merge files
   - Mapping tab:
     - Import schemas
     - Review column mappings
5. Click "Validate" → Fix any errors
6. Click "Publish all"

Via Azure CLI (JSON Definition):

# Create pipeline JSON
$pipeline = @"
{
  "name": "pl_ingest_customers",
  "properties": {
    "activities": [
      {
        "name": "CopyCustomersCSVToParquet",
        "type": "Copy",
        "inputs": [
          {
            "referenceName": "ds_customers_csv",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "ds_customers_parquet",
            "type": "DatasetReference"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "DelimitedTextSource",
            "storeSettings": {
              "type": "AzureBlobFSReadSettings",
              "recursive": true
            }
          },
          "sink": {
            "type": "ParquetSink",
            "storeSettings": {
              "type": "AzureBlobFSWriteSettings",
              "copyBehavior": "PreserveHierarchy"
            }
          },
          "enableStaging": false,
          "translator": {
            "type": "TabularTranslator",
            "mappings": [
              {"source": {"name": "CustomerID"}, "sink": {"name": "CustomerID", "type": "String"}},
              {"source": {"name": "FirstName"}, "sink": {"name": "FirstName", "type": "String"}},
              {"source": {"name": "LastName"}, "sink": {"name": "LastName", "type": "String"}},
              {"source": {"name": "Email"}, "sink": {"name": "Email", "type": "String"}},
              {"source": {"name": "Country"}, "sink": {"name": "Country", "type": "String"}},
              {"source": {"name": "RegistrationDate"}, "sink": {"name": "RegistrationDate", "type": "DateTime"}}
            ]
          }
        }
      }
    ]
  }
}
"@

$pipeline | Out-File "pl_ingest_customers.json"

az synapse pipeline create `
  --workspace-name $config.WorkspaceName `
  --name "pl_ingest_customers" `
  --file "pl_ingest_customers.json"

Write-Host "✅ Copy pipeline created" -ForegroundColor Green

3.2 Add Data Validation Activity

# Enhanced pipeline with validation
$pipelineWithValidation = @"
{
  "name": "pl_ingest_customers_validated",
  "properties": {
    "activities": [
      {
        "name": "ValidateSourceExists",
        "type": "Validation",
        "typeProperties": {
          "dataset": {
            "referenceName": "ds_customers_csv",
            "type": "DatasetReference"
          },
          "timeout": "00:05:00",
          "sleep": 10,
          "minimumSize": 100
        }
      },
      {
        "name": "CopyCustomersCSVToParquet",
        "type": "Copy",
        "dependsOn": [
          {
            "activity": "ValidateSourceExists",
            "dependencyConditions": ["Succeeded"]
          }
        ],
        "inputs": [{"referenceName": "ds_customers_csv", "type": "DatasetReference"}],
        "outputs": [{"referenceName": "ds_customers_parquet", "type": "DatasetReference"}],
        "typeProperties": {
          "source": {"type": "DelimitedTextSource"},
          "sink": {"type": "ParquetSink"},
          "enableStaging": false,
          "dataIntegrationUnits": 4
        }
      }
    ]
  }
}
"@

$pipelineWithValidation | Out-File "pl_ingest_customers_validated.json"

az synapse pipeline create `
  --workspace-name $config.WorkspaceName `
  --name "pl_ingest_customers_validated" `
  --file "pl_ingest_customers_validated.json"

Write-Host "✅ Validated pipeline created" -ForegroundColor Green

3.3 Configure Error Handling

Error Handling Configuration in Copy Activity (a JSON sketch of these settings follows the list):

1. Source tab → Error tolerance settings:
   - Fault tolerance: Skip incompatible rows
   - Max errors allowed: 100
   - Log incompatible rows: Yes
   - Log storage: landing/errors/

2. Settings tab:
   - Data consistency verification: Yes
   - Enable logging: Yes
   - Log level: Warning

3. Sink tab → Pre-copy script:
   - Run cleanup query before copy (if applicable)
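These UI options map to properties in the Copy activity JSON. A minimal fragment, assuming skipped rows and copy logs should land in the landing container via the WorkspaceStorage linked service (merge it into the activity's typeProperties; the error-log path is illustrative):

# Fault-tolerance and logging fragment for the Copy activity's typeProperties (sketch, not executed on its own)
$faultToleranceFragment = @"
{
  "enableSkipIncompatibleRow": true,
  "logSettings": {
    "enableCopyActivityLog": true,
    "copyActivityLogSettings": {
      "logLevel": "Warning",
      "enableReliableLogging": false
    },
    "logLocationSettings": {
      "linkedServiceName": {
        "referenceName": "WorkspaceStorage",
        "type": "LinkedServiceReference"
      },
      "path": "landing/errors"
    }
  }
}
"@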

📝 Step 4: Handle Multiple Formats

4.1 Create Multi-Format Ingestion Pipeline

# Pipeline for ingesting all data formats
$multiFormatPipeline = @"
{
  "name": "pl_ingest_all_sources",
  "properties": {
    "activities": [
      {
        "name": "IngestCustomersCSV",
        "type": "Copy",
        "inputs": [{"referenceName": "ds_customers_csv", "type": "DatasetReference"}],
        "outputs": [{"referenceName": "ds_customers_parquet", "type": "DatasetReference"}],
        "typeProperties": {
          "source": {"type": "DelimitedTextSource"},
          "sink": {"type": "ParquetSink"}
        }
      },
      {
        "name": "IngestTransactionsJSON",
        "type": "Copy",
        "dependsOn": [],
        "inputs": [{"referenceName": "ds_transactions_json", "type": "DatasetReference"}],
        "outputs": [{"referenceName": "ds_transactions_parquet", "type": "DatasetReference"}],
        "typeProperties": {
          "source": {"type": "JsonSource"},
          "sink": {"type": "ParquetSink"}
        }
      },
      {
        "name": "CopyProductParquet",
        "type": "Copy",
        "dependsOn": [],
        "inputs": [{"referenceName": "ds_products_parquet_landing", "type": "DatasetReference"}],
        "outputs": [{"referenceName": "ds_products_parquet_raw", "type": "DatasetReference"}],
        "typeProperties": {
          "source": {"type": "ParquetSource"},
          "sink": {"type": "ParquetSink"}
        }
      }
    ]
  }
}
"@

$multiFormatPipeline | Out-File "pl_ingest_all_sources.json"
Write-Host "✅ Multi-format ingestion pipeline defined" -ForegroundColor Green

Note: this pipeline references ds_transactions_parquet, ds_products_parquet_landing, and ds_products_parquet_raw. Create those datasets the same way as the customer datasets in Step 2, then register the pipeline with az synapse pipeline create as in Step 3.1.

4.2 Schema Mapping for Different Formats

CSV to Parquet with Type Conversion:

{
  "translator": {
    "type": "TabularTranslator",
    "mappings": [
      {
        "source": {"name": "CustomerID", "type": "String"},
        "sink": {"name": "customer_id", "type": "String"}
      },
      {
        "source": {"name": "RegistrationDate", "type": "String"},
        "sink": {"name": "registration_date", "type": "DateTime"}
      }
    ],
    "typeConversion": true,
    "typeConversionSettings": {
      "allowDataTruncation": false,
      "treatBooleanAsNumber": false,
      "dateTimeFormat": "yyyy-MM-dd"
    }
  }
}

JSON to Parquet with Flattening:

{
  "translator": {
    "type": "TabularTranslator",
    "mappings": [
      {"source": {"path": "$.TransactionID"}, "sink": {"name": "transaction_id"}},
      {"source": {"path": "$.CustomerID"}, "sink": {"name": "customer_id"}},
      {"source": {"path": "$.Amount"}, "sink": {"name": "amount", "type": "Decimal"}},
      {"source": {"path": "$.ProductCategory"}, "sink": {"name": "category"}},
      {"source": {"path": "$.TransactionDate"}, "sink": {"name": "transaction_date", "type": "DateTime"}}
    ]
  }
}

🚀 Step 5: Execute and Monitor Pipeline

5.1 Trigger Pipeline Manually

# Run pipeline manually and capture the run ID from the JSON output
$runResult = az synapse pipeline create-run `
  --workspace-name $config.WorkspaceName `
  --name "pl_ingest_customers" `
  --output json | ConvertFrom-Json

$runId = $runResult.runId

# Monitor pipeline run
az synapse pipeline-run show `
  --workspace-name $config.WorkspaceName `
  --run-id $runId `
  --output table

Write-Host "✅ Pipeline triggered successfully" -ForegroundColor Green

5.2 Monitor in Synapse Studio

Via UI:

1. Navigate to Monitor → Pipeline runs
2. Find your pipeline: "pl_ingest_customers"
3. Click on run to view details:
   - Activity runs
   - Input/output data
   - Duration and status
   - Error messages (if any)
4. Click on activity name → View details:
   - Rows read/written
   - Data volume
   - Throughput
   - DIU (Data Integration Units) used
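The same run details are available from the command line. A sketch using az synapse activity-run query-by-pipeline-run, assuming $runId holds the run ID captured in step 5.1 (the time window below is illustrative):

# Query activity-level runs for a specific pipeline run
$after  = (Get-Date).ToUniversalTime().AddDays(-1).ToString("yyyy-MM-ddTHH:mm:ssZ")
$before = (Get-Date).ToUniversalTime().AddHours(1).ToString("yyyy-MM-ddTHH:mm:ssZ")

az synapse activity-run query-by-pipeline-run `
  --workspace-name $config.WorkspaceName `
  --run-id $runId `
  --last-updated-after $after `
  --last-updated-before $before `
  --output table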

5.3 Query Ingested Data

-- Query ingested Parquet data using Serverless SQL
-- Execute in Synapse Studio → Develop → New SQL script

-- Query customer data in raw zone
SELECT TOP 10
    CustomerID,
    FirstName,
    LastName,
    Email,
    Country,
    RegistrationDate
FROM OPENROWSET(
    BULK 'https://[storage-account].dfs.core.windows.net/raw/customers/*.parquet',
    FORMAT = 'PARQUET'
) AS customers
ORDER BY RegistrationDate DESC;

-- Verify row count
SELECT COUNT(*) as TotalCustomers
FROM OPENROWSET(
    BULK 'https://[storage-account].dfs.core.windows.net/raw/customers/*.parquet',
    FORMAT = 'PARQUET'
) AS customers;

⏰ Step 6: Schedule Pipeline Execution

6.1 Create Schedule Trigger

# Create daily trigger at 2 AM
$trigger = @"
{
  "name": "tr_daily_ingest",
  "properties": {
    "type": "ScheduleTrigger",
    "typeProperties": {
      "recurrence": {
        "frequency": "Day",
        "interval": 1,
        "startTime": "2024-01-01T02:00:00Z",
        "timeZone": "UTC",
        "schedule": {}
      }
    },
    "pipelines": [
      {
        "pipelineReference": {
          "referenceName": "pl_ingest_all_sources",
          "type": "PipelineReference"
        },
        "parameters": {}
      }
    ]
  }
}
"@

$trigger | Out-File "tr_daily_ingest.json"

az synapse trigger create `
  --workspace-name $config.WorkspaceName `
  --name "tr_daily_ingest" `
  --file "tr_daily_ingest.json"

# Start trigger
az synapse trigger start `
  --workspace-name $config.WorkspaceName `
  --name "tr_daily_ingest"

Write-Host "✅ Schedule trigger created and started" -ForegroundColor Green

6.2 Create Tumbling Window Trigger

# Trigger for incremental loads every 6 hours
$tumblingTrigger = @"
{
  "name": "tr_tumbling_6h",
  "properties": {
    "type": "TumblingWindowTrigger",
    "typeProperties": {
      "frequency": "Hour",
      "interval": 6,
      "startTime": "2024-01-01T00:00:00Z",
      "delay": "00:00:00",
      "maxConcurrency": 1,
      "retryPolicy": {
        "count": 3,
        "intervalInSeconds": 300
      }
    },
    "pipeline": {
      "pipelineReference": {
        "referenceName": "pl_ingest_customers",
        "type": "PipelineReference"
      },
      "parameters": {
        "windowStart": "@trigger().outputs.windowStartTime",
        "windowEnd": "@trigger().outputs.windowEndTime"
      }
    }
  }
}
"@

$tumblingTrigger | Out-File "tr_tumbling_6h.json"
Write-Host "✅ Tumbling window trigger configured" -ForegroundColor Green

🎯 Step 7: Implement Advanced Patterns

7.1 Parameterized Pipeline for Reusability

# Generic copy pipeline with parameters
$parameterizedPipeline = @"
{
  "name": "pl_generic_copy",
  "properties": {
    "parameters": {
      "sourceContainer": {"type": "String"},
      "sourcePath": {"type": "String"},
      "sinkContainer": {"type": "String"},
      "sinkPath": {"type": "String"},
      "fileFormat": {"type": "String", "defaultValue": "parquet"}
    },
    "activities": [
      {
        "name": "DynamicCopy",
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BinarySource",
            "storeSettings": {
              "type": "AzureBlobFSReadSettings",
              "recursive": true
            }
          },
          "sink": {
            "type": "BinarySink",
            "storeSettings": {
              "type": "AzureBlobFSWriteSettings"
            }
          }
        }
      }
    ]
  }
}
"@

$parameterizedPipeline | Out-File "pl_generic_copy.json"
Write-Host "✅ Parameterized pipeline created for reusability" -ForegroundColor Green

7.2 Error Logging Pipeline

-- Create error log table in Serverless SQL
CREATE EXTERNAL TABLE bronze.ingestion_errors (
    ErrorID UNIQUEIDENTIFIER,
    PipelineName NVARCHAR(200),
    ActivityName NVARCHAR(200),
    SourceFile NVARCHAR(500),
    ErrorMessage NVARCHAR(MAX),
    ErrorTimestamp DATETIME2,
    RowData NVARCHAR(MAX)
)
WITH (
    LOCATION = 'ingestion-errors/',
    DATA_SOURCE = WorkspaceStorage,
    FILE_FORMAT = ParquetFormat
);

7.3 Metadata-Driven Ingestion

# Create control table for metadata-driven pipelines
$controlTableScript = @"
CREATE TABLE control.ingestion_metadata (
    SourceID INT IDENTITY(1,1),
    SourceName NVARCHAR(100),
    SourceType NVARCHAR(50),
    SourcePath NVARCHAR(500),
    TargetPath NVARCHAR(500),
    IsActive BIT,
    LastProcessedDate DATETIME2,
    ProcessingFrequency NVARCHAR(20)
);

INSERT INTO control.ingestion_metadata
    (SourceName, SourceType, SourcePath, TargetPath, IsActive, LastProcessedDate, ProcessingFrequency)
VALUES
('Customers', 'CSV', 'landing/customers/', 'raw/customers/', 1, NULL, 'Daily'),
('Transactions', 'JSON', 'landing/transactions/', 'raw/transactions/', 1, NULL, 'Hourly'),
('Products', 'Parquet', 'landing/products/', 'raw/products/', 1, NULL, 'Weekly');
"@

Write-Host "✅ Metadata-driven pattern configured" -ForegroundColor Green

✅ Step 8: Validate and Test

8.1 Comprehensive Pipeline Testing

# Pipeline validation script
Write-Host "🔍 Validating Batch Ingestion Pipelines..." -ForegroundColor Cyan

# Test 1: Verify pipelines exist
$pipelines = az synapse pipeline list `
  --workspace-name $config.WorkspaceName `
  --query "[].name" `
  --output tsv

$expectedPipelines = @("pl_ingest_customers", "pl_ingest_all_sources")
foreach ($pipeline in $expectedPipelines) {
    if ($pipelines -contains $pipeline) {
        Write-Host "✅ Pipeline exists: $pipeline" -ForegroundColor Green
    } else {
        Write-Host "❌ Pipeline missing: $pipeline" -ForegroundColor Red
    }
}

# Test 2: Verify datasets exist
$datasets = az synapse dataset list `
  --workspace-name $config.WorkspaceName `
  --query "[].name" `
  --output tsv

Write-Host "`n✅ Datasets configured: $($datasets.Count)" -ForegroundColor Green

# Test 3: Check data in raw zone
$rawFiles = az storage blob list `
  --account-name $config.StorageAccount `
  --container-name "raw" `
  --auth-mode login `
  --query "[].name" `
  --output tsv

if ($rawFiles) {
    Write-Host "✅ Data files in raw zone: $($rawFiles.Count)" -ForegroundColor Green
} else {
    Write-Host "⚠️ No data files found in raw zone" -ForegroundColor Yellow
}

Write-Host "`n🎯 Validation complete!" -ForegroundColor Cyan

8.2 Performance Benchmarking

-- Query to analyze pipeline performance
SELECT
    PipelineName,
    ActivityName,
    AVG(DurationInMs) as AvgDurationMs,
    AVG(RowsRead) as AvgRowsRead,
    AVG(RowsWritten) as AvgRowsWritten,
    AVG(DataRead / 1024 / 1024) as AvgDataReadMB,
    COUNT(*) as RunCount
FROM monitoring.pipeline_activity_runs
WHERE RunDate >= DATEADD(day, -7, GETDATE())
GROUP BY PipelineName, ActivityName
ORDER BY AvgDurationMs DESC;

💡 Key Concepts Review

Data Formats Comparison

Format  | Best For                     | Compression | Schema Evolution      | Query Performance
CSV     | Simple data, human-readable  | Low         | Manual                | Slow
JSON    | Nested/hierarchical data     | Medium      | Flexible              | Medium
Parquet | Analytics workloads          | High        | Supported (additive)  | Fast
Avro    | Streaming, schema evolution  | Medium      | Built-in              | Medium

Pipeline Best Practices

  • ✅ Use parameterization for reusable pipelines
  • ✅ Implement error handling and logging
  • ✅ Enable data validation before copy
  • ✅ Use appropriate Data Integration Units (DIU)
  • ✅ Schedule during off-peak hours
  • ✅ Monitor and optimize copy performance

Error Handling Strategies

  1. Validation Activity: Check file existence and size
  2. Fault Tolerance: Skip incompatible rows
  3. Error Logging: Store failed rows for review
  4. Retry Logic: Configure retry policy for transient failures (see the sketch after this list)
  5. Alerting: Set up email notifications for failures
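Retry logic (item 4) is configured per activity through its policy block rather than in typeProperties. A minimal sketch of how it would look on the copy activity (values are illustrative):

# Activity-level policy fragment for retrying transient failures (sketch, not executed on its own)
$retryPolicy = @"
{
  "name": "CopyCustomersCSVToParquet",
  "type": "Copy",
  "policy": {
    "timeout": "0.01:00:00",
    "retry": 3,
    "retryIntervalInSeconds": 60,
    "secureInput": false,
    "secureOutput": false
  }
}
"@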

🎉 Congratulations

You've successfully built batch data ingestion pipelines. Your solution now includes:

  • Multi-format data ingestion (CSV, JSON, Parquet)
  • Schema mapping and validation
  • Error handling and logging
  • Scheduled pipeline execution
  • Monitoring and performance tracking

🚀 What's Next?

Continue to Tutorial 5: Real-time Data Streaming

In the next tutorial, you'll:

  • Set up Azure Event Hubs for streaming data
  • Build real-time ingestion pipelines
  • Implement stream processing with Spark Structured Streaming
  • Combine batch and streaming workloads

💬 Troubleshooting

Common Issues and Solutions

Issue: Copy activity fails with "Access Denied"

# Verify managed identity has storage permissions
$workspaceId = az synapse workspace show `
  --name $config.WorkspaceName `
  --resource-group $config.ResourceGroup `
  --query identity.principalId `
  --output tsv

az role assignment create `
  --role "Storage Blob Data Contributor" `
  --assignee $workspaceId `
  --scope "/subscriptions/$(az account show --query id --output tsv)/resourceGroups/$($config.ResourceGroup)/providers/Microsoft.Storage/storageAccounts/$($config.StorageAccount)"

Issue: Schema mismatch errors

Solution:
1. Enable "Skip incompatible rows" in copy activity
2. Review column mappings in Mapping tab
3. Use explicit type conversions in translator configuration

Issue: Pipeline runs slowly

# Increase Data Integration Units (DIU) in copy activity
# Default: 4, Maximum: 256
# Update in pipeline JSON: "dataIntegrationUnits": 16

Tutorial Progress: 4 of 14 completed · Next: 05. Real-time Streaming → · Time Investment: 40 minutes ✅

Reliable batch ingestion is the foundation of data lakes. Master these patterns before moving to streaming.