Skip to content

Event Grid Patterns

Home | Best Practices | Cross-Cutting | Event Grid Patterns

Status Category

Event-driven architecture patterns with Azure Event Grid.


Overview

Azure Event Grid enables reactive, event-driven architectures with reliable event delivery and filtering.


Core Patterns

Fan-Out Pattern

# Multiple subscribers receive the same event
# Event Grid handles delivery to all subscribers

# Publisher
import uuid
from datetime import datetime, timezone

from azure.core.credentials import AzureKeyCredential
from azure.eventgrid import EventGridPublisherClient

client = EventGridPublisherClient(
    endpoint="https://<topic>.eastus-1.eventgrid.azure.net/api/events",
    credential=AzureKeyCredential("<key>")
)

# Single event, multiple subscribers.
# The dict uses the Event Grid event schema; "eventTime" is a required field
# of that schema, so it is set explicitly as a UTC ISO-8601 timestamp.
event = {
    "id": str(uuid.uuid4()),
    "eventType": "Data.FileUploaded",
    "subject": "/bronze/sales/2024/01/15/data.parquet",
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "data": {
        "container": "datalake",
        "path": "/bronze/sales/2024/01/15/data.parquet",
        "size": 1048576
    },
    "dataVersion": "1.0"
}

# send() takes a list, so several events can be published in one call
client.send([event])

Event Filtering

{
    "filter": {
        "subjectBeginsWith": "/bronze/sales",
        "subjectEndsWith": ".parquet",
        "includedEventTypes": ["Data.FileUploaded"],
        "advancedFilters": [
            {
                "operatorType": "NumberGreaterThan",
                "key": "data.size",
                "value": 1000000
            }
        ]
    }
}

Data Pipeline Triggers

Storage Event Trigger

# Azure Function triggered by blob events
import azure.functions as func
from azure.storage.blob import BlobServiceClient

def main(event: func.EventGridEvent):
    """Trigger the bronze-to-silver job for new parquet files under ``bronze/``.

    The blob name is taken from the event subject, which for Blob Storage
    events has the form
    ``/blobServices/default/containers/<container>/blobs/<blob-name>``.
    The event payload itself only supplies the blob URL; it has no
    ``container`` or ``blobName`` keys.

    Args:
        event: The Event Grid event delivered to the function.

    Raises:
        KeyError: If the event payload has no ``url`` entry.
    """
    # Function-scope import: only `func` is guaranteed to be in scope here.
    import logging

    data = event.get_json()

    # Extract file info
    blob_url = data['url']

    # Everything after the first "/blobs/" marker is the blob name
    # (which may itself contain slashes).
    _, marker, blob_name = (event.subject or '').partition('/blobs/')
    if not marker:
        logging.warning("Skipping event with unexpected subject: %s", event.subject)
        return

    # Trigger processing pipeline
    if blob_name.startswith('bronze/') and blob_name.endswith('.parquet'):
        trigger_databricks_job(
            job_id="bronze-to-silver",
            parameters={"input_path": blob_url}
        )

Synapse Pipeline Trigger

{
    "name": "EventGridTrigger",
    "type": "BlobEventsTrigger",
    "typeProperties": {
        "blobPathBeginsWith": "/bronze/",
        "blobPathEndsWith": ".parquet",
        "ignoreEmptyBlobs": true,
        "scope": "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}",
        "events": ["Microsoft.Storage.BlobCreated"]
    }
}

Error Handling

Dead Letter Configuration

{
    "deadLetterDestination": {
        "endpointType": "StorageBlob",
        "properties": {
            "resourceId": "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}",
            "blobContainerName": "deadletter"
        }
    },
    "retryPolicy": {
        "maxDeliveryAttempts": 30,
        "eventTimeToLiveInMinutes": 1440
    }
}

Retry Strategy

class EventGridRetryHandler:
    """Handle Event Grid delivery failures.

    ``process_event`` and ``send_to_dead_letter`` are awaited by this class
    but not defined here; they must be supplied elsewhere (e.g. by a
    subclass).
    """

    # Total number of processing attempts before the event is dead-lettered.
    MAX_RETRIES = 3
    # Base of the exponential backoff: waits are BACKOFF_MULTIPLIER ** attempt seconds.
    BACKOFF_MULTIPLIER = 2

    async def process_with_retry(self, event: dict):
        """Process event with exponential backoff.

        Attempts ``process_event`` up to ``MAX_RETRIES`` times. A
        ``TransientError`` triggers a backoff sleep followed by another
        attempt; a ``PermanentError`` dead-letters the event immediately and
        is re-raised. When every attempt fails transiently, the event is
        dead-lettered with the reason "Max retries exceeded" and the method
        returns normally.

        Args:
            event: The event to process.

        Raises:
            PermanentError: Re-raised after the event has been dead-lettered.
        """
        for attempt in range(self.MAX_RETRIES):
            try:
                await self.process_event(event)
                return
            except TransientError:
                # No further attempt follows the last one, so do not log a
                # retry or sleep; go straight to dead-lettering.
                if attempt + 1 >= self.MAX_RETRIES:
                    break
                wait_time = self.BACKOFF_MULTIPLIER ** attempt
                logger.warning(f"Retry {attempt + 1}, waiting {wait_time}s")
                await asyncio.sleep(wait_time)
            except PermanentError as e:
                await self.send_to_dead_letter(event, str(e))
                raise

        # Max retries exceeded
        await self.send_to_dead_letter(event, "Max retries exceeded")

Monitoring

Event Metrics

// Monitor Event Grid delivery metrics
// Produces one row per topic per 5-minute window with total, successful and
// failed delivery counts plus a success percentage, newest window first.
AzureDiagnostics
// Keep only Event Grid delivery operations
| where ResourceProvider == "MICROSOFT.EVENTGRID"
| where OperationName == "Microsoft.EventGrid/events/delivery"
// Count rows per 5-minute bin and resource, split by ResultType
| summarize
    TotalEvents = count(),
    SuccessCount = countif(ResultType == "Success"),
    FailureCount = countif(ResultType == "Failure")
    by bin(TimeGenerated, 5m), Topic = Resource
// Success percentage, rounded to two decimals (100.0 forces real-number division)
| extend SuccessRate = round(100.0 * SuccessCount / TotalEvents, 2)
| order by TimeGenerated desc

Alerting

{
    "type": "Microsoft.Insights/metricAlerts",
    "properties": {
        "severity": 2,
        "criteria": {
            "odata.type": "Microsoft.Azure.Monitor.SingleResourceMultipleMetricCriteria",
            "allOf": [
                {
                    "name": "DeadLetterEvents",
                    "metricName": "DeadLetteredCount",
                    "operator": "GreaterThan",
                    "threshold": 0,
                    "timeAggregation": "Total"
                }
            ]
        },
        "actions": [{"actionGroupId": "/subscriptions/{sub}/resourceGroups/{rg}/providers/microsoft.insights/actionGroups/ops-team"}]
    }
}

Security

Webhook Validation

from azure.functions import HttpRequest, HttpResponse

def validate_subscription(req: HttpRequest) -> HttpResponse:
    """Handle Event Grid subscription validation and event delivery.

    When the first event in the request is a
    ``Microsoft.EventGrid.SubscriptionValidationEvent``, its validation code
    is echoed back as JSON. Otherwise every event in the request is passed
    to ``process_event``.

    Args:
        req: The incoming webhook request; its body is expected to be a JSON
            array of events (a single JSON object is also accepted).

    Returns:
        A JSON ``validationResponse`` for a validation request, an empty 200
        response after processing events, or a 400 response when the body is
        not valid JSON or is neither an array nor an object.
    """
    # Function-scope import: `json` is not otherwise brought into scope here.
    import json

    try:
        body = req.get_json()
    except ValueError:
        return HttpResponse("Request body must be valid JSON", status_code=400)

    if isinstance(body, list):
        events = body
    elif isinstance(body, dict):
        events = [body]
    else:
        return HttpResponse(
            "Request body must be a JSON array of events", status_code=400
        )

    # Validation request (guard against an empty array before indexing)
    if events and events[0].get('eventType') == 'Microsoft.EventGrid.SubscriptionValidationEvent':
        validation_code = events[0]['data']['validationCode']
        return HttpResponse(
            json.dumps({"validationResponse": validation_code}),
            mimetype="application/json"
        )

    # Process actual events
    for event in events:
        process_event(event)

    return HttpResponse(status_code=200)

Access Control

// Event Grid topic with public network access turned off and no inbound IP
// rules. `location` is referenced here but not declared in this snippet.
resource eventGridTopic 'Microsoft.EventGrid/topics@2022-06-15' = {
  name: 'eg-data-events'
  location: location
  properties: {
    publicNetworkAccess: 'Disabled'
    inboundIpRules: []
  }
}

// Private endpoint placed in the subnet identified by `subnetId` (not declared
// in this snippet) and linked to the topic above.
resource privateEndpoint 'Microsoft.Network/privateEndpoints@2022-01-01' = {
  name: 'pe-eventgrid'
  location: location
  properties: {
    subnet: {
      id: subnetId
    }
    privateLinkServiceConnections: [
      {
        name: 'eventgrid-connection'
        properties: {
          // Target the topic resource through its 'topic' group
          privateLinkServiceId: eventGridTopic.id
          groupIds: ['topic']
        }
      }
    ]
  }
}


Last Updated: January 2025