Emerging Patterns and Modern Integrations
Cutting-Edge Integration Patterns: Explore modern integration patterns for Unity Catalog, Microsoft Fabric, and real-time analytics with Azure Synapse Analytics and related services.
Table of Contents
- Unity Catalog Integration
- Microsoft Fabric Integration
- Real-Time Analytics with Eventhouse
- Advanced Patterns
Unity Catalog Integration
Unified Data Governance: Unity Catalog provides centralized governance, security, and discoverability for data and AI assets across clouds.
Overview
Unity Catalog integration with Azure Synapse enables:
- Centralized Metadata Management: Single source of truth for data assets
- Fine-Grained Access Control: Column-level and row-level security
- Data Lineage Tracking: End-to-end visibility of data flows
- Multi-Cloud Support: Consistent governance across environments
Architecture Pattern
graph LR
A[Unity Catalog] --> B[Azure Synapse Spark]
A --> C[Azure Databricks]
A --> D[External Compute]
B --> E[Delta Tables]
C --> E
D --> E
E --> F[ADLS Gen2]
A --> G[Governance Layer]
G --> H[Access Control]
G --> I[Data Lineage]
G --> J[Audit Logs]
style A fill:#FF6B6B
style B fill:#4ECDC4
style E fill:#95E1D3
style G fill:#F7DC6F
Configuration Example
# Configure Unity Catalog connection in Synapse Spark
from pyspark.sql import SparkSession

# Initialize Spark with Unity Catalog
spark = (
    SparkSession.builder
    .appName("UnityCatalogIntegration")
    .config("spark.databricks.unityCatalog.enabled", "true")
    .config("spark.databricks.unityCatalog.metastore", "azuredatabricks://your-workspace")
    .config("spark.databricks.delta.preview.enabled", "true")
    .getOrCreate()
)

# Set the default catalog and schema
spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA analytics")

# Create an external Delta table in Unity Catalog
# (the LOCATION clause makes the table external rather than managed)
spark.sql("""
    CREATE TABLE IF NOT EXISTS customer_metrics (
        customer_id STRING,
        total_purchases DECIMAL(10,2),
        avg_order_value DECIMAL(10,2),
        last_purchase_date DATE,
        customer_segment STRING
    )
    USING DELTA
    LOCATION 'abfss://unity-catalog@storage.dfs.core.windows.net/customer_metrics'
    COMMENT 'Customer aggregated metrics'
    TBLPROPERTIES (
        'quality' = 'gold',
        'owner' = 'analytics-team',
        'pii_data' = 'false'
    )
""")
print("Unity Catalog table created successfully")
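The `USE CATALOG` and `USE SCHEMA` statements set defaults, so the unqualified name `customer_metrics` resolves to the three-level name `main.analytics.customer_metrics`. A minimal sketch of building such names safely is shown below; `quote_identifier` and `full_table_name` are hypothetical helpers, not part of any Spark or Unity Catalog API.

```python
def quote_identifier(name: str) -> str:
    """Wrap one identifier in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def full_table_name(catalog: str, schema: str, table: str) -> str:
    """Build a three-level catalog.schema.table name with each part quoted."""
    return ".".join(quote_identifier(part) for part in (catalog, schema, table))


# Fully qualified name for the table created in this section
print(full_table_name("main", "analytics", "customer_metrics"))
```

Quoting each part separately keeps names that contain hyphens or dots from being split into extra namespace levels.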
Access Control Pattern
# Grant permissions using Unity Catalog
def setup_unity_catalog_permissions():
    """Configure fine-grained access control in Unity Catalog."""
    # Table-level access: grant SELECT to the analytics team
    spark.sql("""
        GRANT SELECT ON TABLE main.analytics.customer_metrics
        TO `analytics-team@company.com`
    """)

    # Column-level access: expose only the non-sensitive columns through a view
    # and grant SELECT on the view instead of the base table
    spark.sql("""
        CREATE VIEW IF NOT EXISTS main.analytics.customer_metrics_public AS
        SELECT customer_id, customer_segment, last_purchase_date
        FROM main.analytics.customer_metrics
    """)
    spark.sql("""
        GRANT SELECT ON VIEW main.analytics.customer_metrics_public
        TO `business-users@company.com`
    """)

    # Row-level security: a row filter is a boolean SQL function bound to a column.
    # The group name 'regional-managers' and the segment value 'restricted' are
    # placeholders; replace them with the rule your data actually needs.
    spark.sql("""
        CREATE OR REPLACE FUNCTION main.analytics.customer_segment_filter(segment STRING)
        RETURNS BOOLEAN
        RETURN is_account_group_member('regional-managers') OR segment <> 'restricted'
    """)

    # Apply the row filter to the table
    spark.sql("""
        ALTER TABLE main.analytics.customer_metrics
        SET ROW FILTER main.analytics.customer_segment_filter ON (customer_segment)
    """)
    print("Unity Catalog permissions configured")

# Execute permission setup
setup_unity_catalog_permissions()
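Conceptually, column-level access is a projection and a row filter is a predicate evaluated once per row. The sketch below mimics that combination on plain Python dictionaries. It is only an illustration of the semantics, with made-up sample rows and a hypothetical `apply_policy` helper, and is not how Unity Catalog enforces policies.

```python
# Made-up sample rows shaped like customer_metrics
ROWS = [
    {"customer_id": "c1", "total_purchases": 120.0, "customer_segment": "retail"},
    {"customer_id": "c2", "total_purchases": 980.5, "customer_segment": "enterprise"},
]


def apply_policy(rows, visible_columns, row_predicate):
    """Keep rows that pass the predicate, then keep only the visible columns."""
    return [
        {column: row[column] for column in visible_columns}
        for row in rows
        if row_predicate(row)
    ]


visible = apply_policy(
    ROWS,
    visible_columns=["customer_id", "customer_segment"],
    row_predicate=lambda row: row["customer_segment"] == "retail",
)
print(visible)
```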
Data Lineage Tracking
# Query Unity Catalog for data lineage
def get_table_lineage(catalog_name, schema_name, table_name):
    """Retrieve lineage information from the Unity Catalog system tables."""
    # Arguments are interpolated into the SQL text, so pass trusted values only
    lineage_query = f"""
        SELECT
            source_table_full_name,
            source_type,
            target_table_full_name,
            target_type,
            entity_type,
            created_by,
            event_time
        FROM system.access.table_lineage
        WHERE target_table_full_name = '{catalog_name}.{schema_name}.{table_name}'
        ORDER BY event_time DESC
    """
    lineage_df = spark.sql(lineage_query)
    lineage_df.show(truncate=False)
    return lineage_df

# Get lineage for customer_metrics table
lineage_data = get_table_lineage("main", "analytics", "customer_metrics")
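The lineage query returns direct source-to-target edges only. To see everything a table ultimately depends on, the edges can be walked transitively. The following is a minimal sketch in plain Python, assuming the edges have already been collected as `(source, target)` pairs; the table names are invented for illustration.

```python
from collections import defaultdict, deque


def upstream_tables(edges, target):
    """Return every table that feeds `target`, directly or indirectly."""
    parents = defaultdict(set)
    for source, dest in edges:
        parents[dest].add(source)

    seen, queue = set(), deque([target])
    while queue:
        for parent in parents[queue.popleft()]:
            if parent not in seen:  # also guards against cycles
                seen.add(parent)
                queue.append(parent)
    return seen


# Invented lineage edges: (source_table_full_name, target_table_full_name)
edges = [
    ("main.raw.orders", "main.silver.orders"),
    ("main.silver.orders", "main.analytics.customer_metrics"),
    ("main.raw.customers", "main.analytics.customer_metrics"),
]
print(sorted(upstream_tables(edges, "main.analytics.customer_metrics")))
```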
Microsoft Fabric Integration
Unified Analytics Platform: Microsoft Fabric provides an integrated analytics experience combining data engineering, data science, real-time analytics, and business intelligence.
Overview
Integrating Azure Synapse with Microsoft Fabric enables:
- OneLake Integration: Unified storage layer across Fabric and Synapse
- Cross-Platform Analytics: Query Fabric data from Synapse
- Shared Compute: Leverage Fabric compute resources
- Unified Security: Consistent security model
Architecture Pattern
graph TB
subgraph "Microsoft Fabric"
A[OneLake]
B[Data Warehouse]
C[Lakehouse]
D[KQL Database]
end
subgraph "Azure Synapse"
E[Spark Pools]
F[Serverless SQL]
G[Dedicated SQL]
end
A --> E
A --> F
A --> G
B --> F
C --> E
D --> H[Event Streams]
E --> I[Delta Tables]
F --> I
G --> I
I --> J[Power BI]
B --> J
C --> J
style A fill:#7B68EE
style E fill:#4ECDC4
style I fill:#95E1D3
style J fill:#F7DC6F
OneLake Shortcut Configuration
# Create a OneLake shortcut in a Fabric lakehouse from a Synapse notebook
import requests
from notebookutils import mssparkutils

def create_onelake_shortcut(workspace_id, lakehouse_id, shortcut_name, target_path):
    """Create a OneLake shortcut through the Fabric REST API."""
    shortcut_config = {
        "name": shortcut_name,
        "path": "Tables",  # parent folder; the shortcut appears at Tables/<shortcut_name>
        "target": {
            "oneLake": {
                "workspaceId": workspace_id,
                "itemId": lakehouse_id,
                "path": target_path,
            }
        },
    }

    # Call the REST API with a bearer token for the Fabric audience
    endpoint = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/items/{lakehouse_id}/shortcuts"
    token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com")
    response = requests.post(
        endpoint,
        headers={"Authorization": f"Bearer {token}"},
        json=shortcut_config,
        timeout=30,
    )
    response.raise_for_status()  # report success only if the API accepted the request
    print(f"OneLake shortcut '{shortcut_name}' created successfully")
    return shortcut_config

# Example usage
fabric_workspace = "your-fabric-workspace-id"
fabric_lakehouse = "your-lakehouse-id"
create_onelake_shortcut(
    workspace_id=fabric_workspace,
    lakehouse_id=fabric_lakehouse,
    shortcut_name="sales_data",
    target_path="Tables/sales_transactions",
)
Query Fabric Data from Synapse
# Query Fabric Lakehouse data from Synapse Spark
from pyspark.sql import functions as F

def query_fabric_lakehouse(shortcut_name, filter_condition=None):
    """Query data from Fabric Lakehouse via OneLake shortcut."""
    # Read data from the OneLake shortcut
    fabric_df = spark.read.format("delta").load(f"Tables/{shortcut_name}")

    # Apply filter if provided
    if filter_condition:
        fabric_df = fabric_df.filter(filter_condition)

    # Example: aggregate sales by region (F.sum, not Python's built-in sum)
    result_df = (
        fabric_df.groupBy("region")
        .agg(
            F.sum("sales_amount").alias("total_sales"),
            F.count("transaction_id").alias("transaction_count"),
            F.avg("sales_amount").alias("avg_transaction_value"),
        )
        .orderBy("total_sales", ascending=False)
    )
    return result_df

# Query sales data from Fabric
sales_summary = query_fabric_lakehouse(
    shortcut_name="sales_data",
    filter_condition="transaction_date >= '2024-01-01'",
)
sales_summary.show()
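A relative path such as `Tables/sales_data` only resolves when the notebook has a default lakehouse attached. From a Synapse Spark pool, a fully qualified OneLake URI is likely needed instead. The sketch below builds one, assuming the ADLS-compatible OneLake endpoint layout; `onelake_table_uri` and the workspace and lakehouse names are hypothetical.

```python
def onelake_table_uri(workspace: str, lakehouse: str, table: str) -> str:
    """Build an abfss URI for a table folder in a Fabric lakehouse (assumed layout)."""
    return (
        f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/"
        f"{lakehouse}.Lakehouse/Tables/{table}"
    )


# Hypothetical workspace and lakehouse names
uri = onelake_table_uri("SalesWorkspace", "SalesLakehouse", "sales_data")
print(uri)
```

The resulting string can be passed to `spark.read.format("delta").load(...)` in place of the relative path, provided the Spark identity has access to the Fabric workspace.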
Cross-Platform Data Pipeline
# Orchestrate data pipeline across Fabric and Synapse
from pyspark.sql.functions import current_timestamp, lit

def fabric_synapse_pipeline():
    """Cross-platform ETL pipeline between Fabric and Synapse."""
    # Step 1: Read from Fabric Lakehouse
    print("Reading data from Fabric Lakehouse...")
    source_df = spark.read.format("delta").load("Tables/raw_events")

    # Step 2: Transform in Synapse Spark
    print("Transforming data in Synapse Spark...")
    transformed_df = (
        source_df
        .filter("event_type = 'purchase'")
        .withColumn("processed_date", current_timestamp())
        .withColumn("processing_platform", lit("Azure Synapse"))
    )

    # Step 3: Write back to Fabric via OneLake
    print("Writing results to Fabric Lakehouse...")
    (
        transformed_df.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save("Tables/processed_purchases")
    )

    # Step 4: Register a Spark table over the Delta location so it can be queried by name
    print("Registering table over the Delta location...")
    spark.sql("""
        CREATE TABLE IF NOT EXISTS fabric_purchases
        USING DELTA
        LOCATION 'Tables/processed_purchases'
    """)
    print("Pipeline completed successfully")

# Execute pipeline
fabric_synapse_pipeline()
Real-Time Analytics with Eventhouse
Streaming Analytics at Scale: Eventhouse, the Microsoft Fabric real-time store built on the Azure Data Explorer (Kusto) engine, provides low-latency analytics on streaming and batch data.
Overview
Eventhouse integration enables:
- Sub-Second Query Latency: Fast analytics on hot data
- Streaming Ingestion: Real-time data ingestion from Event Hubs, IoT Hub
- Time Series Analysis: Optimized for time-series data
- KQL Querying: Powerful query language for log and telemetry data
Architecture Pattern
graph LR
A[Event Sources] --> B[Event Hub]
B --> C[Eventhouse<br/>KQL Database]
C --> D[Real-Time Dashboard]
C --> E[Alerts & Monitoring]
B --> F[Azure Synapse<br/>Spark Streaming]
F --> G[Delta Lake<br/>Long-term Storage]
C --> H[Synapse Link]
H --> G
G --> I[Historical Analytics]
style A fill:#FF6B6B
style C fill:#4ECDC4
style F fill:#95E1D3
style G fill:#F7DC6F
Streaming Data Ingestion
# Ingest streaming data to Eventhouse from Event Hub
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema for incoming events
event_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("device_id", StringType(), False),
    StructField("event_type", StringType(), False),
    StructField("metric_value", DoubleType(), True),
    StructField("timestamp", TimestampType(), False),
    StructField("properties", StringType(), True),
])

# Configure Event Hub connection (the connector expects an encrypted connection string).
# Load the real connection string from a secret store rather than hardcoding it.
connection_string = "Endpoint=sb://your-eventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-key"
eventhub_config = {
    "eventhubs.connectionString": spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
    "eventhubs.consumerGroup": "$Default",
}

# Read streaming data from Event Hub
streaming_df = spark.readStream.format("eventhubs").options(**eventhub_config).load()

# Parse JSON payload
parsed_df = (
    streaming_df
    .select(from_json(col("body").cast("string"), event_schema).alias("data"))
    .select("data.*")
)

# Write to Eventhouse via the Kusto Spark connector.
# The kustoAad* values are placeholders for a service principal with ingest rights.
query = (
    parsed_df.writeStream
    .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
    .option("kustoCluster", "https://your-cluster.region.kusto.windows.net")
    .option("kustoDatabase", "telemetry")
    .option("kustoTable", "device_events")
    .option("kustoAadAppId", "your-app-id")
    .option("kustoAadAppSecret", "your-app-secret")
    .option("kustoAadAuthorityID", "your-tenant-id")
    .option("checkpointLocation", "/tmp/checkpoints/eventhouse")
    .outputMode("append")
    .start()
)
print("Streaming to Eventhouse started")
query.awaitTermination()
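When a payload does not match the schema, `from_json` typically yields null fields rather than failing, so malformed events can reach the sink unnoticed. The sketch below shows the same validation idea in plain Python, assuming ISO 8601 timestamps; `parse_event` and `REQUIRED_FIELDS` are hypothetical names that mirror the non-nullable fields of `event_schema`.

```python
import json
from datetime import datetime

# Mirrors the fields declared non-nullable in event_schema
REQUIRED_FIELDS = ("event_id", "device_id", "event_type", "timestamp")


def parse_event(body: bytes):
    """Return the event as a dict, or None if the payload is malformed."""
    try:
        event = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(event, dict):
        return None
    if any(event.get(field) is None for field in REQUIRED_FIELDS):
        return None
    try:
        event["timestamp"] = datetime.fromisoformat(event["timestamp"])
    except (TypeError, ValueError):
        return None
    return event


good = parse_event(
    b'{"event_id": "e1", "device_id": "d1", "event_type": "temp", '
    b'"metric_value": 21.5, "timestamp": "2024-01-01T00:00:00"}'
)
bad = parse_event(b"not json")
```

In a streaming job the equivalent step would be filtering out rows whose required columns are null before calling `writeStream`.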
Real-Time Analytics Query
# Query real-time data from Eventhouse using KQL
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table

def query_eventhouse_realtime(cluster_uri, database_name, query):
    """Execute KQL query against Eventhouse."""
    # Build connection string
    kcsb = KustoConnectionStringBuilder.with_aad_device_authentication(cluster_uri)
    # Create Kusto client
    client = KustoClient(kcsb)
    # Execute query
    response = client.execute(database_name, query)
    # Convert to pandas DataFrame
    df = dataframe_from_result_table(response.primary_results[0])
    return df

# Example: Real-time device monitoring
kql_query = """
device_events
| where timestamp > ago(5m)
| summarize
    event_count = count(),
    avg_metric = avg(metric_value),
    max_metric = max(metric_value),
    min_metric = min(metric_value)
    by device_id, bin(timestamp, 1m)
| order by timestamp desc
"""
realtime_metrics = query_eventhouse_realtime(
    cluster_uri="https://your-cluster.region.kusto.windows.net",
    database_name="telemetry",
    query=kql_query,
)
print(realtime_metrics)
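The `summarize ... by device_id, bin(timestamp, 1m)` clause groups events into one-minute buckets per device. For readers less familiar with KQL, here is a rough plain-Python equivalent of that grouping on invented sample events; `summarize_by_minute` is a hypothetical helper, not part of the Kusto SDK.

```python
from collections import defaultdict
from datetime import datetime


def summarize_by_minute(events):
    """events: iterable of (device_id, timestamp, metric_value) tuples."""
    buckets = defaultdict(list)
    for device_id, ts, value in events:
        minute = ts.replace(second=0, microsecond=0)  # same idea as bin(timestamp, 1m)
        buckets[(device_id, minute)].append(value)
    return {
        key: {
            "event_count": len(values),
            "avg_metric": sum(values) / len(values),
            "max_metric": max(values),
            "min_metric": min(values),
        }
        for key, values in buckets.items()
    }


# Invented sample events
events = [
    ("d1", datetime(2024, 1, 1, 12, 0, 5), 10.0),
    ("d1", datetime(2024, 1, 1, 12, 0, 50), 20.0),
    ("d1", datetime(2024, 1, 1, 12, 1, 1), 30.0),
]
summary = summarize_by_minute(events)
```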
Hybrid Hot/Cold Analytics
# Combine real-time (Eventhouse) and historical (Delta Lake) analytics
import pandas as pd

def hybrid_analytics(device_id, lookback_days=7):
    """Query both real-time and historical data for comprehensive analysis."""
    # device_id is interpolated into both queries, so pass trusted values only

    # Part 1: Query hot data from Eventhouse (rolling last 24 hours)
    hot_query = f"""
    device_events
    | where device_id == '{device_id}'
    | where timestamp > ago(1d)
    | summarize
        avg_metric = avg(metric_value),
        event_count = count()
        by bin(timestamp, 1h)
    | order by timestamp desc
    """
    hot_data = query_eventhouse_realtime(
        cluster_uri="https://your-cluster.region.kusto.windows.net",
        database_name="telemetry",
        query=hot_query,
    )

    # Part 2: Query cold data from Delta Lake (historical).
    # The upper bound is "now minus 24 hours" so it meets the hot window exactly;
    # cutting at midnight would leave a gap between the two ranges.
    cold_data = spark.sql(f"""
        SELECT
            date_trunc('hour', timestamp) AS timestamp,
            AVG(metric_value) AS avg_metric,
            COUNT(*) AS event_count
        FROM device_events_historical
        WHERE device_id = '{device_id}'
          AND timestamp >= current_date() - INTERVAL {lookback_days} DAYS
          AND timestamp <= current_timestamp() - INTERVAL 1 DAY
        GROUP BY date_trunc('hour', timestamp)
        ORDER BY timestamp DESC
    """).toPandas()

    # Combine hot and cold data
    combined_data = pd.concat([hot_data, cold_data], ignore_index=True)
    combined_data = combined_data.sort_values("timestamp", ascending=False)
    return combined_data

# Execute hybrid analytics
device_analysis = hybrid_analytics(device_id="device_12345", lookback_days=7)
print(device_analysis.head(20))
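Because the hot and cold queries both aggregate into hourly buckets, the hour that straddles the hot/cold boundary can appear once in each result. Simple concatenation then leaves two partial rows for that hour. One way to reconcile them, sketched here in plain Python with invented numbers, is to merge rows that share a bucket using a count-weighted average; `merge_hourly_buckets` is a hypothetical helper.

```python
from datetime import datetime


def merge_hourly_buckets(hot_rows, cold_rows):
    """Merge hourly rows; buckets present in both are combined by weighted average."""
    merged = {}
    for row in list(cold_rows) + list(hot_rows):
        ts, avg, count = row["timestamp"], row["avg_metric"], row["event_count"]
        if ts in merged:
            previous = merged[ts]
            total = previous["event_count"] + count
            previous["avg_metric"] = (
                previous["avg_metric"] * previous["event_count"] + avg * count
            ) / total
            previous["event_count"] = total
        else:
            merged[ts] = {"timestamp": ts, "avg_metric": avg, "event_count": count}
    return sorted(merged.values(), key=lambda r: r["timestamp"], reverse=True)


# Invented rows: the 10:00 bucket is split across the hot and cold stores
hot = [{"timestamp": datetime(2024, 1, 2, 10), "avg_metric": 10.0, "event_count": 1}]
cold = [
    {"timestamp": datetime(2024, 1, 2, 10), "avg_metric": 20.0, "event_count": 3},
    {"timestamp": datetime(2024, 1, 2, 9), "avg_metric": 5.0, "event_count": 2},
]
merged_rows = merge_hourly_buckets(hot, cold)
```

This assumes no raw event is counted in both stores; if the two sources overlap, the counts would need deduplication first.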
Synapse Link for Eventhouse
# Expose Eventhouse data to Spark SQL through the Kusto connector
def create_eventhouse_view():
    """Register a temporary view in Synapse Spark backed by an Eventhouse table."""
    # Read the table through the Synapse Kusto connector.
    # "EventhouseLinkedService" is a placeholder for a linked service
    # defined in your Synapse workspace.
    events_df = (
        spark.read
        .format("com.microsoft.kusto.spark.synapse.datasource")
        .option("spark.synapse.linkedService", "EventhouseLinkedService")
        .option("kustoDatabase", "telemetry")
        .option(
            "kustoQuery",
            "device_events | project event_id, device_id, event_type, metric_value, timestamp",
        )
        .load()
    )

    # Make the data addressable by name from Spark SQL
    events_df.createOrReplaceTempView("device_events_realtime")
    print("Eventhouse view created")

# Create the view
create_eventhouse_view()

# Query Eventhouse data using SQL
recent_events = spark.sql("""
    SELECT device_id, event_type, AVG(metric_value) AS avg_value
    FROM device_events_realtime
    WHERE timestamp > current_timestamp() - INTERVAL 1 HOUR
    GROUP BY device_id, event_type
    ORDER BY avg_value DESC
""")
recent_events.show()
Advanced Patterns
Multi-Region Data Replication
# Replicate Delta tables across regions for disaster recovery
from delta.tables import DeltaTable

def setup_multi_region_replication(
    source_table_path,
    target_region_path,
    replication_mode="incremental",
):
    """Replicate a Delta table to a second region."""
    # Read the current snapshot from the source region
    source_df = spark.read.format("delta").load(source_table_path)

    if replication_mode == "incremental" and DeltaTable.isDeltaTable(spark, target_region_path):
        # Upsert the source snapshot into the existing target, matching on the "id" key.
        # Source and target keep independent version histories, so their version
        # numbers cannot be compared; rows deleted at the source are not removed here.
        target_table = DeltaTable.forPath(spark, target_region_path)
        (
            target_table.alias("target")
            .merge(source_df.alias("source"), "target.id = source.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # Full copy: first run, or a replication_mode other than "incremental"
        source_df.write.format("delta").mode("overwrite").save(target_region_path)

    print(f"Multi-region replication completed: {source_table_path} -> {target_region_path}")

# Setup replication
setup_multi_region_replication(
    source_table_path="abfss://primary@useast.dfs.core.windows.net/delta/customers",
    target_region_path="abfss://secondary@westeurope.dfs.core.windows.net/delta/customers",
    replication_mode="incremental",
)
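The replication above relies on a merge with `whenMatchedUpdateAll` and `whenNotMatchedInsertAll`, which amounts to an upsert keyed on `id`. The plain-Python sketch below shows what that does to a small target, using invented rows; `upsert` is a hypothetical helper that only illustrates the semantics.

```python
def upsert(target_rows, source_rows, key="id"):
    """Matched keys are overwritten with the source row; new keys are inserted."""
    merged = {row[key]: dict(row) for row in target_rows}
    for row in source_rows:
        merged[row[key]] = dict(row)
    return list(merged.values())


# Invented rows: id 2 changes, id 3 is new, id 1 is untouched
target = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
source = [{"id": 2, "name": "B"}, {"id": 3, "name": "c"}]
replicated = upsert(target, source)
print(replicated)
```

Note that a row missing from the source stays in the target, so an upsert-only merge does not propagate deletes to the secondary region.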
Federated Query Pattern
# Query across Unity Catalog, Fabric, and Synapse
from pyspark.sql.functions import col

def federated_analytics_query():
    """Execute federated query across multiple platforms."""
    # Query Unity Catalog table
    unity_df = spark.sql("""
        SELECT customer_id, region, total_purchases
        FROM unity_catalog.main.customers
    """)

    # Query Fabric Lakehouse via OneLake
    fabric_df = spark.read.format("delta").load("Tables/fabric_sales")

    # Query Synapse Delta table
    synapse_df = spark.read.format("delta").load(
        "abfss://synapse@storage.dfs.core.windows.net/delta/orders"
    )

    # Join across platforms on customer_id (inner joins: only customers present in all three)
    result_df = (
        unity_df.alias("u")
        .join(fabric_df.alias("f"), col("u.customer_id") == col("f.customer_id"))
        .join(synapse_df.alias("s"), col("u.customer_id") == col("s.customer_id"))
        .select(
            "u.customer_id",
            "u.region",
            "u.total_purchases",
            "f.lifetime_value",
            "s.recent_order_date",
        )
    )
    return result_df

# Execute federated query
federated_results = federated_analytics_query()
federated_results.show()
Additional Resources
| Resource | Description | Link |
|---|---|---|
| Unity Catalog Documentation | Official Unity Catalog guide | Microsoft Docs |
| Microsoft Fabric | Fabric integration documentation | Microsoft Docs |
| Azure Data Explorer | Eventhouse/ADX documentation | Microsoft Docs |
| OneLake Integration | OneLake and Synapse integration | Microsoft Docs |
Stay Current: These emerging patterns represent the latest integration capabilities. Check official documentation regularly for updates and new features.
Last Updated: December 2025
Next Review: March 2026