Event Grid Integration with Event Hubs¶
Route Azure events to Event Hubs for stream processing and analytics.
Overview¶
Event Grid to Event Hubs integration enables:
- Centralized event collection from multiple Azure services
- Durable event storage for replay and audit
- Stream processing of cloud events
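When the subscription is configured with the `CloudEventSchemaV1_0` delivery schema (as in the ARM template below), each Event Hub message body is a CloudEvents v1.0 JSON envelope. A sketch of what one such message looks like; the subject path, URL, and id values are hypothetical examples, not taken from a real account:

```python
import json

# Illustrative BlobCreated event in the CloudEvents v1.0 envelope format.
# All field values here are made-up examples.
sample_event = {
    "specversion": "1.0",
    "type": "Microsoft.Storage.BlobCreated",
    "source": "/subscriptions/{sub}/resourceGroups/rg-analytics/providers/Microsoft.Storage/storageAccounts/datalake",
    "id": "9aeb0fdf-c01e-0131-0922-9eb54906e209",
    "time": "2025-01-15T10:30:00Z",
    "subject": "/blobServices/default/containers/bronze/blobs/orders.parquet",
    "data": {
        "api": "PutBlob",
        "contentType": "application/octet-stream",
        "contentLength": 2048,
        "url": "https://datalake.blob.core.windows.net/bronze/orders.parquet",
    },
}

# This JSON string is what lands in the Event Hub message body
body = json.dumps(sample_event)
print(json.loads(body)["type"])  # → Microsoft.Storage.BlobCreated
```

The fixed envelope fields (`specversion`, `type`, `source`, `id`, `time`, `subject`) are the same for every Azure source; only `data` varies per event type.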
Implementation¶
Step 1: Create Event Hub Destination¶
# Create Event Hubs namespace (Standard tier, auto-inflate up to 10 TUs)
az eventhubs namespace create \
  --resource-group rg-analytics \
  --name eh-events-ns \
  --sku Standard \
  --enable-auto-inflate true \
  --maximum-throughput-units 10

# Create Event Hub (4 partitions, 7-day retention)
az eventhubs eventhub create \
  --resource-group rg-analytics \
  --namespace-name eh-events-ns \
  --name cloud-events \
  --partition-count 4 \
  --message-retention 7
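The partition count and throughput-unit ceiling should be sized against expected load: a Standard-tier throughput unit (TU) handles up to 1 MB/s or 1,000 events/s of ingress, whichever binds first. A back-of-the-envelope check, with illustrative (assumed) traffic numbers:

```python
import math

# Rough TU sizing: 1 Standard TU ~ 1 MB/s or 1,000 events/s ingress.
# The event rate and average size below are illustrative assumptions.
events_per_sec = 3500
avg_event_bytes = 1200

tus_by_count = math.ceil(events_per_sec / 1000)                     # 4
tus_by_bytes = math.ceil(events_per_sec * avg_event_bytes / 1_000_000)  # 5
tus_needed = max(tus_by_count, tus_by_bytes)
print(tus_needed)  # → 5, which fits under the 10-TU auto-inflate ceiling
```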
Step 2: Create Event Grid Subscription¶
# Subscribe to storage events
az eventgrid event-subscription create \
  --name storage-to-eventhub \
  --source-resource-id "/subscriptions/{sub}/resourceGroups/rg-analytics/providers/Microsoft.Storage/storageAccounts/datalake" \
  --endpoint-type eventhub \
  --endpoint "/subscriptions/{sub}/resourceGroups/rg-analytics/providers/Microsoft.EventHub/namespaces/eh-events-ns/eventhubs/cloud-events" \
  --included-event-types Microsoft.Storage.BlobCreated Microsoft.Storage.BlobDeleted
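The `--endpoint` value is the full ARM resource ID of the target hub, which is easy to mistype. A small helper (hypothetical, for illustration only) makes the format explicit:

```python
def eventhub_endpoint_id(subscription: str, resource_group: str,
                         namespace: str, hub: str) -> str:
    """Build the ARM resource ID Event Grid expects for an Event Hub endpoint.
    Hypothetical helper; the path segments follow the standard ARM layout."""
    return (
        f"/subscriptions/{subscription}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.EventHub/namespaces/{namespace}"
        f"/eventhubs/{hub}"
    )

print(eventhub_endpoint_id("{sub}", "rg-analytics", "eh-events-ns", "cloud-events"))
```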
Step 3: ARM Template for Multiple Subscriptions¶
{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "resources": [
    {
      "type": "Microsoft.EventGrid/systemTopics",
      "apiVersion": "2022-06-15",
      "name": "storage-topic",
      "location": "[resourceGroup().location]",
      "properties": {
        "source": "[resourceId('Microsoft.Storage/storageAccounts', 'datalake')]",
        "topicType": "Microsoft.Storage.StorageAccounts"
      }
    },
    {
      "type": "Microsoft.EventGrid/systemTopics/eventSubscriptions",
      "apiVersion": "2022-06-15",
      "name": "storage-topic/blob-events-to-eh",
      "dependsOn": ["[resourceId('Microsoft.EventGrid/systemTopics', 'storage-topic')]"],
      "properties": {
        "destination": {
          "endpointType": "EventHub",
          "properties": {
            "resourceId": "[resourceId('Microsoft.EventHub/namespaces/eventhubs', 'eh-events-ns', 'cloud-events')]"
          }
        },
        "filter": {
          "includedEventTypes": [
            "Microsoft.Storage.BlobCreated",
            "Microsoft.Storage.BlobDeleted"
          ],
          "subjectBeginsWith": "/blobServices/default/containers/bronze/",
          "isSubjectCaseSensitive": false
        },
        "eventDeliverySchema": "CloudEventSchemaV1_0"
      }
    }
  ]
}
Step 4: Process Events in Databricks¶
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType

# Schema for the CloudEvents v1.0 envelope
cloud_event_schema = StructType([
    StructField("specversion", StringType(), True),
    StructField("type", StringType(), True),
    StructField("source", StringType(), True),
    StructField("id", StringType(), True),
    StructField("time", TimestampType(), True),
    StructField("subject", StringType(), True),
    StructField("data", StringType(), True)
])

# Read from Event Hub; eventhub_config holds the connector options
# (including the Event Hubs connection string) and must be defined elsewhere
events_df = spark.readStream.format("eventhubs") \
    .options(**eventhub_config) \
    .load() \
    .withColumn("event", from_json(col("body").cast("string"), cloud_event_schema)) \
    .select("event.*")
# Filter by event type
blob_created = events_df.filter(col("type") == "Microsoft.Storage.BlobCreated")
blob_deleted = events_df.filter(col("type") == "Microsoft.Storage.BlobDeleted")
# Extract blob details
blob_details = blob_created.withColumn(
    "data_parsed",
    from_json(col("data"), StructType([
        StructField("api", StringType()),
        StructField("contentType", StringType()),
        StructField("contentLength", LongType()),
        StructField("url", StringType())
    ]))
).select(
    col("id").alias("event_id"),
    col("time").alias("event_time"),
    col("subject"),
    col("data_parsed.url").alias("blob_url"),
    col("data_parsed.contentLength").alias("size_bytes"),
    col("data_parsed.contentType").alias("content_type")
)
Step 5: Multi-Source Event Aggregation¶
# Sources to fan out into system topics and subscriptions
# (one system topic per service, created as in the ARM template above)
sources = [
    {"name": "storage", "topic_type": "Microsoft.Storage.StorageAccounts"},
    {"name": "keyvault", "topic_type": "Microsoft.KeyVault.vaults"},
    {"name": "containerregistry", "topic_type": "Microsoft.ContainerRegistry.registries"}
]
# Process the unified event stream
unified_events = events_df.select(
    col("id").alias("event_id"),
    col("type").alias("event_type"),
    col("source").alias("event_source"),
    col("time").alias("event_time"),
    col("subject"),
    col("data")
)

# Write to Delta Lake for audit
unified_events.writeStream \
    .format("delta") \
    .partitionBy("event_type") \
    .option("checkpointLocation", "/checkpoints/cloud_events") \
    .toTable("audit.cloud_events")
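Event Grid delivers events at least once, so the audit table can receive duplicates; deduplicating on the event `id` is the usual remedy (in Spark, `dropDuplicates(["event_id"])` combined with a watermark). A minimal in-memory sketch of the idea:

```python
def dedupe_by_id(events: list[dict]) -> list[dict]:
    """Keep the first occurrence of each event id. Redeliveries under
    at-least-once semantics carry the same id as the original event."""
    seen: set[str] = set()
    unique = []
    for ev in events:
        if ev["id"] not in seen:
            seen.add(ev["id"])
            unique.append(ev)
    return unique

batch = [
    {"id": "a", "type": "Microsoft.Storage.BlobCreated"},
    {"id": "b", "type": "Microsoft.Storage.BlobDeleted"},
    {"id": "a", "type": "Microsoft.Storage.BlobCreated"},  # redelivery
]
print(len(dedupe_by_id(batch)))  # → 2
```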
Event Filtering¶
Subscriptions can narrow delivery by event type, subject prefix/suffix, and advanced operators on payload fields:

{
  "filter": {
    "includedEventTypes": ["Microsoft.Storage.BlobCreated"],
    "subjectBeginsWith": "/blobServices/default/containers/bronze/",
    "subjectEndsWith": ".parquet",
    "advancedFilters": [
      {
        "operatorType": "NumberGreaterThan",
        "key": "data.contentLength",
        "value": 1048576
      }
    ]
  }
}
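The filter above admits only BlobCreated events for `.parquet` files in the `bronze` container larger than 1 MiB. Its effect can be modeled in plain Python; this is a local sketch of the matching rules, not Event Grid's implementation:

```python
def matches(event: dict, flt: dict) -> bool:
    """Local model of an Event Grid filter: type list, subject
    prefix/suffix, and a NumberGreaterThan advanced filter."""
    if event.get("type") not in flt["includedEventTypes"]:
        return False
    subject = event.get("subject", "")
    if not subject.startswith(flt["subjectBeginsWith"]):
        return False
    if not subject.endswith(flt["subjectEndsWith"]):
        return False
    for af in flt.get("advancedFilters", []):
        if af["operatorType"] == "NumberGreaterThan":
            # Resolve a dotted key like "data.contentLength"
            value = event
            for part in af["key"].split("."):
                value = value[part]
            if not value > af["value"]:
                return False
    return True

flt = {
    "includedEventTypes": ["Microsoft.Storage.BlobCreated"],
    "subjectBeginsWith": "/blobServices/default/containers/bronze/",
    "subjectEndsWith": ".parquet",
    "advancedFilters": [{"operatorType": "NumberGreaterThan",
                         "key": "data.contentLength", "value": 1048576}],
}
big_parquet = {"type": "Microsoft.Storage.BlobCreated",
               "subject": "/blobServices/default/containers/bronze/blobs/x.parquet",
               "data": {"contentLength": 2_000_000}}
print(matches(big_parquet, flt))  # → True
```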
Last Updated: January 2025