Streaming Migration: Kinesis and MSK to Event Hubs¶
A deep-dive guide for data engineers migrating Amazon Kinesis and Managed Streaming for Apache Kafka (MSK) to Azure Event Hubs and related streaming services.
Executive summary¶
AWS streaming is split across two service families: Kinesis (Data Streams, Firehose, Analytics) for managed streaming, and MSK for Apache Kafka workloads. Azure Event Hubs serves both roles --- it provides a native streaming service with an AMQP protocol and simultaneously exposes a Kafka-compatible endpoint that allows existing Kafka clients to connect with a configuration change.
The migration strategy depends on the source: Kinesis workloads typically re-architect to Event Hubs native APIs, while MSK workloads can lift-and-shift to Event Hubs Kafka protocol with minimal code changes. Both paths land data on Delta Lake tables for analytics via Databricks Structured Streaming or Fabric Real-Time Intelligence.
Service mapping overview¶
| AWS streaming service | Azure equivalent | Migration complexity | Notes |
|---|---|---|---|
| Kinesis Data Streams | Event Hubs | M | Shard model maps to partition model |
| Kinesis Data Firehose | Event Hubs Capture / ADF | S | Managed delivery to storage |
| Kinesis Data Analytics (SQL) | Stream Analytics | M | SQL-based stream processing |
| Kinesis Data Analytics (Flink) | Databricks Structured Streaming / HDInsight Flink | L | Flink stateful processing |
| MSK (Managed Kafka) | Event Hubs with Kafka protocol | M | Config change for Kafka clients |
| MSK Connect | Event Hubs + ADF connectors | M | Connector-dependent |
| MSK Serverless | Event Hubs (auto-inflate) | S | Serverless scaling |
| Lambda (stream consumer) | Azure Functions (Event Hub trigger) | M | Function-level rewrite |
Part 1: Kinesis Data Streams to Event Hubs¶
Architecture comparison¶
| Concept | Kinesis Data Streams | Event Hubs |
|---|---|---|
| Stream | Stream | Event Hub (within a namespace) |
| Shard | Shard (1 MB/s in, 2 MB/s out) | Partition (1 MB/s in, 2 MB/s out) |
| Partition key | Partition key (MD5 hash to shard) | Partition key (hash to partition) |
| Retention | 24h default, up to 365 days | 1-90 days (standard), unlimited (Dedicated) |
| Consumer | KCL application or Lambda | Consumer group (AMQP) or Kafka consumer |
| Enhanced fan-out | Dedicated 2 MB/s per consumer per shard | $default consumer group (shared) or Kafka consumer groups |
| Sequence number | Per-shard sequence number | Offset + sequence number per partition |
| Capacity mode | On-demand or provisioned shards | Auto-inflate (standard) or Dedicated (CU-based) |
Capacity mapping¶
| Kinesis provisioned | Event Hubs equivalent | Throughput |
|---|---|---|
| 1 shard | 1 TU (Throughput Unit) | 1 MB/s in, 2 MB/s out |
| 10 shards | 10 TUs or 1 PU (Premium) | 10 MB/s in, 20 MB/s out |
| 100 shards | 10 PUs or 1 CU (Dedicated) | 100 MB/s in, 200 MB/s out |
| On-demand (auto) | Auto-inflate enabled | Up to 40 TUs auto (standard) |
Producer migration¶
Kinesis producer (Python/boto3):
import boto3
import json
kinesis = boto3.client('kinesis', region_name='us-gov-west-1')
def send_event(stream_name, event_data, partition_key):
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data).encode('utf-8'),
PartitionKey=partition_key
)
return response['SequenceNumber']
# Send batch
records = [
{'Data': json.dumps(event).encode('utf-8'), 'PartitionKey': event['device_id']}
for event in events
]
kinesis.put_records(StreamName='iot-events', Records=records)
Event Hubs producer (Python/azure-eventhub):
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
import json
credential = DefaultAzureCredential()
producer = EventHubProducerClient(
fully_qualified_namespace="acme-streaming.servicebus.usgovcloudapi.net",
eventhub_name="iot-events",
credential=credential
)
def send_event(event_data, partition_key):
event_data_batch = producer.create_batch(partition_key=partition_key)
event_data_batch.add(EventData(json.dumps(event_data)))
producer.send_batch(event_data_batch)
# Send batch
with producer:
batch = producer.create_batch()
for event in events:
batch.add(EventData(json.dumps(event)))
producer.send_batch(batch)
Consumer migration¶
Kinesis consumer (KCL pattern):
import boto3
kinesis = boto3.client('kinesis')
# Get shard iterator
response = kinesis.get_shard_iterator(
StreamName='iot-events',
ShardId='shardId-000000000000',
ShardIteratorType='LATEST'
)
shard_iterator = response['ShardIterator']
# Poll for records
while True:
response = kinesis.get_records(ShardIterator=shard_iterator, Limit=100)
for record in response['Records']:
process_record(record['Data'])
shard_iterator = response['NextShardIterator']
Event Hubs consumer (EventProcessorClient pattern):
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
checkpoint_store = BlobCheckpointStore(
blob_account_url="https://acmecheckpoints.blob.core.usgovcloudapi.net",
container_name="checkpoints",
credential=credential
)
consumer = EventHubConsumerClient(
fully_qualified_namespace="acme-streaming.servicebus.usgovcloudapi.net",
eventhub_name="iot-events",
consumer_group="$Default",
credential=credential,
checkpoint_store=checkpoint_store
)
def on_event(partition_context, event):
process_record(event.body_as_json())
partition_context.update_checkpoint(event)
with consumer:
consumer.receive(on_event=on_event, starting_position="-1")
Part 2: Kinesis Data Firehose to Event Hubs Capture¶
Architecture comparison¶
| Firehose capability | Event Hubs Capture / ADF | Notes |
|---|---|---|
| Delivery to S3 | Event Hubs Capture to ADLS Gen2 | Automatic Avro file writing |
| Delivery to Redshift | ADF pipeline from Event Hub to Databricks | Via staging on ADLS |
| Delivery to Elasticsearch | ADF or Azure Function to Azure AI Search | Custom pipeline |
| Format conversion (Parquet) | Databricks Auto Loader (Parquet/Delta) | Capture writes Avro; convert downstream |
| Dynamic partitioning | Capture partitions by time (hour/minute) | Custom partitioning via Databricks |
| Data transformation | Stream Analytics or Azure Functions | Inline transformation |
| Buffering (size/time) | Capture window (1-15 min, 10-300 MB) | Similar buffering controls |
Event Hubs Capture configuration¶
{
"properties": {
"captureDescription": {
"enabled": true,
"encoding": "Avro",
"intervalInSeconds": 300,
"sizeLimitInBytes": 314572800,
"destination": {
"name": "EventHubArchive.AzureBlockBlob",
"properties": {
"storageAccountResourceId": "/subscriptions/.../storageAccounts/acmeanalyticsgov",
"blobContainer": "streaming-raw",
"archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
}
}
}
}
}
Landing captured data as Delta tables¶
# Databricks Auto Loader: read Capture Avro files, write Delta
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "avro")
.option("cloudFiles.schemaLocation", "dbfs:/schemas/iot_events")
.load("abfss://streaming-raw@acmeanalyticsgov.dfs.core.usgovcloudapi.net/acme-streaming/iot-events/")
.writeStream
.format("delta")
.option("checkpointLocation", "dbfs:/checkpoints/iot_events")
.trigger(processingTime="1 minute")
.toTable("iot_prod.bronze.raw_events"))
Part 3: Kinesis Data Analytics to Stream Analytics¶
SQL-based stream processing migration¶
Kinesis Analytics SQL:
-- Kinesis Analytics application
CREATE OR REPLACE STREAM "DESTINATION_STREAM" (
device_id VARCHAR(64),
window_start TIMESTAMP,
avg_temperature DOUBLE,
max_temperature DOUBLE,
event_count INTEGER
);
CREATE OR REPLACE PUMP "AGGREGATE_PUMP" AS
INSERT INTO "DESTINATION_STREAM"
SELECT STREAM
"device_id",
STEP("SOURCE_STREAM".ROWTIME BY INTERVAL '5' MINUTE) AS window_start,
AVG("temperature") AS avg_temperature,
MAX("temperature") AS max_temperature,
COUNT(*) AS event_count
FROM "SOURCE_STREAM"
GROUP BY
"device_id",
STEP("SOURCE_STREAM".ROWTIME BY INTERVAL '5' MINUTE);
Azure Stream Analytics equivalent:
-- Stream Analytics job query
SELECT
device_id,
System.Timestamp() AS window_start,
AVG(temperature) AS avg_temperature,
MAX(temperature) AS max_temperature,
COUNT(*) AS event_count
INTO [output-delta]
FROM [input-eventhub]
TIMESTAMP BY event_time
GROUP BY
device_id,
TumblingWindow(minute, 5)
Stream Analytics input configuration:
{
"name": "input-eventhub",
"properties": {
"type": "Stream",
"datasource": {
"type": "Microsoft.EventHub/EventHubs",
"properties": {
"serviceBusNamespace": "acme-streaming",
"eventHubName": "iot-events",
"consumerGroupName": "stream-analytics-cg",
"authenticationMode": "Msi"
}
},
"serialization": { "type": "Json", "encoding": "UTF8" }
}
}
Fabric Real-Time Intelligence (for complex event processing)¶
For workloads that exceed Stream Analytics SQL capabilities (complex stateful processing, pattern matching, ML inference on streams):
// KQL query in Fabric Real-Time Intelligence (Eventhouse)
IoTEvents
| where ingestion_time() > ago(5m)
| summarize
avg_temperature = avg(temperature),
max_temperature = max(temperature),
event_count = count()
by device_id, bin(event_time, 5m)
| where max_temperature > 100
| project device_id, window_start = event_time, avg_temperature, max_temperature, event_count
Part 4: MSK to Event Hubs with Kafka protocol¶
The Kafka compatibility advantage¶
Event Hubs exposes a Kafka-compatible endpoint. Existing Kafka producers and consumers can connect to Event Hubs by changing only the connection configuration --- no code changes required.
Kafka client configuration change¶
MSK connection (original):
# Kafka producer/consumer config for MSK
bootstrap.servers=b-1.acme-msk.abc123.kafka.us-gov-west-1.amazonaws.com:9098,b-2.acme-msk.abc123.kafka.us-gov-west-1.amazonaws.com:9098
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
Event Hubs Kafka endpoint (migrated):
# Kafka producer/consumer config for Event Hubs
bootstrap.servers=acme-streaming.servicebus.usgovcloudapi.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://acme-streaming.servicebus.usgovcloudapi.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<key>";
That is the entire migration for Kafka producers and consumers. The application code, serialization format, topic structure, and consumer group logic remain unchanged.
Topic mapping¶
| MSK concept | Event Hubs equivalent | Notes |
|---|---|---|
| Topic | Event Hub | One topic per Event Hub |
| Partition | Partition | Same concept; partition count set at creation |
| Consumer group | Consumer group | Same concept |
| Offset | Offset | Same concept |
| Retention | Retention | 1-90 days (standard); unlimited (Dedicated) |
| Compaction | Not supported natively | Use Delta Lake for compacted state |
| Schema Registry | Azure Schema Registry | Avro, JSON Schema, Protobuf |
MSK Connect migration¶
MSK Connect runs Kafka Connect connectors as managed services. On Azure, replace with:
- ADF connectors for source/sink patterns that ADF supports natively (databases, blob storage, REST APIs).
- Kafka Connect on AKS for connectors not available in ADF (deploy Kafka Connect cluster on Azure Kubernetes Service, pointing to Event Hubs Kafka endpoint).
- Azure Functions with Event Hub trigger for custom consumer logic.
Part 5: Lambda consumers to Azure Functions¶
Lambda with Kinesis trigger to Azure Function with Event Hub trigger¶
AWS Lambda (Kinesis trigger):
import json
import base64
def lambda_handler(event, context):
for record in event['Records']:
payload = base64.b64decode(record['kinesis']['data'])
data = json.loads(payload)
process_event(data)
return {'statusCode': 200}
Azure Function (Event Hub trigger):
import azure.functions as func
import json
import logging
app = func.FunctionApp()
@app.event_hub_message_trigger(
arg_name="events",
event_hub_name="iot-events",
connection="EventHubConnection",
consumer_group="functions-cg"
)
def process_iot_events(events: func.EventHubEvent):
for event in events:
data = json.loads(event.get_body().decode('utf-8'))
process_event(data)
logging.info(f"Processed event from device {data['device_id']}")
Key differences:
- Lambda receives base64-encoded Kinesis records; Azure Functions receives deserialized Event Hub events.
- Lambda IAM role replaced by managed identity + Event Hubs Data Receiver role.
- Lambda concurrency per shard replaced by Function scaling per partition.
Worked example: full Kinesis pipeline to Event Hubs¶
Source architecture (AWS)¶
IoT Devices
→ Kinesis Data Stream (iot-events, 10 shards)
→ Lambda consumer (real-time alerts)
→ Kinesis Firehose → S3 (raw archive)
→ Kinesis Analytics (5-min aggregation) → Kinesis Data Stream (iot-aggregated)
→ Lambda consumer (dashboard update)
Target architecture (Azure)¶
IoT Devices
→ Event Hubs (iot-events, 10 partitions)
→ Azure Function (real-time alerts)
→ Event Hubs Capture → ADLS Gen2 (raw archive)
→ Stream Analytics (5-min aggregation) → Event Hubs (iot-aggregated)
→ Databricks Structured Streaming → Delta table → Power BI Direct Lake
Migration sequence¶
- Create Event Hubs namespace with 10 TUs (matching 10 Kinesis shards).
- Create Event Hub
iot-eventswith 10 partitions and 7-day retention. - Configure Capture to write Avro files to ADLS Gen2
streaming-rawcontainer. - Deploy Azure Function with Event Hub trigger for real-time alerts.
- Create Stream Analytics job with 5-minute tumbling window aggregation.
- Create output Event Hub
iot-aggregatedfor aggregated results. - Deploy Databricks Structured Streaming job to read from
iot-aggregatedand write to Delta. - Dual-run both pipelines for 1 week to validate data parity.
- Cut over IoT devices to send to Event Hubs; disable Kinesis streams.
Last updated: 2026-04-30 Maintainers: CSA-in-a-Box core team Related: Migration Center | Compute Migration | Storage Migration | Migration Playbook