🔄 Kafka Compatibility

Use Azure Event Hubs as a drop-in replacement for Apache Kafka with native protocol support.


🎯 Overview

Azure Event Hubs provides a Kafka endpoint that enables you to use your existing Kafka-based applications without code changes. Simply update your connection configuration to point to Event Hubs instead of your Kafka cluster.

Key Benefits

  • Zero Code Changes: Use existing Kafka applications as-is
  • Managed Service: No cluster management, patching, or scaling
  • Cost Effective: Pay for throughput, not cluster nodes
  • Azure Integration: Native connectivity with Azure services
  • Global Scale: Multi-region replication and geo-disaster recovery

Supported Kafka Versions

  • Kafka protocol 1.0 and later (1.x, 2.x, and 3.x clients all work)
  • Kafka 2.0 or later clients recommended

🏗️ Architecture Comparison

Traditional Kafka Deployment

graph TB
    subgraph "Your Infrastructure"
        ZK[ZooKeeper Cluster]
        K1[Kafka Broker 1]
        K2[Kafka Broker 2]
        K3[Kafka Broker 3]

        ZK --> K1
        ZK --> K2
        ZK --> K3
    end

    Producers[Producers] --> K1
    Producers --> K2
    K1 --> Consumers[Consumers]
    K3 --> Consumers

    Note[You manage:<br/>- Infrastructure<br/>- Scaling<br/>- Patching<br/>- Monitoring<br/>- Backup]

Event Hubs with Kafka Protocol

graph TB
    subgraph "Fully Managed by Azure"
        EH[Event Hubs<br/>Kafka Endpoint]
    end

    Producers[Kafka Producers] -->|Kafka Protocol| EH
    EH -->|Kafka Protocol| Consumers[Kafka Consumers]

    Note[Azure manages:<br/>- Infrastructure<br/>- Auto-scaling<br/>- Patching<br/>- Monitoring<br/>- Backup]

🚀 Migration Guide

Step 1: Update Connection Configuration

Before (Kafka):

# Kafka configuration
bootstrap.servers=kafka-broker1:9092,kafka-broker2:9092
security.protocol=PLAINTEXT

After (Event Hubs):

# Event Hubs configuration
bootstrap.servers=mynamespace.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY";

Step 2: Test Your Application

# No code changes needed - just update configuration
java -jar your-kafka-app.jar --config eventhubs-config.properties

🔧 Configuration Examples

Java Producer

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class EventHubsKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Event Hubs connection
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "mynamespace.servicebus.windows.net:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=\"$ConnectionString\" " +
                "password=\"Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY\";");

        // Serialization
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Create producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send messages
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);

            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("Sent to partition %d with offset %d%n",
                        metadata.partition(), metadata.offset());
                }
            });
        }

        producer.close();
    }
}

Java Consumer

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class EventHubsKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Event Hubs connection
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "mynamespace.servicebus.windows.net:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                "username=\"$ConnectionString\" " +
                "password=\"Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY\";");

        // Consumer configuration
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic"));

        // Poll for messages
        try {
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Python Producer (kafka-python)

from kafka import KafkaProducer
import json

# Event Hubs configuration
producer = KafkaProducer(
    bootstrap_servers='mynamespace.servicebus.windows.net:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='$ConnectionString',
    sasl_plain_password='Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send messages
for i in range(100):
    data = {'sensor_id': f'sensor-{i}', 'temperature': 20 + i}
    future = producer.send('test-topic', value=data)

    # Wait for send to complete
    try:
        record_metadata = future.get(timeout=10)
        print(f'Sent to partition {record_metadata.partition} at offset {record_metadata.offset}')
    except Exception as e:
        print(f'Error sending message: {e}')

producer.close()

Python Consumer (kafka-python)

from kafka import KafkaConsumer
import json

# Event Hubs configuration
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='mynamespace.servicebus.windows.net:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='$ConnectionString',
    sasl_plain_password='Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY',
    auto_offset_reset='earliest',
    group_id='my-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# Process messages
for message in consumer:
    print(f'Partition: {message.partition}, Offset: {message.offset}')
    print(f'Key: {message.key}, Value: {message.value}')

🔄 Concept Mapping

Kafka → Event Hubs Terminology

| Kafka Concept | Event Hubs Equivalent | Notes |
| --- | --- | --- |
| Kafka Cluster | Event Hubs Namespace | Logical container |
| Topic | Event Hub | Data stream |
| Partition | Partition | Same concept |
| Consumer Group | Consumer Group | Same concept |
| Broker | Managed by Azure | No direct equivalent |
| Offset | Offset | Same concept |
| Producer | Producer/Publisher | Same concept |
| Consumer | Consumer/Subscriber | Same concept |

Configuration Mapping

| Kafka Setting | Event Hubs Setting | Value |
| --- | --- | --- |
| bootstrap.servers | bootstrap.servers | <namespace>.servicebus.windows.net:9093 |
| security.protocol | security.protocol | SASL_SSL |
| N/A | sasl.mechanism | PLAIN |
| N/A | sasl.jaas.config | Connection string |
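The mapping above can be derived mechanically from the connection string itself. Below is a small helper (a sketch; the function name is mine) that parses an Event Hubs connection string into the kafka-python settings listed in the table:

```python
def kafka_settings_from_connection_string(connection_string):
    """Build kafka-python client settings from an Event Hubs connection string.

    Expects the standard form:
    Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...
    """
    parts = dict(
        part.split('=', 1) for part in connection_string.split(';') if part
    )
    # The Endpoint value looks like sb://<namespace>.servicebus.windows.net/
    namespace = parts['Endpoint'].split('//', 1)[1].rstrip('/')
    return {
        'bootstrap_servers': f'{namespace}:9093',
        'security_protocol': 'SASL_SSL',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': '$ConnectionString',
        'sasl_plain_password': connection_string,
    }
```

The returned dict can be splatted straight into `KafkaProducer(**settings)` or `KafkaConsumer(**settings)`.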

⚙️ Feature Comparison

Supported Features

| Feature | Kafka | Event Hubs | Notes |
| --- | --- | --- | --- |
| Producer API | ✅ | ✅ | Full compatibility |
| Consumer API | ✅ | ✅ | Full compatibility |
| Consumer Groups | ✅ | ✅ | Full compatibility |
| Transactions | ✅ | ✅ | Supported in Premium tier |
| Compression | ✅ | ✅ | GZIP, Snappy, LZ4 |
| Idempotent Producer | ✅ | ✅ | Supported |
| Exactly Once Semantics | ✅ | ✅ | With transactions |

Differences and Limitations

| Feature | Kafka | Event Hubs | Workaround |
| --- | --- | --- | --- |
| Topic Creation | Dynamic | Pre-created | Use Azure Portal/CLI |
| Partition Count Change | Supported | Fixed at creation | Plan ahead |
| Log Compaction | ✅ | ❌ | Use Azure Storage |
| Kafka Streams | ✅ | ❌ | Use Stream Analytics |
| Kafka Connect | ✅ | Limited | Use Azure Data Factory |
| Admin APIs | Full | Limited | Use Azure APIs |

🎯 Migration Scenarios

Scenario 1: Lift and Shift

Goal: Migrate existing Kafka applications with minimal changes

# Step 1: Create Event Hubs namespace with Kafka enabled
az eventhubs namespace create \
  --name kafka-migration-ns \
  --resource-group rg-migration \
  --location eastus \
  --sku Standard \
  --enable-kafka true

# Step 2: Create Event Hubs (topics)
az eventhubs eventhub create \
  --name orders-topic \
  --namespace-name kafka-migration-ns \
  --resource-group rg-migration \
  --partition-count 8 \
  --message-retention 7

# Step 3: Update application configuration
# Replace kafka-broker:9092 with kafka-migration-ns.servicebus.windows.net:9093
# Add SASL_SSL configuration

# Step 4: Test and deploy

Scenario 2: Hybrid Architecture

Run Kafka and Event Hubs in parallel during migration.

graph LR
    subgraph "Producers"
        P1[Producer 1]
        P2[Producer 2]
    end

    subgraph "Migration Phase"
        Kafka[Kafka Cluster]
        EventHubs[Event Hubs]
    end

    subgraph "Consumers"
        C1[Consumer 1<br/>Kafka]
        C2[Consumer 2<br/>Event Hubs]
    end

    P1 -->|Old Config| Kafka
    P2 -->|New Config| EventHubs

    Kafka --> C1
    EventHubs --> C2
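During the parallel phase, some teams dual-write every event so consumers on either side see the complete stream. A minimal, library-agnostic sketch (class and method names are mine, not a library API): it wraps any two producer objects that expose a kafka-python-style `send(topic, value=..., key=...)`, such as two `KafkaProducer` instances pointed at the old cluster and at the Event Hubs endpoint.

```python
class DualWriteProducer:
    """Mirror every event to both the legacy Kafka cluster and Event Hubs
    during the migration window (illustrative sketch).

    Accepts any two producer objects with a kafka-python-style
    send(topic, value=..., key=...) and close() interface.
    """

    def __init__(self, legacy_producer, eventhubs_producer):
        self._legacy = legacy_producer
        self._eventhubs = eventhubs_producer

    def send(self, topic, value, key=None):
        # Send to both backends; return both futures so the caller can
        # block on whichever side it still considers authoritative.
        legacy_future = self._legacy.send(topic, value=value, key=key)
        eventhubs_future = self._eventhubs.send(topic, value=value, key=key)
        return legacy_future, eventhubs_future

    def close(self):
        self._legacy.close()
        self._eventhubs.close()
```

Once all consumers have moved to Event Hubs, drop the legacy leg and the wrapper.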

Scenario 3: Kafka Connect to Azure Data Factory

Replace Kafka Connect with Azure Data Factory connectors.

{
  "name": "CopyFromSQLToEventHub",
  "type": "Copy",
  "inputs": [
    {
      "referenceName": "SqlServerDataset",
      "type": "DatasetReference"
    }
  ],
  "outputs": [
    {
      "referenceName": "EventHubDataset",
      "type": "DatasetReference"
    }
  ],
  "typeProperties": {
    "source": {
      "type": "SqlSource"
    },
    "sink": {
      "type": "EventHubSink"
    }
  }
}

🔒 Security Configuration

Using Connection Strings

# Standard connection string authentication
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="$ConnectionString" \
    password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY";

Using Azure Active Directory (OAuth)

# Microsoft Entra ID (Azure AD) OAuth authentication
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
# Replace with the fully qualified name of your own callback handler class
sasl.login.callback.handler.class=CustomAuthCallbackHandler
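On the Python side, kafka-python's OAUTHBEARER support expects a token-provider object with a `token()` method (its `AbstractTokenProvider` interface). Below is a sketch (the class name is mine) that adapts an azure-identity credential; the commented usage assumes the `azure-identity` package is installed.

```python
class EntraIdTokenProvider:
    """OAuth bearer token provider for kafka-python's OAUTHBEARER mechanism
    (illustrative sketch). Pass an azure-identity credential such as
    DefaultAzureCredential; kafka-python calls token() whenever it needs
    a fresh bearer token."""

    def __init__(self, credential, namespace):
        self._credential = credential
        # Event Hubs OAuth scope is the namespace host plus /.default
        self._scope = f"https://{namespace}/.default"

    def token(self):
        return self._credential.get_token(self._scope).token

    def extensions(self):
        # Optional SASL extensions; none are needed for Event Hubs.
        return {}

# Usage sketch (assumes azure-identity is installed):
# from azure.identity import DefaultAzureCredential
# from kafka import KafkaProducer
# provider = EntraIdTokenProvider(DefaultAzureCredential(),
#                                 'mynamespace.servicebus.windows.net')
# producer = KafkaProducer(
#     bootstrap_servers='mynamespace.servicebus.windows.net:9093',
#     security_protocol='SASL_SSL',
#     sasl_mechanism='OAUTHBEARER',
#     sasl_oauth_token_provider=provider,
# )
```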

Network Security

# Enable private endpoint for Kafka traffic
az network private-endpoint create \
  --name eventhub-kafka-pe \
  --resource-group rg-migration \
  --vnet-name my-vnet \
  --subnet default \
  --private-connection-resource-id $(az eventhubs namespace show \
    --name kafka-migration-ns \
    --resource-group rg-migration \
    --query id -o tsv) \
  --group-id namespace \
  --connection-name kafka-pe-connection

📊 Performance Optimization

Producer Optimization

// Optimize for throughput
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.ACKS_CONFIG, "1");  // Balance between durability and latency

Consumer Optimization

// Optimize for throughput
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

Partition Strategy

# Use partition key for related events
def partition_key_strategy(event):
    """Route related events to same partition."""
    # All events from same user go to same partition
    return event['user_id']

# Send with partition key (kafka-python expects the key as bytes
# unless a key_serializer is configured)
producer.send(
    'user-events',
    value=event_data,
    key=partition_key_strategy(event_data).encode('utf-8')
)
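Keyed sends preserve per-key ordering because the client's default partitioner hashes the key (murmur2 in Kafka clients) modulo the partition count, so equal keys always map to the same partition. The stand-in below illustrates that deterministic routing with a stable stdlib hash; it is not the real murmur2 implementation.

```python
import hashlib

def pick_partition(key: str, num_partitions: int) -> int:
    """Deterministically map a key to a partition (illustration only:
    Kafka clients use murmur2; md5 is used here just to show that
    equal keys always land on the same partition)."""
    digest = hashlib.md5(key.encode('utf-8')).digest()
    return int.from_bytes(digest[:4], 'big') % num_partitions
```

Because the mapping is a pure function of the key, `pick_partition('user-42', 8)` returns the same partition on every call, which is what keeps a single user's events in order.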

🧪 Testing Compatibility

Test Script (Python)

import os
from kafka import KafkaProducer, KafkaConsumer
import json
import time

def test_eventhubs_kafka_compatibility():
    """Test Event Hubs Kafka compatibility."""
    connection_string = os.getenv('EVENTHUB_CONNECTION_STRING')
    # Extract the namespace host (strip the trailing slash from the Endpoint value)
    namespace = connection_string.split(';')[0].split('//')[1].rstrip('/')

    # Configuration
    config = {
        'bootstrap_servers': f'{namespace}:9093',
        'security_protocol': 'SASL_SSL',
        'sasl_mechanism': 'PLAIN',
        'sasl_plain_username': '$ConnectionString',
        'sasl_plain_password': connection_string
    }

    # Test Producer
    print("Testing producer...")
    producer = KafkaProducer(
        **config,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    test_message = {
        'test_id': 'kafka-compatibility-test',
        'timestamp': time.time()
    }

    future = producer.send('test-topic', value=test_message)
    result = future.get(timeout=10)
    print(f"✅ Producer test passed: partition {result.partition}, offset {result.offset}")
    producer.close()

    # Test Consumer
    print("\nTesting consumer...")
    consumer = KafkaConsumer(
        'test-topic',
        **config,
        auto_offset_reset='earliest',
        group_id='test-group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    messages_received = 0
    for message in consumer:
        print(f"✅ Consumer test passed: received message {message.value}")
        messages_received += 1
        if messages_received >= 1:
            break

    consumer.close()
    print("\n✅ All compatibility tests passed!")

if __name__ == "__main__":
    test_eventhubs_kafka_compatibility()

Documentation

Best Practices

  • Migration Strategies
  • Performance Tuning
  • Security Hardening

Last Updated: 2025-01-28 · Kafka Protocol Support: 1.0+ · Documentation Status: Complete