Event Hubs Integration with Databricks
Stream processing from Azure Event Hubs using Databricks Structured Streaming.
Overview
This pattern enables real-time data processing from Event Hubs using Spark Structured Streaming, ideal for:
- Real-time ML feature engineering
- IoT data processing
- Clickstream analytics
- Log aggregation
Implementation
Step 1: Configure Event Hubs Connection
import json

# Connection configuration. The connector expects the connection string to be
# encrypted via EventHubsUtils.encrypt from the azure-event-hubs-spark library.
eventhub_config = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        "Endpoint=sb://myhub.servicebus.windows.net/;SharedAccessKeyName=listen;SharedAccessKey=xxx;EntityPath=events"
    ),
    "eventhubs.consumerGroup": "$Default",
    # Start from the beginning of the stream (offset -1, inclusive)
    "eventhubs.startingPosition": json.dumps(
        {"offset": "-1", "seqNo": -1, "enqueuedTime": None, "isInclusive": True}
    )
}
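In production, avoid embedding the connection string in notebook source. A minimal sketch using a Databricks secret scope instead (the scope and key names here are assumptions, not part of this pattern):

# Fetch the connection string from a secret scope (scope/key names are hypothetical)
conn_string = dbutils.secrets.get(scope="eventhubs", key="listen-connection-string")
eventhub_config["eventhubs.connectionString"] = (
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(conn_string)
)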
Step 2: Create Streaming Reader
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Define schema for event body
event_schema = StructType([
StructField("device_id", StringType(), True),
StructField("timestamp", TimestampType(), True),
StructField("temperature", DoubleType(), True),
StructField("humidity", DoubleType(), True),
StructField("location", StringType(), True)
])
# Read from Event Hubs
stream_df = spark.readStream \
.format("eventhubs") \
.options(**eventhub_config) \
.load()
# Parse event body
parsed_df = stream_df \
.withColumn("body", col("body").cast("string")) \
.withColumn("event", from_json(col("body"), event_schema)) \
.select(
col("event.*"),
col("enqueuedTime").alias("event_time"),
col("offset"),
col("sequenceNumber")
)
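To sanity-check the parsed stream before wiring up the downstream sinks, a short-lived console query works; a minimal sketch (the 30-second wait is arbitrary, and in a Databricks notebook display(parsed_df) serves the same purpose interactively):

# Temporary debug sink: print a few micro-batches to the driver log, then stop
debug_query = (
    parsed_df.writeStream
    .format("console")
    .option("truncate", "false")
    .trigger(processingTime="10 seconds")
    .start()
)
debug_query.awaitTermination(30)  # let a few batches print
debug_query.stop()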
Step 3: Stream Processing
from pyspark.sql.functions import window
# Windowed aggregations
windowed_stats = parsed_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window(col("event_time"), "5 minutes", "1 minute"),
col("device_id")
) \
.agg(
avg("temperature").alias("avg_temp"),
max("temperature").alias("max_temp"),
min("temperature").alias("min_temp"),
count("*").alias("reading_count")
)
# Anomaly detection
anomalies = parsed_df \
.filter(
(col("temperature") > 100) |
(col("temperature") < -40) |
(col("humidity") > 100)
) \
.withColumn("anomaly_type",
when(col("temperature") > 100, "HIGH_TEMP")
.when(col("temperature") < -40, "LOW_TEMP")
.otherwise("INVALID_HUMIDITY")
)
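Event Hubs delivers at-least-once, so replays can introduce duplicate events into these aggregates. One optional guard, sketched here under the assumption of Spark 3.5+ (DBR 14+), is to deduplicate within the watermark before aggregating:

# Drop events repeating the same (device_id, event_time) within the watermark
deduped_df = (
    parsed_df
    .withWatermark("event_time", "10 minutes")
    .dropDuplicatesWithinWatermark(["device_id", "event_time"])
)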
Step 4: Write to Delta Lake
# Write aggregates to Delta. In append mode, a window is emitted only once
# the watermark passes the end of that window.
agg_query = windowed_stats.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/iot_aggregates") \
    .trigger(processingTime="1 minute") \
    .toTable("silver.iot.device_stats")
# Write anomalies with alert trigger
def process_anomalies(batch_df, batch_id):
    if batch_df.isEmpty():  # cheaper than count() for an empty check
        return
    batch_df.persist()  # batch is used twice: Delta write + alert collect
    # Write to Delta
    batch_df.write.format("delta").mode("append").saveAsTable("silver.iot.anomalies")
    # Send alerts (send_alert is assumed to be defined elsewhere)
    alerts = batch_df.select("device_id", "anomaly_type", "event_time").collect()
    for alert in alerts:
        send_alert(alert.device_id, alert.anomaly_type)
    batch_df.unpersist()
anomaly_query = anomalies.writeStream \
.foreachBatch(process_anomalies) \
.option("checkpointLocation", "/checkpoints/anomalies") \
.trigger(processingTime="30 seconds") \
.start()
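When this runs as a scheduled job rather than interactively, block on the queries so the task does not exit while streams are still running. A minimal sketch:

# Keep the job alive until any active streaming query stops or fails
spark.streams.awaitAnyTermination()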
Step 5: Feature Engineering for ML
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
def compute_features(batch_df, batch_id):
    """Compute real-time features for ML models."""
    features = batch_df.groupBy("device_id").agg(
        avg("temperature").alias("temp_avg_5min"),
        stddev("temperature").alias("temp_stddev_5min"),
        (max("temperature") - min("temperature")).alias("temp_range_5min"),
        count("*").alias("reading_count_5min")
    ).withColumn("feature_timestamp", current_timestamp())
    # Upsert into the feature table; merge mode requires the table to
    # already exist with a primary key (see the setup sketch below)
    fe.write_table(
        name="ml.features.device_realtime",
        df=features,
        mode="merge"
    )
# Stream features; each micro-batch covers roughly one 5-minute trigger interval.
# (A withWatermark here would be a no-op: foreachBatch applies no stateful operator.)
feature_query = parsed_df \
    .writeStream \
    .foreachBatch(compute_features) \
    .option("checkpointLocation", "/checkpoints/features") \
    .trigger(processingTime="5 minutes") \
    .start()
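mode="merge" upserts into an existing feature table keyed by a primary key. A one-time setup sketch, assuming device_id as the key (the schema mirrors the features computed above):

from pyspark.sql.types import (StructType, StructField, StringType,
                               DoubleType, LongType, TimestampType)

# One-time creation of the feature table with device_id as the primary key
feature_schema = StructType([
    StructField("device_id", StringType(), False),
    StructField("temp_avg_5min", DoubleType(), True),
    StructField("temp_stddev_5min", DoubleType(), True),
    StructField("temp_range_5min", DoubleType(), True),
    StructField("reading_count_5min", LongType(), True),
    StructField("feature_timestamp", TimestampType(), True)
])

fe.create_table(
    name="ml.features.device_realtime",
    primary_keys=["device_id"],
    schema=feature_schema
)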
Configuration Options
Consumer Settings
advanced_config = {
"eventhubs.connectionString": encrypted_conn_string,
"eventhubs.consumerGroup": "databricks-consumer",
"maxEventsPerTrigger": 10000, # Batch size
"eventhubs.receiverTimeout": "PT60S", # 60 second timeout
"eventhubs.operationTimeout": "PT120S"
}
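These options are passed to the reader exactly like the basic config; the values above are illustrative starting points rather than tuned recommendations:

# Apply the advanced settings when creating the reader
stream_df = (
    spark.readStream
    .format("eventhubs")
    .options(**advanced_config)
    .load()
)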
Checkpoint Management
# Restart from a specific offset. Note: the starting position applies only
# when the query starts without an existing checkpoint; otherwise offsets
# are recovered from the checkpoint location.
start_position = {
    "offset": "1000",
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

# Or from a timestamp
start_position = {
    "offset": None,
    "seqNo": -1,
    "enqueuedTime": "2024-01-01T00:00:00.000Z",
    "isInclusive": True
}
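The position dict is wired into the reader options as a JSON string, the same way as in Step 1:

# Serialize the chosen position into the connector config
eventhub_config["eventhubs.startingPosition"] = json.dumps(start_position)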
Monitoring
# Monitor streaming queries
for query in spark.streams.active:
print(f"Query: {query.name}")
print(f" Status: {query.status}")
print(f" Recent Progress: {query.recentProgress}")
Last Updated: January 2025