Interactive Tutorial: End-to-End Data Pipeline in Synapse Analytics
This interactive tutorial guides you through building a complete data pipeline in Azure Synapse Analytics, from data ingestion to transformation and visualization. Follow along with notebook examples, pipeline templates, and step-by-step instructions.
Introduction
In this tutorial, you'll learn how to:
- Set up a data source connection
- Ingest data using Synapse pipelines
- Transform data using Spark pools
- Create a Delta Lake table
- Query data using Serverless SQL
- Visualize results with Power BI integration
The entire tutorial is designed to be completed in approximately 2-3 hours, depending on your familiarity with Azure Synapse Analytics.
Prerequisites
Before you begin, ensure you have:
- An Azure subscription with permissions to create resources
- A Synapse Analytics workspace with:
  - Spark pool (Small or Medium size)
  - Serverless SQL pool
- Storage account with ADLS Gen2
- Sample data files (provided in the tutorial)
- Power BI Desktop (optional for visualization section)
Setup: Create Resources and Sample Data
Step 1: Download the Tutorial Files
- Download the tutorial files from our GitHub repository:
git clone https://github.com/microsoft/synapse-tutorials.git
cd synapse-tutorials/end-to-end-pipeline
- Upload the sample data to your storage account using the Azure Storage Explorer or the following PowerShell script:
$storageAccountName = "your-storage-account-name"
$containerName = "tutorial"
$localFolderPath = "./sample-data"
# Create context
$ctx = New-AzStorageContext -StorageAccountName $storageAccountName -UseConnectedAccount
# Create container if it doesn't exist
New-AzStorageContainer -Name $containerName -Context $ctx -ErrorAction SilentlyContinue
# Upload files
$files = Get-ChildItem -Path $localFolderPath -File
foreach ($file in $files) {
    Set-AzStorageBlobContent -File $file.FullName -Container $containerName -Blob "raw/sales/$($file.Name)" -Context $ctx
}
Write-Output "Sample data uploaded successfully to $storageAccountName/$containerName/raw/sales/"
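Before uploading, it can help to preview which blob names the files will receive. The following is a minimal Python sketch of the same mapping the PowerShell loop performs (each file directly inside the folder goes to `raw/sales/<file name>`). It only lists the plan and does not talk to Azure; the function name `plan_uploads` is hypothetical.

```python
from pathlib import Path


def plan_uploads(local_folder: str, prefix: str = "raw/sales") -> dict[str, str]:
    """Map each file directly inside local_folder to the blob name it would get."""
    folder = Path(local_folder)
    # Mirror Get-ChildItem -File: files only, no recursion into subfolders.
    return {
        str(path): f"{prefix}/{path.name}"
        for path in sorted(folder.iterdir())
        if path.is_file()
    }
```

Calling `plan_uploads("./sample-data")` from the cloned tutorial folder returns a dictionary of local paths and their target blob names, which you can compare against the container contents after the upload.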
Step 2: Set Up Linked Service for Sample Data
- In Synapse Studio, navigate to Manage > Linked services
- Click + New to create a new linked service
- Select Azure Data Lake Storage Gen2 and click Continue
- Configure the linked service:
  - Name: TutorialDataStorage
  - Authentication method: Select appropriate method (Managed Identity recommended)
  - Account selection method: From Azure subscription
  - Azure subscription: Select your subscription
  - Storage account name: Select your storage account
  - Test connection: Verify connection succeeds
- Click Create
Part 1: Data Ingestion with Synapse Pipeline
Step 1: Create a Pipeline
- In Synapse Studio, navigate to Integrate
- Click + > Pipeline
- Name your pipeline SalesPipeline
Step 2: Add Copy Data Activity
- In the Activities pane, expand Move & Transform and drag a Copy data activity to the pipeline canvas
- Select the Copy data activity and configure:
  - Source tab:
    - Source dataset: Click + New
    - Select Azure Data Lake Storage Gen2 > DelimitedText
    - Name: SalesRawData
    - Linked service: Select TutorialDataStorage
    - File path: Browse to /tutorial/raw/sales/
    - First row as header: Checked
    - Import schema: From connection/store
  - Sink tab:
    - Sink dataset: Click + New
    - Select Azure Data Lake Storage Gen2 > DelimitedText
    - Name: SalesStaging
    - Linked service: Select TutorialDataStorage
    - File path: Type /tutorial/staging/sales/
    - First row as header: Checked
- In the Mapping tab, click Import schemas and verify column mappings
Step 3: Add Parameters and Trigger Settings
- Go to the Parameters tab for your pipeline
- Add a parameter:
  - Name: ProcessDate
  - Type: String
  - Default value: @utcnow('yyyy-MM-dd')
- Configure the Copy activity:
  - Select the Copy activity
  - Go to Sink > SalesStaging dataset > Parameters
  - Set File path to: /tutorial/staging/sales/@{pipeline().parameters.ProcessDate}/
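The `@{pipeline().parameters.ProcessDate}` expression is string interpolation: the parameter value is substituted into the path as-is. The Python sketch below shows the folder you should expect for a given date. The date check is an addition for illustration (the pipeline itself does not validate the value), and the function name `resolve_sink_folder` is hypothetical.

```python
from datetime import datetime


def resolve_sink_folder(process_date: str) -> str:
    """Return the folder the sink path would resolve to for process_date."""
    # Added sanity check: parsing raises ValueError if the value is not a date.
    parsed = datetime.strptime(process_date, "%Y-%m-%d")
    # Re-format so the folder name is always zero-padded, e.g. 2024-01-05.
    return f"/tutorial/staging/sales/{parsed:%Y-%m-%d}/"
```

For example, `resolve_sink_folder("2024-01-15")` gives `/tutorial/staging/sales/2024-01-15/`. When you run the pipeline, it is worth confirming that the staging folder is named with an actual date: a pipeline parameter default may be stored as a literal string rather than evaluated as an expression, in which case you would need to supply the date explicitly at run time.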
Step 4: Run the Pipeline
- Click Debug to run the pipeline
- Monitor the pipeline execution in the Output tab
- Once completed, verify data was copied to the staging folder
Part 2: Data Transformation with Spark
Step 1: Create a Spark Notebook
- In Synapse Studio, navigate to Develop
- Click + > Notebook
- Name your notebook SalesTransformation
- Connect to your Spark pool
Step 2: Read and Transform the Data
Add the following code to your notebook cells:
# Cell 1: Set up parameters and paths
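# Note: sum and count below are Spark column functions; sum shadows Python's built-in sum
from pyspark.sql.functions import col, to_date, year, month, dayofmonth, sum, count
from pyspark.sql.types import DoubleType, IntegerType
import datetime
# Get the current UTC date for the folder path, matching the pipeline's utcnow()
process_date = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")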
staging_path = f"abfss://tutorial@<your-storage-account-name>.dfs.core.windows.net/staging/sales/{process_date}"
curated_path = "abfss://tutorial@<your-storage-account-name>.dfs.core.windows.net/curated/sales"
print(f"Processing data from {staging_path}")
# Cell 2: Read the staging data
df_sales = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(staging_path)
# Display the schema and sample data
df_sales.printSchema()
df_sales.show(5)
# Cell 3: Transform and clean the data
# Convert string columns to appropriate types
df_transformed = df_sales \
    .withColumn("SaleAmount", col("SaleAmount").cast(DoubleType())) \
    .withColumn("Quantity", col("Quantity").cast(IntegerType())) \
    .withColumn("SaleDate", to_date(col("SaleDate"), "yyyy-MM-dd"))
# Add date dimension columns for analysis
df_transformed = df_transformed \
    .withColumn("Year", year(col("SaleDate"))) \
    .withColumn("Month", month(col("SaleDate"))) \
    .withColumn("Day", dayofmonth(col("SaleDate")))
# Show transformed data
df_transformed.show(5)
# Cell 4: Create aggregations for analysis
# Sales by product
df_product_sales = df_transformed \
    .groupBy("ProductID", "ProductName", "Year", "Month") \
    .agg(
        sum("SaleAmount").alias("TotalSales"),
        sum("Quantity").alias("TotalQuantity"),
        count("*").alias("TransactionCount")
    )
# Display results
df_product_sales.show(5)
# Cell 5: Save data as Delta tables
# Save detailed sales data
df_transformed.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(f"{curated_path}/detailed")
# Save aggregated sales data
df_product_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("Year", "Month") \
    .save(f"{curated_path}/aggregated")
print("Data successfully transformed and saved as Delta tables")
# Cell 6: Create Spark SQL tables for the data
# Create database if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS sales")
# Create tables pointing to Delta locations
spark.sql(f"""
CREATE TABLE IF NOT EXISTS sales.detailed_sales
USING DELTA
LOCATION '{curated_path}/detailed'
""")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS sales.product_sales_monthly
USING DELTA
LOCATION '{curated_path}/aggregated'
""")
# Verify tables
spark.sql("SHOW TABLES IN sales").show()
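To sanity-check the aggregation logic in Cell 4 without a Spark pool, the same grouping can be mirrored in plain Python on a few rows. This is only a sketch for local verification, not a replacement for the notebook code. The column names come from the notebook above; the sample rows and the function name `aggregate_product_sales` are hypothetical.

```python
import csv
import io
from collections import defaultdict
from datetime import datetime

# Hypothetical sample rows; only the column names come from the notebook.
SAMPLE_CSV = """ProductID,ProductName,SaleAmount,Quantity,SaleDate
P1,Widget,10.50,2,2024-01-05
P1,Widget,21.00,4,2024-01-20
P2,Gadget,99.99,1,2024-01-07
P1,Widget,5.25,1,2024-02-01
"""


def aggregate_product_sales(csv_text: str) -> dict:
    """Group rows by (ProductID, ProductName, Year, Month), as Cell 4 does."""
    totals = defaultdict(
        lambda: {"TotalSales": 0.0, "TotalQuantity": 0, "TransactionCount": 0}
    )
    for row in csv.DictReader(io.StringIO(csv_text)):
        sale_date = datetime.strptime(row["SaleDate"], "%Y-%m-%d")
        key = (row["ProductID"], row["ProductName"], sale_date.year, sale_date.month)
        totals[key]["TotalSales"] += float(row["SaleAmount"])
        totals[key]["TotalQuantity"] += int(row["Quantity"])
        totals[key]["TransactionCount"] += 1
    return dict(totals)
```

With the sample above, `aggregate_product_sales(SAMPLE_CSV)` yields three groups: Widget for January 2024, Widget for February 2024, and Gadget for January 2024. Comparing a handful of such groups against `df_product_sales` is a quick way to confirm the Spark job grouped the data as intended.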
Step 3: Run the Notebook
- Replace <your-storage-account-name> with your actual storage account name
- Run each cell in sequence by clicking the ▶ button
- Review the output of each cell to ensure it executes correctly
Part 3: Query Data with Serverless SQL
Step 1: Navigate to Serverless SQL
- In Synapse Studio, click on Data in the left navigation
- Expand your workspace and select Built-in
- Navigate to Lake database > sales
- You should see the tables created by Spark: detailed_sales and product_sales_monthly
Step 2: Create Views for Analysis
Run the following SQL queries:
-- Create a view for sales trends
CREATE OR ALTER VIEW sales.vw_SalesTrends AS
SELECT
Year,
Month,
ProductName,
TotalSales,
TotalQuantity,
TotalSales / TotalQuantity AS AvgPricePerUnit
FROM
sales.product_sales_monthly
WHERE
TotalQuantity > 0;
-- Create a view for product performance ranking
CREATE OR ALTER VIEW sales.vw_ProductPerformance AS
WITH ProductRanking AS (
SELECT
ProductName,
SUM(TotalSales) AS TotalRevenue,
SUM(TotalQuantity) AS TotalUnitsSold,
RANK() OVER(ORDER BY SUM(TotalSales) DESC) AS RevenueRank
FROM
sales.product_sales_monthly
GROUP BY
ProductName
)
SELECT
ProductName,
TotalRevenue,
TotalUnitsSold,
RevenueRank,
CASE
WHEN RevenueRank <= 3 THEN 'Top Performer'
WHEN RevenueRank <= 10 THEN 'Strong Performer'
WHEN RevenueRank <= 20 THEN 'Average Performer'
ELSE 'Under Performer'
END AS PerformanceCategory
FROM
ProductRanking;
Step 3: Run Interactive Queries
Now run these analytical queries:
-- Monthly sales trend
SELECT
Year,
Month,
SUM(TotalSales) AS MonthlySales
FROM
sales.product_sales_monthly
GROUP BY
Year, Month
ORDER BY
Year, Month;
-- Top 10 products by revenue
SELECT TOP 10
ProductName,
TotalRevenue,
TotalUnitsSold,
PerformanceCategory
FROM
sales.vw_ProductPerformance
ORDER BY
RevenueRank;
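The ranking logic in vw_ProductPerformance combines two ideas: `RANK()` gives tied products the same rank and then skips ahead, and the `CASE` expression buckets ranks into categories. The Python sketch below mirrors that behavior on a small dictionary so the tie handling is easy to see. The thresholds come from the view; the function names and the sample revenues in the usage note are hypothetical.

```python
def categorize(rank: int) -> str:
    """Apply the same thresholds as the CASE expression in the view."""
    if rank <= 3:
        return "Top Performer"
    if rank <= 10:
        return "Strong Performer"
    if rank <= 20:
        return "Average Performer"
    return "Under Performer"


def rank_products(revenue_by_product: dict[str, float]) -> list[tuple[str, float, int, str]]:
    """Rank products by revenue, descending, with SQL RANK() tie semantics."""
    ordered = sorted(revenue_by_product.items(), key=lambda item: item[1], reverse=True)
    results = []
    previous_revenue = None
    rank = 0
    for position, (name, revenue) in enumerate(ordered, start=1):
        # A new revenue value takes its row position as rank; ties keep the earlier rank.
        if revenue != previous_revenue:
            rank = position
            previous_revenue = revenue
        results.append((name, revenue, rank, categorize(rank)))
    return results
```

For instance, with revenues `{"A": 100.0, "B": 100.0, "C": 50.0}`, products A and B share rank 1 and C receives rank 3, not 2. The same gap can appear in the view's output whenever two products have equal total revenue.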
Part 4: Create an End-to-End Orchestrated Pipeline
Now let's combine all the steps into a single orchestrated pipeline:
Step 1: Create a Master Pipeline
- In Synapse Studio, navigate to Integrate
- Click + > Pipeline
- Name your pipeline MasterSalesPipeline
Step 2: Add the Copy Data Activity
- Drag a Copy data activity from the Move & Transform category
- Configure it exactly as in Part 1, using the same source and sink datasets
Step 3: Add the Notebook Activity
- Drag a Notebook activity from the Synapse category
- Connect the Copy activity's output to the Notebook activity input
- Configure the Notebook:
  - Notebook: Select SalesTransformation
  - Spark pool: Select your Spark pool
  - Base parameters: Leave empty (the notebook uses current date)
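Leaving the base parameters empty works as long as the Copy activity and the notebook run on the same date. If you later want the notebook to read exactly the folder the pipeline wrote, one option is to pass the date in as a base parameter. The sketch below assumes the notebook has a cell toggled as the parameters cell that defines `process_date`, and that the Notebook activity supplies a base parameter with the same name; both the parameter name and the helper `effective_process_date` are assumptions, not part of the tutorial files.

```python
from datetime import datetime, timezone

# In a Synapse notebook this assignment would sit in the parameters cell;
# a base parameter named process_date would override it at run time.
process_date = ""


def effective_process_date(passed_value: str) -> str:
    """Use the pipeline-supplied date if present, else today's UTC date."""
    if passed_value:
        return passed_value
    return datetime.now(timezone.utc).strftime("%Y-%m-%d")
```

The staging path in Cell 1 could then be built from `effective_process_date(process_date)`, so a run started interactively still falls back to the current UTC date.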
Step 4: Configure Pipeline Success Email (Optional)
- Drag a Web activity from the General category
- Connect the Notebook activity's output to the Web activity input
- Configure for sending an email notification using Logic Apps or other email service
Step 5: Run the Master Pipeline
- Click Debug to test the pipeline
- Monitor the execution in the pipeline canvas
- Verify all activities complete successfully
Part 5: Visualize Results with Power BI
Step 1: Connect Power BI to Synapse
- In Synapse Studio, navigate to Develop
- Click + > Power BI > Power BI report
- If prompted, sign in to your Power BI account
Step 2: Create a DirectQuery Report
- Select Build new report
- In the connection dialog:
  - Connect to: Select your Synapse workspace
  - SQL pool: Select Built-in
  - Database: Select sales
- Choose DirectQuery mode
Step 3: Design Visualizations
Create the following visualizations:
- Sales Trend Line Chart:
  - Drag vw_SalesTrends to the canvas
  - Create a line chart with:
    - Axis: Month and Year
    - Values: TotalSales
- Product Performance Card:
  - Create a table visualization with:
    - Values: ProductName, TotalRevenue, PerformanceCategory
  - Apply conditional formatting to PerformanceCategory
- Units Sold by Product Pie Chart:
  - Create a pie chart with:
    - Legend: ProductName
    - Values: TotalUnitsSold
Step 4: Save and Publish the Report
- Save the report as Sales Analysis Dashboard
- Click Publish to publish to your Power BI workspace
- Return to Synapse Studio and link the report to your workspace
Part 6: Automate and Schedule
Step 1: Create a Trigger for the Pipeline
- In Synapse Studio, navigate to Integrate
- Select your MasterSalesPipeline
- Click Add trigger > New/Edit
- Configure a schedule trigger:
  - Type: Schedule
  - Start date: Select today's date
  - Recurrence: Daily
  - Time: Set to run during off-peak hours
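To preview when a daily schedule would fire, the recurrence can be sketched in a few lines of Python. This is an illustration of the "daily at a fixed time" idea only; the actual firing times are determined by the trigger's own start time and time zone settings in Synapse. The 02:00 UTC run time in the usage note and the function name `next_daily_runs` are assumptions.

```python
from datetime import datetime, time, timedelta, timezone


def next_daily_runs(now: datetime, run_at: time, count: int = 3) -> list[datetime]:
    """Return the next `count` daily occurrences of run_at (UTC) strictly after now.

    `now` must be a timezone-aware datetime in UTC.
    """
    first = datetime.combine(now.date(), run_at, tzinfo=timezone.utc)
    # If today's slot has already passed (or is exactly now), start tomorrow.
    if first <= now:
        first += timedelta(days=1)
    return [first + timedelta(days=offset) for offset in range(count)]
```

For example, `next_daily_runs(datetime.now(timezone.utc), time(2, 0))` lists the next three 02:00 UTC slots, which you can compare with the runs that appear under Monitor once the trigger is published.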
Step 2: Set Up Monitoring
- Navigate to Monitor in Synapse Studio
- Select Pipeline runs
- Configure pipeline run alerts:
  - Click New alert rule
  - Set condition: Failed pipeline runs
  - Set action group: Create a new action group for email notifications
Conclusion and Next Steps
Congratulations! You've completed an end-to-end data pipeline in Azure Synapse Analytics that:
- Ingests data from a source
- Stages and transforms the data
- Loads it into Delta Lake tables
- Makes it available for SQL analysis
- Visualizes the results with Power BI
- Automates and orchestrates the entire process
Next Steps
To extend this tutorial:
- Add data quality validation steps
- Implement incremental loading patterns
- Add machine learning predictions to the pipeline
- Integrate with Microsoft Purview (formerly Azure Purview) for data governance
- Implement CI/CD for your pipeline using Azure DevOps
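As a starting point for the data quality item above, the following sketch checks one sales row against a few simple rules. The column names match those used in the notebook; the rules themselves (non-negative amount, positive quantity, parseable date) and the function name `validate_sale` are illustrative assumptions, so adjust them to your data.

```python
from datetime import datetime


def validate_sale(row: dict) -> list[str]:
    """Return a list of problems found in one sales row (empty if none)."""
    problems = []
    try:
        if float(row["SaleAmount"]) < 0:
            problems.append("SaleAmount is negative")
    except (KeyError, TypeError, ValueError):
        problems.append("SaleAmount is missing or not numeric")
    try:
        if int(row["Quantity"]) <= 0:
            problems.append("Quantity is not positive")
    except (KeyError, TypeError, ValueError):
        problems.append("Quantity is missing or not an integer")
    try:
        datetime.strptime(row["SaleDate"], "%Y-%m-%d")
    except (KeyError, TypeError, ValueError):
        problems.append("SaleDate is missing or not a yyyy-MM-dd date")
    return problems
```

A row such as `{"SaleAmount": "10.5", "Quantity": "2", "SaleDate": "2024-01-05"}` returns an empty list, while rows with problems return one message per failed rule. The same rules could later be expressed as Spark filters in the notebook.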
Troubleshooting
If you encounter issues during this tutorial, refer to the Troubleshooting Guide for common solutions to Synapse problems.