Stream Analytics Integration with Azure Functions¶
Extend Stream Analytics with custom processing using Azure Functions UDFs and outputs.
Overview¶
Stream Analytics + Functions enables:
- Custom aggregation and transformation logic
- Integration with external APIs and services
- Complex event processing beyond SQL capabilities
- Machine learning model inference
Implementation¶
Step 1: JavaScript UDF in Stream Analytics¶
// UDF for custom scoring
function scoreAnomaly(temperature, humidity, pressure) {
    // Normalize each reading's deviation from its nominal value
    var tempScore = Math.abs(temperature - 25) / 50;
    var humidScore = Math.abs(humidity - 50) / 50;
    var pressScore = Math.abs(pressure - 1013) / 100;
    // Weighted anomaly score; temperature deviations count most
    return (tempScore * 0.4) + (humidScore * 0.3) + (pressScore * 0.3);
}
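In the job query, JavaScript UDFs are invoked with the udf. prefix, for example udf.scoreAnomaly(temperature, humidity, pressure) AS anomaly_score. The function must first be added to the job's Functions configuration before it can be referenced.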
Step 2: Azure Function as UDF¶
# function_app.py
import azure.functions as func
import json

app = func.FunctionApp()

@app.function_name("EnrichWithGeolocation")
@app.route(route="enrich", methods=["POST"])
async def enrich_with_geolocation(req: func.HttpRequest) -> func.HttpResponse:
    """UDF to enrich events with geolocation from IP.

    Receives a JSON array of records and returns one result per input
    record, in order.
    """
    try:
        records = req.get_json()
        enriched = []
        for record in records:
            ip = record.get("ip_address")
            if ip:
                # Call geolocation API (helper sketched below)
                geo_data = await lookup_geolocation(ip)
                record["country"] = geo_data.get("country")
                record["city"] = geo_data.get("city")
                record["latitude"] = geo_data.get("lat")
                record["longitude"] = geo_data.get("lon")
            # Append even without an IP so output length matches input
            enriched.append(record)
        return func.HttpResponse(
            json.dumps(enriched),
            mimetype="application/json"
        )
    except Exception as e:
        return func.HttpResponse(str(e), status_code=500)
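The lookup_geolocation helper is not part of the Functions SDK. A minimal sketch with aiohttp, assuming an ip-api.com-style endpoint that returns country, city, lat, and lon fields (swap in your provider, and add caching and rate limiting before production use):

import aiohttp

async def lookup_geolocation(ip: str) -> dict:
    """Illustrative helper: resolve an IP via a hypothetical
    ip-api.com-style JSON endpoint."""
    url = f"http://ip-api.com/json/{ip}"
    timeout = aiohttp.ClientTimeout(total=2)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
            if resp.status != 200:
                return {}
            return await resp.json()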
Step 3: Stream Analytics Query with UDF¶
-- Function definition. Stream Analytics functions are added to the job
-- through the portal or an ARM template; the signature below is shown
-- for reference. The Azure Function endpoint is wrapped as an Azure ML
-- Service function, so it must accept and return JSON arrays of rows.
CREATE FUNCTION EnrichLocation
RETURNS RECORD(
    country NVARCHAR(MAX),
    city NVARCHAR(MAX),
    latitude FLOAT,
    longitude FLOAT
)
WITH (
    FUNCTION_TYPE = 'AzureMLService',
    ENDPOINT = 'https://myfunc.azurewebsites.net/api/enrich'
)
-- Use the UDF: invoke it once per row in a named step,
-- then project the record's fields in the final SELECT
WITH Enriched AS (
    SELECT
        device_id,
        temperature,
        ip_address,
        EnrichLocation(ip_address) AS geo
    FROM [iot-input]
)
SELECT
    device_id,
    temperature,
    ip_address,
    geo.country AS country,
    geo.city AS city,
    geo.latitude AS lat,
    geo.longitude AS lon,
    System.Timestamp() AS event_time
INTO [enriched-output]
FROM Enriched
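Stream Analytics calls the endpoint with a JSON array covering a batch of rows and expects one result per input row, in order; this is why the function in Step 2 appends a record to the response even when no IP address is present. Calling the function once per row in a named step, as above, also avoids invoking the endpoint separately for each projected field.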
Step 4: Function Output for Custom Destinations¶
@app.function_name("CustomOutput")
@app.route(route="output", methods=["POST"])
async def custom_output(req: func.HttpRequest) -> func.HttpResponse:
"""Custom output adapter for Stream Analytics."""
records = req.get_json()
# Write to multiple destinations
tasks = [
write_to_redis(records),
write_to_elasticsearch(records),
send_to_kafka(records)
]
await asyncio.gather(*tasks)
return func.HttpResponse("OK", status_code=200)
async def write_to_redis(records):
    """Cache latest values in Redis."""
    # redis-py's asyncio API (the standalone aioredis package was
    # merged into redis-py)
    import redis.asyncio as aioredis
    client = aioredis.from_url(os.environ["REDIS_URL"])
    for record in records:
        key = f"device:{record['device_id']}:latest"
        await client.hset(key, mapping={
            "temperature": record["temperature"],
            "timestamp": record["event_time"]
        })
        await client.expire(key, 3600)  # 1 hour TTL
    await client.close()
async def write_to_elasticsearch(records):
    """Index for search and analytics."""
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk
    es = AsyncElasticsearch([os.environ["ES_URL"]])
    actions = [
        {
            "_index": "iot-events",
            "_source": record
        }
        for record in records
    ]
    await async_bulk(es, actions)
    await es.close()
async def send_to_kafka(records):
    """Forward to Kafka for additional processing."""
    from aiokafka import AIOKafkaProducer
    producer = AIOKafkaProducer(bootstrap_servers=os.environ["KAFKA_BROKERS"])
    await producer.start()
    try:
        for record in records:
            await producer.send_and_wait(
                "iot-events",
                json.dumps(record).encode()
            )
    finally:
        # Stop the producer even if a send fails
        await producer.stop()
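Because failed batches are retried, the same records can arrive more than once; the keyed Redis hash above tolerates this, and the Elasticsearch and Kafka writes can be made retry-safe with a deterministic document _id or message key. Creating a new Kafka producer per invocation also adds connection overhead on every batch. A sketch of caching it across warm invocations, mirroring the model cache in Step 5 (get_producer is a hypothetical helper, not part of the original sample):

# Reuse one producer across invocations; the Functions host keeps
# module state alive between warm invocations (hypothetical helper)
_producer = None

async def get_producer():
    global _producer
    if _producer is None:
        from aiokafka import AIOKafkaProducer
        _producer = AIOKafkaProducer(
            bootstrap_servers=os.environ["KAFKA_BROKERS"]
        )
        await _producer.start()
    return _producer

send_to_kafka would then call await get_producer() instead of creating and stopping its own producer on every request.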
Step 5: ML Model Integration¶
@app.function_name("AnomalyDetection")
@app.route(route="anomaly", methods=["POST"])
async def detect_anomalies(req: func.HttpRequest) -> func.HttpResponse:
"""ML model inference for anomaly detection."""
import joblib
import numpy as np
# Load model (cached in memory)
model = load_model()
records = req.get_json()
results = []
for record in records:
features = np.array([[
record["temperature"],
record["humidity"],
record["pressure"]
]])
# Predict
is_anomaly = model.predict(features)[0]
anomaly_score = model.decision_function(features)[0]
results.append({
**record,
"is_anomaly": bool(is_anomaly == -1),
"anomaly_score": float(anomaly_score)
})
return func.HttpResponse(
json.dumps(results),
mimetype="application/json"
)
# Cache model in memory across invocations
_model = None

def load_model():
    global _model
    if _model is None:
        import io
        import joblib
        from azure.storage.blob import BlobClient
        blob = BlobClient.from_connection_string(
            os.environ["STORAGE_CONNECTION"],
            container_name="models",
            blob_name="anomaly_detector.pkl"
        )
        model_bytes = blob.download_blob().readall()
        # joblib has no loads(); deserialize from an in-memory buffer
        _model = joblib.load(io.BytesIO(model_bytes))
    return _model
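The code above assumes anomaly_detector.pkl was trained offline. A minimal training sketch using scikit-learn's IsolationForest, whose predict() convention (1 for inliers, -1 for outliers) matches the check in detect_anomalies (the CSV path and hyperparameters are illustrative):

# Offline training sketch: fit an IsolationForest on historical
# telemetry, then upload the pickle to the models container.
import joblib
import numpy as np
from sklearn.ensemble import IsolationForest

# Historical [temperature, humidity, pressure] rows (illustrative path)
X = np.loadtxt("telemetry_history.csv", delimiter=",")

model = IsolationForest(contamination=0.01, random_state=42)
model.fit(X)

# predict() -> 1 (inlier) / -1 (outlier); decision_function() scores
# are lower for more anomalous points, as surfaced in anomaly_score
joblib.dump(model, "anomaly_detector.pkl")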
Step 6: Stream Analytics Query with ML¶
-- Use the ML function (registered like EnrichLocation in Step 3)
WITH AnomalyScored AS (
    SELECT
        device_id,
        temperature,
        humidity,
        pressure,
        AnomalyDetection(temperature, humidity, pressure) AS anomaly
    FROM [iot-input]
)
SELECT
    device_id,
    temperature,
    humidity,
    pressure,
    anomaly.is_anomaly,
    anomaly.anomaly_score,
    System.Timestamp() AS event_time
INTO [ml-output]
FROM AnomalyScored
WHERE anomaly.is_anomaly = true
Performance Optimization¶
# Batch processing for efficiency
@app.function_name("BatchProcessor")
@app.route(route="batch", methods=["POST"])
async def batch_processor(req: func.HttpRequest) -> func.HttpResponse:
    """Process records in fixed-size chunks."""
    records = req.get_json()
    # Process in chunks (process_chunk is sketched below)
    chunk_size = 100
    results = []
    for i in range(0, len(records), chunk_size):
        chunk = records[i:i + chunk_size]
        chunk_results = await process_chunk(chunk)
        results.extend(chunk_results)
    return func.HttpResponse(json.dumps(results), mimetype="application/json")
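process_chunk is left undefined above. One way to fill it in, scoring a whole chunk in a single vectorized call against the cached model from Step 5 (the helper and its logic are illustrative, not part of the original sample):

async def process_chunk(chunk):
    """Illustrative helper: score one chunk in a single vectorized call."""
    import numpy as np
    model = load_model()
    features = np.array([
        [r["temperature"], r["humidity"], r["pressure"]]
        for r in chunk
    ])
    predictions = model.predict(features)      # 1 inlier / -1 outlier
    scores = model.decision_function(features)
    return [
        {**r, "is_anomaly": bool(p == -1), "anomaly_score": float(s)}
        for r, p, s in zip(chunk, predictions, scores)
    ]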
Last Updated: January 2025