Azure Synapse Shared Metadata - Code Examples
This document provides practical code examples for the shared metadata architecture in Azure Synapse Analytics. It covers databases replicated from Spark to serverless SQL, three-part naming, and a layered (raw, silver, gold) data architecture.
Creating and Synchronizing Databases Between Spark and Serverless SQL
Creating a Database and Table in Spark
# In a Spark notebook
# Create a new database
spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")
# Create a table using Parquet format (will be synchronized to SQL)
spark.sql("""
CREATE TABLE IF NOT EXISTS sales_db.transactions (
transaction_id STRING,
customer_id STRING,
product_id STRING,
quantity INT,
price DECIMAL(10,2),
transaction_date DATE,
store_id STRING
) USING PARQUET
""")
# Insert sample data
spark.sql("""
INSERT INTO sales_db.transactions VALUES
('T1001', 'C100', 'P5001', 2, 120.50, '2023-02-15', 'S001'),
('T1002', 'C102', 'P5002', 1, 89.99, '2023-02-15', 'S002'),
('T1003', 'C100', 'P5003', 3, 25.25, '2023-02-16', 'S001'),
('T1004', 'C103', 'P5001', 1, 120.50, '2023-02-16', 'S003')
""")
Accessing the Synchronized Table from Serverless SQL
-- Connect to Serverless SQL Pool
-- The Spark database is replicated automatically; it can take a short time to appear
-- View available tables in the synchronized database
USE sales_db;
SELECT * FROM sys.tables;
-- Query the synchronized table
-- Note the "dbo" schema - tables appear in the dbo schema in SQL
SELECT *
FROM sales_db.dbo.transactions
WHERE transaction_date = '2023-02-15';
-- Aggregate spend per customer, again using three-part naming (database.schema.table)
SELECT t.customer_id, SUM(t.price * t.quantity) AS total_spent
FROM sales_db.dbo.transactions t
GROUP BY t.customer_id
ORDER BY total_spent DESC;
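Spark addresses a table with two parts (database.table), while serverless SQL exposes the same table under the dbo schema with three parts. The mapping can be sketched in Python as below. The helper name `to_serverless_name` is hypothetical, and the lowercasing reflects an assumption that Spark stores database and table names in lower case.

```python
def to_serverless_name(spark_table: str, schema: str = "dbo") -> str:
    """Map a Spark 'database.table' name to a three-part serverless SQL name.

    Hypothetical helper for illustration; not part of any Synapse SDK.
    """
    parts = spark_table.strip().split(".")
    if len(parts) != 2 or not all(parts):
        raise ValueError(f"expected 'database.table', got {spark_table!r}")
    # Assumption: Spark keeps database and table names in lower case.
    database, table = (part.lower() for part in parts)
    return f"{database}.{schema}.{table}"


sql_name = to_serverless_name("sales_db.transactions")
print(sql_name)
```

A helper like this keeps notebook code and SQL scripts consistent when both refer to the same synchronized table.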
Working with Delta Tables
# In a Spark notebook
# Create a Delta table (will be synchronized to SQL)
spark.sql("CREATE DATABASE IF NOT EXISTS inventory_db")
spark.sql("""
CREATE TABLE IF NOT EXISTS inventory_db.product_inventory (
product_id STRING,
product_name STRING,
category STRING,
quantity_on_hand INT,
reorder_level INT,
last_updated TIMESTAMP
) USING DELTA
""")
# Insert sample data
spark.sql("""
INSERT INTO inventory_db.product_inventory VALUES
('P5001', 'Premium Widget', 'Widgets', 350, 100, current_timestamp()),
('P5002', 'Standard Widget', 'Widgets', 500, 150, current_timestamp()),
('P5003', 'Economy Gadget', 'Gadgets', 275, 75, current_timestamp()),
('P5004', 'Premium Gadget', 'Gadgets', 120, 50, current_timestamp())
""")
# Update data (UPDATE works here because this is a Delta table); serverless SQL reads the same files, so it sees the change
spark.sql("""
UPDATE inventory_db.product_inventory
SET quantity_on_hand = 300, last_updated = current_timestamp()
WHERE product_id = 'P5001'
""")
Querying Delta Table from Serverless SQL
-- Delta tables are also synchronized (in preview)
SELECT * FROM inventory_db.dbo.product_inventory
WHERE category = 'Widgets';
-- Join with the sales data from another database
SELECT
i.product_id,
i.product_name,
i.quantity_on_hand,
SUM(t.quantity) AS quantity_sold,
SUM(t.price * t.quantity) AS total_revenue
FROM inventory_db.dbo.product_inventory i
JOIN sales_db.dbo.transactions t ON i.product_id = t.product_id
GROUP BY
i.product_id,
i.product_name,
i.quantity_on_hand
ORDER BY total_revenue DESC;
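To sanity-check the aggregate queries against the four sample transactions inserted earlier, the expected totals can be worked out in plain Python. This is only a sketch of the arithmetic: the function name `revenue_by` is hypothetical, and `Decimal` is used to mirror the DECIMAL(10,2) price column.

```python
from collections import defaultdict
from decimal import Decimal

# (transaction_id, customer_id, product_id, quantity, price) from the sample INSERT
transactions = [
    ("T1001", "C100", "P5001", 2, Decimal("120.50")),
    ("T1002", "C102", "P5002", 1, Decimal("89.99")),
    ("T1003", "C100", "P5003", 3, Decimal("25.25")),
    ("T1004", "C103", "P5001", 1, Decimal("120.50")),
]


def revenue_by(rows, key_index):
    """Sum quantity * price per distinct value found at rows[i][key_index]."""
    totals = defaultdict(Decimal)  # Decimal() starts each total at 0
    for row in rows:
        totals[row[key_index]] += row[3] * row[4]
    return dict(totals)


by_product = revenue_by(transactions, 2)   # compare with total_revenue
by_customer = revenue_by(transactions, 1)  # compare with total_spent
print(by_product)
print(by_customer)
```

Product P5004 has no sales, so the inner join in the SQL query omits it, just as it is absent from `by_product` here.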
Managing Schema Evolution
Adding Columns in Spark (Synchronized to SQL)
# Add a column to existing table in Spark
spark.sql("""
ALTER TABLE sales_db.transactions
ADD COLUMN payment_method STRING
""")
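# UPDATE works only on Delta tables, and sales_db.transactions is Parquet.
# To backfill the new column, write the rewritten rows to a staging table,
# then overwrite the original table from it.
spark.sql("""
CREATE TABLE sales_db.transactions_staged USING PARQUET AS
SELECT
transaction_id, customer_id, product_id, quantity, price, transaction_date, store_id,
CASE
WHEN transaction_id = 'T1001' THEN 'Credit Card'
WHEN transaction_id = 'T1002' THEN 'Cash'
WHEN transaction_id = 'T1003' THEN 'Credit Card'
WHEN transaction_id = 'T1004' THEN 'Digital Wallet'
END AS payment_method
FROM sales_db.transactions
""")
spark.sql("INSERT OVERWRITE TABLE sales_db.transactions SELECT * FROM sales_db.transactions_staged")
spark.sql("DROP TABLE sales_db.transactions_staged")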
Querying Updated Schema in SQL
-- The new column is now available in SQL after synchronization
SELECT
transaction_id,
customer_id,
payment_method,
price * quantity AS total
FROM sales_db.dbo.transactions
WHERE payment_method = 'Credit Card';
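Because metadata synchronization is asynchronous, a new column or table may not be visible in serverless SQL immediately after the Spark statement completes. A minimal polling sketch follows. The function name `wait_until_synced` and its parameters are hypothetical, and the `check` callable (for example, one that queries sys.columns through a database driver) is left to the caller.

```python
import time
from typing import Callable


def wait_until_synced(
    check: Callable[[], bool],
    timeout_s: float = 300.0,
    interval_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call `check` repeatedly until it returns True or `timeout_s` elapses.

    Returns True if `check` succeeded, False on timeout.
    `check` is always called at least once.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)


# Usage with a stand-in check that succeeds on the third attempt.
attempts = {"count": 0}


def fake_check() -> bool:
    attempts["count"] += 1
    return attempts["count"] >= 3


synced = wait_until_synced(fake_check, timeout_s=60.0, interval_s=0.0)
print(synced, attempts["count"])
```

Injecting `sleep` keeps the helper easy to test without real delays; the default timeout and interval are arbitrary starting points, not documented values.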
Implementing Layered Architecture with Shared Metadata
Raw Layer (Direct Access, Minimal Shared Metadata)
# In Spark, create a database for raw data
spark.sql("CREATE DATABASE IF NOT EXISTS raw_data")
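# Create an external table over the raw files. Each row is assumed to hold one
# JSON document in a single quoted CSV field (it is parsed in the silver layer).
# inferSchema is omitted because the schema is declared explicitly.
spark.sql("""
CREATE TABLE IF NOT EXISTS raw_data.customer_raw (
customer_data STRING
) USING CSV
OPTIONS (header 'true')
LOCATION 'abfss://datalake@storageaccount.dfs.core.windows.net/raw/customers/'
""")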
Silver Layer (Apply Shared Metadata)
# Create curated database for transformed/validated data
spark.sql("CREATE DATABASE IF NOT EXISTS silver_db")
# Transform raw data into structured format with proper data types
spark.sql("""
CREATE TABLE silver_db.customers
USING PARQUET
AS
SELECT
from_json(customer_data, 'id STRING, name STRING, email STRING, signup_date DATE') AS customer
FROM raw_data.customer_raw
""")
# Flatten the structure for easier access
spark.sql("""
CREATE TABLE silver_db.customers_flattened
USING PARQUET
AS
SELECT
customer.id AS customer_id,
customer.name AS full_name,
customer.email,
customer.signup_date
FROM silver_db.customers
""")
Gold Layer (Business-Ready Data with Full Shared Metadata)
# Create business-ready database
spark.sql("CREATE DATABASE IF NOT EXISTS gold_db")
# Create business metrics table that joins data from multiple sources
spark.sql("""
CREATE TABLE gold_db.customer_sales_summary
USING DELTA
AS
SELECT
c.customer_id,
c.full_name,
c.email,
COUNT(t.transaction_id) AS total_transactions,
SUM(t.price * t.quantity) AS total_spent,
MAX(t.transaction_date) AS last_purchase_date
FROM silver_db.customers_flattened c
LEFT JOIN sales_db.transactions t ON c.customer_id = t.customer_id
GROUP BY c.customer_id, c.full_name, c.email
""")
Accessing the Layered Architecture from SQL
-- Raw layer is typically accessed directly in Spark, not via shared metadata
-- Silver layer via shared metadata
SELECT * FROM silver_db.dbo.customers_flattened
WHERE signup_date >= '2023-01-01';
-- Gold layer via shared metadata
SELECT
customer_id,
full_name,
total_transactions,
total_spent,
CASE
WHEN total_spent > 1000 THEN 'Premium'
WHEN total_spent > 500 THEN 'Standard'
ELSE 'Basic'
END AS customer_tier
FROM gold_db.dbo.customer_sales_summary
ORDER BY total_spent DESC;
Working with Manual External Tables for Non-Synchronized Formats
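Formats that Spark does not synchronize automatically, such as JSON or ORC, need manual definitions in serverless SQL, created in a user database (external_db in this example). Serverless SQL has no JSON external file format, so the example below exposes line-delimited JSON through a view over OPENROWSET rather than an external table:
-- Run in a user-created serverless database
USE external_db;
-- A database master key is required before creating a scoped credential
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<strong password>';
CREATE DATABASE SCOPED CREDENTIAL [SasCredential]
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = 'sv=2020-02-10&ss=bfqt&srt=sco&sp=rwdlacupx&se=2023-04-15T01:15:28Z&st=2021-03-15T17:15:28Z&spr=https&sig=XXXXX';
-- Create an external data source if you don't have one
CREATE EXTERNAL DATA SOURCE ExternalDataSource
WITH (
LOCATION = 'https://storageaccount.dfs.core.windows.net/datalake',
CREDENTIAL = [SasCredential]
);
GO
-- Read each JSON line as a single text column. 0x0b is assumed not to occur
-- in the data, so using it as terminator and quote keeps every line intact.
-- The *.json pattern and the $.customer_id path are assumptions about the files.
CREATE VIEW dbo.customer_preferences AS
SELECT
JSON_VALUE(doc, '$.customer_id') AS customer_id,
doc AS preferences
FROM OPENROWSET(
BULK 'analytics/customer-preferences/*.json',
DATA_SOURCE = 'ExternalDataSource',
FORMAT = 'CSV',
FIELDTERMINATOR = '0x0b',
FIELDQUOTE = '0x0b'
) WITH (doc NVARCHAR(MAX)) AS rows;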
Monitoring Metadata Synchronization
-- Check if tables have been synchronized correctly
USE sales_db;
SELECT
name,
create_date,
modify_date,
type_desc
FROM sys.tables;
-- Check external data sources
SELECT * FROM sys.external_data_sources;
-- Check external file formats
SELECT * FROM sys.external_file_formats;
-- View column definitions
SELECT
t.name AS table_name,
c.name AS column_name,
ty.name AS data_type,
c.max_length,
c.precision,
c.scale,
c.is_nullable
FROM sys.tables t
JOIN sys.columns c ON t.object_id = c.object_id
JOIN sys.types ty ON c.user_type_id = ty.user_type_id
WHERE t.name = 'transactions'
ORDER BY c.column_id;
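When reviewing the column definitions returned by the query above, it helps to know which SQL type each Spark type is expected to become. The sketch below encodes the mapping as I understand it from the Synapse shared metadata documentation. Treat the table as an assumption and verify it against the current documentation; the names `SPARK_TO_SQL` and `expected_sql_type` are hypothetical.

```python
import re

# Assumed Spark-to-SQL type mapping for synchronized tables (verify before relying on it).
SPARK_TO_SQL = {
    "byte": "smallint", "tinyint": "smallint",
    "short": "smallint", "smallint": "smallint",
    "int": "int", "integer": "int",
    "long": "bigint", "bigint": "bigint",
    "float": "real",
    "double": "float",
    "timestamp": "datetime2",
    "date": "date",
    "string": "varchar(max)",
    "binary": "varbinary(max)",
    "boolean": "bit",
}


def expected_sql_type(spark_type: str) -> str:
    """Return the SQL type a Spark column type is assumed to map to."""
    t = spark_type.strip().lower()
    decimal = re.fullmatch(r"decimal\((\d+)\s*,\s*(\d+)\)", t)
    if decimal:
        return f"decimal({decimal.group(1)},{decimal.group(2)})"
    # Assumption: complex types are exposed as JSON text.
    if t.startswith(("array", "map", "struct")):
        return "varchar(max)"
    if t not in SPARK_TO_SQL:
        raise ValueError(f"no assumed mapping for Spark type {spark_type!r}")
    return SPARK_TO_SQL[t]


# Column types of sales_db.transactions as declared in Spark
for spark_type in ["STRING", "INT", "DECIMAL(10,2)", "DATE"]:
    print(spark_type, "->", expected_sql_type(spark_type))
```

Note that sys.types reports only the base type name (for example varchar), with length, precision, and scale in the separate columns selected by the monitoring query.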
These code examples demonstrate how to implement the shared metadata architecture in Azure Synapse Analytics using best practices for serverless replicated databases, handling three-part naming, and implementing a layered data architecture.