🔄 Event-Driven Architecture

Design patterns and best practices for building event-driven applications with Azure Event Grid.


🎯 What is Event-Driven Architecture?

Event-Driven Architecture (EDA) is a software design pattern where system components communicate through the production, detection, and consumption of events. An event represents a significant change in state or an occurrence in the system.

Core Principles

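  • Loose Coupling: Publishers and subscribers share only the event contract; neither needs to know the other exists
  • Asynchronous Communication: Publishers emit events without waiting for subscribers to finish processing
  • Scalability: Components scale independently
  • Resilience: A failing subscriber does not block publishers or other subscribers
  • Near-Real-Time Responsiveness: Subscribers react as soon as events are delivered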
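
The principles above can be illustrated with a minimal in-process sketch. This is not Event Grid: `InMemoryTopic` and the handler names are hypothetical, and delivery here is synchronous for simplicity. It only shows that the publisher talks to the topic rather than to subscribers, and that one failing subscriber does not stop the others.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple


class InMemoryTopic:
    """Hypothetical in-process stand-in for a topic (not an Azure API)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self.failures: List[str] = []

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event_type: str, data: dict) -> None:
        # The publisher never references a handler directly, and an
        # exception in one handler is recorded instead of propagated.
        for handler in self._handlers[event_type]:
            try:
                handler(data)
            except Exception as exc:
                self.failures.append(f"{handler.__name__}: {exc}")


def demo() -> Tuple[List[str], List[str]]:
    topic = InMemoryTopic()
    emails: List[str] = []

    def send_email(data: dict) -> None:
        emails.append(data["order_id"])

    def broken_analytics(data: dict) -> None:
        raise RuntimeError("analytics unavailable")

    topic.subscribe("order.created", broken_analytics)
    topic.subscribe("order.created", send_email)
    topic.publish("order.created", {"order_id": "order-123"})
    return emails, topic.failures
```

In this sketch the email handler still runs even though the analytics handler, registered first, raises.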

🏗️ Architecture Patterns

1. Publisher-Subscriber Pattern

graph LR
    subgraph "Publishers"
        P1[Order Service]
        P2[Inventory Service]
        P3[Payment Service]
    end

    subgraph "Event Grid"
        Topic[Event Topic]
    end

    subgraph "Subscribers"
        S1[Email Service]
        S2[Analytics Service]
        S3[Notification Service]
        S4[Audit Log]
    end

    P1 -->|Publish| Topic
    P2 -->|Publish| Topic
    P3 -->|Publish| Topic

    Topic -->|Subscribe| S1
    Topic -->|Subscribe| S2
    Topic -->|Subscribe| S3
    Topic -->|Subscribe| S4

Implementation:

from azure.eventgrid import EventGridPublisherClient, EventGridEvent
from azure.core.credentials import AzureKeyCredential
from datetime import datetime, timezone

class OrderService:
    """Order service publishes events to Event Grid."""

    def __init__(self, topic_endpoint, topic_key):
        credential = AzureKeyCredential(topic_key)
        self.client = EventGridPublisherClient(topic_endpoint, credential)

    def save_order(self, order_data):
        """Persist the order and return its ID (application-specific)."""
        raise NotImplementedError

    def create_order(self, order_data):
        """Create order and publish event."""
        # Business logic
        order_id = self.save_order(order_data)

        # Publish event
        event = EventGridEvent(
            event_type="order.created",
            data={
                "order_id": order_id,
                "customer_id": order_data["customer_id"],
                "total_amount": order_data["total"],
                "items": order_data["items"],
                # Timezone-aware UTC timestamp, so the ISO string carries an offset
                "timestamp": datetime.now(timezone.utc).isoformat()
            },
            subject=f"orders/{order_id}",
            data_version="1.0"
        )

        self.client.send([event])
        return order_id

2. Event Sourcing Pattern

graph TB
    Command[User Command] --> Service[Application Service]
    Service --> Events[Event Store]
    Events --> Projections[Read Models/Projections]
    Events --> Handlers[Event Handlers]

    Handlers --> Email[Email Notifications]
    Handlers --> Reports[Reports]
    Handlers --> Analytics[Analytics]

Implementation:

from dataclasses import dataclass
from typing import List
from datetime import datetime

from azure.eventgrid import EventGridEvent

@dataclass
class Event:
    """Base event class."""
    event_id: str
    event_type: str
    timestamp: datetime
    data: dict

class EventStore:
    """Event store for event sourcing."""

    def __init__(self, event_grid_client):
        self.client = event_grid_client
        self.events = []  # In production, use durable storage

    def append_event(self, event: Event):
        """Append event to store and publish."""
        # Store event
        self.events.append(event)

        # Publish to Event Grid
        grid_event = EventGridEvent(
            event_type=event.event_type,
            data=event.data,
            subject=event.event_id,
            data_version="1.0"
        )
        self.client.send([grid_event])

    def replay_events(self, entity_id):
        """Replay events to rebuild state."""
        return [e for e in self.events if e.data.get("entity_id") == entity_id]

class BankAccount:
    """Aggregate using event sourcing."""

    def __init__(self, account_id, event_store):
        self.account_id = account_id
        self.event_store = event_store
        self.balance = 0
        self.version = 0

    def deposit(self, amount):
        """Deposit money and create event."""
        event = Event(
            event_id=f"{self.account_id}-{self.version + 1}",
            event_type="account.deposited",
            timestamp=datetime.utcnow(),
            data={
                "entity_id": self.account_id,
                "amount": amount,
                "balance_after": self.balance + amount
            }
        )

        self.event_store.append_event(event)
        self.apply(event)

    def apply(self, event: Event):
        """Apply event to update state."""
        if event.event_type == "account.deposited":
            self.balance += event.data["amount"]
            self.version += 1
        elif event.event_type == "account.withdrawn":
            self.balance -= event.data["amount"]
            self.version += 1
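
The reason for storing events is that current state can be rebuilt by replaying them in order. The following is a minimal, self-contained sketch of that fold; `AccountEvent` and `rebuild_balance` are hypothetical names that mirror the event types used above rather than part of the original classes.

```python
from dataclasses import dataclass
from typing import Iterable, Tuple


@dataclass(frozen=True)
class AccountEvent:
    """Hypothetical minimal event, mirroring the event types used above."""
    event_type: str
    amount: int


def rebuild_balance(events: Iterable[AccountEvent]) -> Tuple[int, int]:
    """Fold events, in order, into (balance, version)."""
    balance, version = 0, 0
    for event in events:
        if event.event_type == "account.deposited":
            balance += event.amount
        elif event.event_type == "account.withdrawn":
            balance -= event.amount
        else:
            continue  # Unknown event types are skipped in this sketch
        version += 1
    return balance, version


history = [
    AccountEvent("account.deposited", 100),
    AccountEvent("account.withdrawn", 30),
    AccountEvent("account.deposited", 5),
]
balance, version = rebuild_balance(history)
```

Because the fold is deterministic, replaying the same history always yields the same balance and version, which is what makes projections and audits reproducible.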

3. Choreography Pattern

Each service reacts to the events it cares about and publishes its own result events, with no central coordinator:

graph TB
    Order[Order Created] --> Inventory[Inventory Service]
    Inventory -->|Reserved| Payment[Payment Service]
    Payment -->|Charged| Shipping[Shipping Service]
    Shipping -->|Shipped| Notification[Notification Service]

    Payment -.->|Failed| Inventory2[Inventory Service<br/>Release]
    Shipping -.->|Failed| Payment2[Payment Service<br/>Refund]

Implementation:

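import azure.functions as func
from azure.eventgrid import EventGridPublisherClient
from azure.core.credentials import AzureKeyCredential

# reserve_inventory, charge_payment, and publish_event are application-specific
# helpers assumed to be defined elsewhere; publish_event would wrap
# EventGridPublisherClient.send as in the Publisher-Subscriber example.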

# Inventory Service Handler
def inventory_handler(event: func.EventGridEvent):
    """React to order.created event."""
    if event.event_type == "order.created":
        order_data = event.get_json()

        # Reserve inventory
        success = reserve_inventory(order_data["items"])

        # Publish result event
        if success:
            publish_event("inventory.reserved", order_data)
        else:
            publish_event("inventory.insufficient", order_data)

# Payment Service Handler
def payment_handler(event: func.EventGridEvent):
    """React to inventory.reserved event."""
    if event.event_type == "inventory.reserved":
        order_data = event.get_json()

        # Process payment
        success = charge_payment(order_data["payment_info"])

        # Publish result event
        if success:
            publish_event("payment.completed", order_data)
        else:
            publish_event("payment.failed", order_data)
            # Trigger compensation
            publish_event("inventory.release", order_data)
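
To see how the handlers above chain together, the flow can be simulated in memory. This is only a sketch under stated assumptions: a shared queue stands in for Event Grid, `run_choreography` is a hypothetical name, and `inventory.released` is an invented event type used to show that the compensation was handled.

```python
from collections import deque
from typing import Callable, List


def run_choreography(payment_succeeds: bool) -> List[str]:
    """Simulate services reacting to each other's events via a shared queue."""
    queue = deque([("order.created", {"order_id": "order-123"})])
    seen: List[str] = []

    def inventory(event_type: str, data: dict) -> None:
        if event_type == "order.created":
            queue.append(("inventory.reserved", data))
        elif event_type == "inventory.release":
            # Hypothetical confirmation event for the compensation
            queue.append(("inventory.released", data))

    def payment(event_type: str, data: dict) -> None:
        if event_type == "inventory.reserved":
            if payment_succeeds:
                queue.append(("payment.completed", data))
            else:
                queue.append(("payment.failed", data))
                queue.append(("inventory.release", data))  # compensation

    services: List[Callable[[str, dict], None]] = [inventory, payment]
    while queue:
        event_type, data = queue.popleft()
        seen.append(event_type)
        for service in services:  # every service sees every event
            service(event_type, data)
    return seen
```

No function here knows the overall workflow; the sequence emerges from each service's reaction, which is the defining trait of choreography and also why the end-to-end flow can be harder to trace.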

4. Orchestration Pattern (Saga Pattern)

A central coordinator drives each step of the distributed transaction and triggers compensating actions when a step fails:

graph TB
    Saga[Saga Orchestrator] --> Order[Create Order]
    Order --> Inventory[Reserve Inventory]
    Inventory --> Payment[Process Payment]
    Payment --> Shipping[Schedule Shipping]

    Shipping -.->|Failure| Compensate[Compensation Flow]
    Compensate --> RefundPayment[Refund Payment]
    RefundPayment --> ReleaseInventory[Release Inventory]
    ReleaseInventory --> CancelOrder[Cancel Order]

Implementation with Logic Apps:

{
  "definition": {
    "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
    "triggers": {
      "When_order_created_event": {
        "type": "EventGrid",
        "inputs": {
          "eventType": "order.created",
          "topic": "/subscriptions/.../topics/orders"
        }
      }
    },
    "actions": {
      "Reserve_Inventory": {
        "type": "Http",
        "inputs": {
          "method": "POST",
          "uri": "https://inventory-api.com/reserve"
        },
        "runAfter": {}
      },
      "Process_Payment": {
        "type": "Http",
        "inputs": {
          "method": "POST",
          "uri": "https://payment-api.com/charge"
        },
        "runAfter": {
          "Reserve_Inventory": ["Succeeded"]
        }
      },
      "Compensate_on_Failure": {
        "type": "Scope",
        "actions": {
          "Release_Inventory": {
            "type": "Http",
            "inputs": {
              "method": "POST",
              "uri": "https://inventory-api.com/release"
            }
          }
        },
        "runAfter": {
          "Process_Payment": ["Failed"]
        }
      }
    }
  }
}
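
The compensation flow in the diagram (undo completed steps in reverse order) can also be sketched in plain Python. This is an illustrative sketch, not a Logic Apps equivalent: `run_saga`, the step names, and the no-op actions are all hypothetical.

```python
from typing import Callable, List, Tuple

# Each step pairs an action with the compensation that undoes it.
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception:
            log.append(f"{name}: failed")
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"{done_name}: compensated")
            return False
        completed.append(step)
        log.append(f"{name}: ok")
    return True


def demo() -> Tuple[bool, List[str]]:
    log: List[str] = []

    def fail() -> None:
        raise RuntimeError("carrier unavailable")

    def noop() -> None:
        return None

    steps: List[Step] = [
        ("reserve_inventory", noop, noop),
        ("process_payment", noop, noop),
        ("schedule_shipping", fail, noop),
    ]
    return run_saga(steps, log), log
```

When shipping fails in this sketch, payment is compensated before inventory, matching the refund-then-release order shown in the diagram. A production orchestrator would also need to persist saga state and handle failures inside compensations, which this sketch omits.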

🎯 Best Practices

1. Event Design

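from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime, timezone
import uuid

@dataclass
class WellDesignedEvent:
    """Example of well-designed event."""

    # Standard fields
    # default_factory runs once per instance; a plain default would be evaluated
    # once at class definition and shared by every event.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""  # e.g., "order.created"
    event_timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    event_version: str = "1.0"

    # Subject/Source
    subject: str = ""  # e.g., "orders/order-123"
    source: str = ""  # e.g., "order-service"

    # Event data (empty dict by default so it can be unpacked safely)
    data: dict = field(default_factory=dict)

    # Correlation
    correlation_id: Optional[str] = None
    causation_id: Optional[str] = None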

    def to_eventgrid_event(self):
        """Convert to Event Grid event format."""
        from azure.eventgrid import EventGridEvent

        return EventGridEvent(
            id=self.event_id,
            event_type=self.event_type,
            data={
                **self.data,
                "correlation_id": self.correlation_id,
                "causation_id": self.causation_id
            },
            subject=self.subject,
            event_time=self.event_timestamp,
            data_version=self.event_version
        )
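
The correlation and causation fields are only useful if follow-up events populate them consistently. The sketch below assumes one common convention: the first event's ID becomes the correlation ID for the whole flow, and each follow-up event records its direct parent as the causation ID. `TracedEvent`, `start_flow`, and `caused_by` are hypothetical names for illustration.

```python
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TracedEvent:
    """Hypothetical reduced event carrying only the tracing fields."""
    event_type: str
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: Optional[str] = None
    causation_id: Optional[str] = None


def start_flow(event_type: str) -> TracedEvent:
    """First event of a flow: its own ID becomes the correlation ID."""
    event = TracedEvent(event_type)
    event.correlation_id = event.event_id
    return event


def caused_by(parent: TracedEvent, event_type: str) -> TracedEvent:
    """Follow-up event: same correlation ID, causation points at the parent."""
    return TracedEvent(
        event_type,
        correlation_id=parent.correlation_id,
        causation_id=parent.event_id,
    )


order = start_flow("order.created")
reserved = caused_by(order, "inventory.reserved")
paid = caused_by(reserved, "payment.completed")
```

Filtering logs by `correlation_id` then returns every event in the order flow, while following `causation_id` links reconstructs which event triggered which.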

2. Idempotency

import hashlib
import json

class IdempotentEventHandler:
    """Handle events idempotently."""

    def __init__(self):
        self.processed_events = set()  # In production, use Redis/Cosmos DB

    def compute_event_hash(self, event):
        """Compute unique hash for event."""
        event_str = json.dumps({
            "id": event.id,
            "type": event.event_type,
            "subject": event.subject
        }, sort_keys=True)

        return hashlib.sha256(event_str.encode()).hexdigest()

    def process_event(self, event):
        """Process event only once."""
        event_hash = self.compute_event_hash(event)

        if event_hash in self.processed_events:
            print(f"Event {event.id} already processed, skipping")
            return

        # Process event
        self.handle_event(event)

        # Mark as processed
        self.processed_events.add(event_hash)

    def handle_event(self, event):
        """Actual event processing logic."""
        print(f"Processing event {event.id}")

3. Error Handling and Dead Letter Queues

# Create storage account for dead letter queue
az storage account create \
  --name deadletterstorage \
  --resource-group rg-eventgrid \
  --location eastus \
  --sku Standard_LRS

# Create event subscription with dead letter configuration
az eventgrid event-subscription create \
  --name resilient-subscription \
  --source-resource-id /subscriptions/.../topics/my-topic \
  --endpoint https://myhandler.com/api/events \
  --max-delivery-attempts 30 \
  --event-ttl 1440 \
  --deadletter-endpoint /subscriptions/.../storageAccounts/deadletterstorage/blobServices/default/containers/deadletter

Process Dead Letter Events:

from azure.storage.blob import BlobServiceClient
import json
import os

def process_dead_letter_events():
    """Process events from dead letter queue."""
    # Connection string is read from the environment (variable name is an example)
    connection_string = os.environ["DEADLETTER_STORAGE_CONNECTION_STRING"]
    blob_service = BlobServiceClient.from_connection_string(connection_string)
    container = blob_service.get_container_client("deadletter")

    for blob in container.list_blobs():
        # Download dead letter blob; each blob holds a JSON array of events
        blob_client = container.get_blob_client(blob.name)
        content = blob_client.download_blob().readall()

        for event_data in json.loads(content):
            # Analyze failure (dead-lettered events use camelCase property names)
            print(f"Failed event: {event_data['eventType']}")
            print(f"Reason: {event_data.get('deadLetterReason')}")

            # Attempt manual processing or alert
            # (handle_failed_event is an application-specific helper)
            handle_failed_event(event_data)

4. Event Versioning

def handle_versioned_event(event):
    """Handle different event versions."""
    version = event.data_version

    if version == "1.0":
        return process_v1_event(event)
    elif version == "2.0":
        return process_v2_event(event)
    else:
        raise ValueError(f"Unsupported event version: {version}")

def process_v1_event(event):
    """Process version 1.0 events."""
    data = event.get_json()
    # V1 schema: {order_id, total}
    return {
        "order_id": data["order_id"],
        "total": data["total"],
        "currency": "USD"  # Default for V1
    }

def process_v2_event(event):
    """Process version 2.0 events."""
    data = event.get_json()
    # V2 schema: {order_id, total, currency}
    return {
        "order_id": data["order_id"],
        "total": data["total"],
        "currency": data["currency"]
    }

📊 Monitoring Event-Driven Systems

import azure.functions as func
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace

# Configure monitoring (reads APPLICATIONINSIGHTS_CONNECTION_STRING from the environment)
configure_azure_monitor()
tracer = trace.get_tracer(__name__)

def publish_with_tracing(client, event):
    """Publish an EventGridEvent with distributed tracing."""
    with tracer.start_as_current_span("publish_event") as span:
        span.set_attribute("event.type", event.event_type)
        span.set_attribute("event.subject", event.subject)
        span.set_attribute("event.id", event.id)

        # Publish event (client is an EventGridPublisherClient)
        client.send([event])

def consume_with_tracing(event: func.EventGridEvent):
    """Consume event with distributed tracing."""
    with tracer.start_as_current_span("consume_event") as span:
        span.set_attribute("event.type", event.event_type)
        span.set_attribute("event.id", event.id)

        # Process event (process_event is an application-specific handler)
        process_event(event)


Last Updated: 2025-01-28 | Pattern Type: Architectural | Documentation Status: Complete