⚡ Tutorial 9: Azure Functions Integration¶
Extend Stream Analytics with Azure Functions for custom processing, external API calls, and automated responses. Trigger serverless functions from streaming events.
🎯 Learning Objectives¶
- ✅ Create Azure Functions triggered by Stream Analytics
- ✅ Send alerts via email/SMS using custom logic
- ✅ Call external APIs for data enrichment
- ✅ Implement complex business logic beyond SAQL
- ✅ Handle errors and retries in functions
⏱️ Time Estimate: 35 minutes¶
📋 Prerequisites¶
- Completed Tutorial 08: Power BI Integration
- Azure Functions Core Tools installed
- Python 3.8+ or Node.js 18+
🔧 Step 1: Create Azure Function App¶
1.1 Create Function App¶
# Create Function App
$functionAppName = "streamfunc$(Get-Random -Minimum 1000 -Maximum 9999)"
$storageAccountFunc = "funcsa$(Get-Random -Minimum 1000 -Maximum 9999)"
# Create storage for function app
az storage account create `
--name $storageAccountFunc `
--resource-group $env:STREAM_RG `
--location $env:STREAM_LOCATION `
--sku Standard_LRS
# Create Function App (Python runtime)
az functionapp create `
--name $functionAppName `
--resource-group $env:STREAM_RG `
--storage-account $storageAccountFunc `
--consumption-plan-location $env:STREAM_LOCATION `
--runtime python `
--runtime-version 3.9 `
--functions-version 4 `
--os-type Linux
# Save function app name
[Environment]::SetEnvironmentVariable("STREAM_FUNCTION_APP", $functionAppName, "User")
Write-Host "Function App created: $functionAppName"
📨 Step 2: Create Alert Function¶
2.1 Initialize Function Project¶
# Create local function project
mkdir StreamAlertFunctions
cd StreamAlertFunctions
# Initialize Python function app
func init . --python
# Create HTTP-triggered function for Stream Analytics
func new --name ProcessAlert --template "HTTP trigger" --authlevel function
2.2 Implement Alert Logic¶
Edit ProcessAlert/__init__.py:
import logging
import json
import os
from datetime import datetime
import azure.functions as func
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
def main(req: func.HttpRequest) -> func.HttpResponse:
"""
Process alerts from Stream Analytics and send notifications.
"""
logging.info('Processing alert from Stream Analytics')
try:
# Parse request body
req_body = req.get_json()
# Extract alert details
device_id = req_body.get('deviceId')
location = req_body.get('location')
temperature = req_body.get('temperature')
anomaly_score = req_body.get('anomalyScore', 0)
alert_level = req_body.get('alertLevel', 'Unknown')
timestamp = req_body.get('timestamp')
logging.info(f"Alert received: Device {device_id}, Level: {alert_level}")
# Determine if notification should be sent
if alert_level in ['Critical', 'Warning']:
# Send email notification
send_email_alert(
device_id=device_id,
location=location,
temperature=temperature,
anomaly_score=anomaly_score,
alert_level=alert_level,
timestamp=timestamp
)
# Log to Application Insights
logging.warning(
f"ALERT: {alert_level} - Device {device_id} at {location} - "
f"Temp: {temperature}°F, Score: {anomaly_score}"
)
# Could also:
# - Send SMS via Twilio
# - Create ServiceNow ticket
# - Post to Teams/Slack
# - Trigger automation workflow
return func.HttpResponse(
json.dumps({
"status": "success",
"message": f"Alert processed for device {device_id}",
"notificationSent": alert_level in ['Critical', 'Warning']
}),
status_code=200,
mimetype="application/json"
)
except ValueError as ve:
logging.error(f"Invalid request body: {str(ve)}")
return func.HttpResponse(
json.dumps({"status": "error", "message": "Invalid request format"}),
status_code=400,
mimetype="application/json"
)
except Exception as e:
logging.error(f"Error processing alert: {str(e)}")
return func.HttpResponse(
json.dumps({"status": "error", "message": str(e)}),
status_code=500,
mimetype="application/json"
)
def send_email_alert(device_id, location, temperature, anomaly_score, alert_level, timestamp):
"""Send email alert using SendGrid."""
sendgrid_api_key = os.environ.get('SENDGRID_API_KEY')
if not sendgrid_api_key:
logging.warning("SendGrid API key not configured, skipping email")
return
try:
message = Mail(
from_email='alerts@your-domain.com',
to_emails='operations@your-domain.com',
subject=f'[{alert_level}] Sensor Alert: {device_id}',
html_content=f"""
<html>
<body>
<h2 style="color: {'red' if alert_level == 'Critical' else 'orange'}">
{alert_level} Alert Detected
</h2>
<p><strong>Device ID:</strong> {device_id}</p>
<p><strong>Location:</strong> {location}</p>
<p><strong>Temperature:</strong> {temperature}°F</p>
<p><strong>Anomaly Score:</strong> {anomaly_score:.2f}</p>
<p><strong>Timestamp:</strong> {timestamp}</p>
<p><strong>Severity:</strong> {alert_level}</p>
<hr>
<p><em>This is an automated alert from Stream Analytics monitoring system.</em></p>
</body>
</html>
"""
)
sg = SendGridAPIClient(sendgrid_api_key)
response = sg.send(message)
logging.info(f"Email sent successfully: {response.status_code}")
except Exception as e:
logging.error(f"Failed to send email: {str(e)}")
2.3 Add Dependencies¶
Edit requirements.txt:
2.4 Deploy Function¶
# Deploy to Azure
func azure functionapp publish $env:STREAM_FUNCTION_APP
# Configure app settings
az functionapp config appsettings set `
--name $env:STREAM_FUNCTION_APP `
--resource-group $env:STREAM_RG `
--settings "SENDGRID_API_KEY=your-sendgrid-api-key"
# Get function URL
$functionUrl = az functionapp function show `
--name $env:STREAM_FUNCTION_APP `
--resource-group $env:STREAM_RG `
--function-name ProcessAlert `
--query "invokeUrlTemplate" `
--output tsv
Write-Host "Function URL: $functionUrl"
🔗 Step 3: Configure Function Output in Stream Analytics¶
3.1 Create Function Output¶
# Create Azure Function output configuration
$functionOutputConfig = @{
properties = @{
datasource = @{
type = "Microsoft.AzureFunction"
properties = @{
functionAppName = $env:STREAM_FUNCTION_APP
functionName = "ProcessAlert"
apiKey = "YOUR_FUNCTION_KEY"
}
}
}
} | ConvertTo-Json -Depth 10 | Out-File -FilePath "function-output.json" -Encoding UTF8
# Add output to Stream Analytics job
az stream-analytics output create `
--job-name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--name "FunctionOutput" `
--properties @function-output.json
3.2 Update Query to Call Function¶
-- Send critical anomalies to Azure Function
WITH AnomalyDetection AS (
SELECT
deviceId,
location,
timestamp,
temperature,
vibration,
status,
CAST(AnomalyDetection_SpikeAndDip(temperature, 95, 120, 'spikesanddips')
OVER(PARTITION BY deviceId LIMIT DURATION(minute, 15)) AS RECORD) AS tempAnomaly
FROM
EventHubInput TIMESTAMP BY timestamp
)
SELECT
deviceId,
location,
timestamp,
temperature,
vibration,
status,
tempAnomaly.Score AS anomalyScore,
CASE
WHEN tempAnomaly.Score > 0.8 THEN 'Critical'
WHEN tempAnomaly.Score > 0.5 THEN 'Warning'
ELSE 'Info'
END AS alertLevel
INTO
FunctionOutput
FROM
AnomalyDetection
WHERE
tempAnomaly.IsAnomaly = 1
AND tempAnomaly.Score > 0.5; -- Only send significant anomalies
🌐 Step 4: Create External API Integration Function¶
4.1 Weather API Enrichment Function¶
# File: EnrichWithWeather/__init__.py
import logging
import json
import os
import requests
import azure.functions as func
def main(req: func.HttpRequest) -> func.HttpResponse:
"""
Enrich sensor data with external weather data.
"""
logging.info('Enriching data with weather information')
try:
req_body = req.get_json()
location = req_body.get('location')
temperature = req_body.get('temperature')
# Call external weather API
weather_api_key = os.environ.get('WEATHER_API_KEY')
weather_data = get_weather_data(location, weather_api_key)
# Calculate correlation
if weather_data:
indoor_temp = float(temperature)
outdoor_temp = weather_data.get('temperature', 0)
temp_differential = indoor_temp - outdoor_temp
enriched_data = {
**req_body,
'outdoorTemperature': outdoor_temp,
'temperatureDifferential': temp_differential,
'weatherCondition': weather_data.get('condition', 'Unknown'),
'humidity_outdoor': weather_data.get('humidity', 0)
}
else:
enriched_data = {**req_body, 'weatherDataAvailable': False}
return func.HttpResponse(
json.dumps(enriched_data),
status_code=200,
mimetype="application/json"
)
except Exception as e:
logging.error(f"Error enriching data: {str(e)}")
return func.HttpResponse(
json.dumps({"status": "error", "message": str(e)}),
status_code=500,
mimetype="application/json"
)
def get_weather_data(location: str, api_key: str) -> dict:
"""Fetch weather data from external API."""
if not api_key:
return None
try:
# Extract city from location string
city = location.split('/')[0] if '/' in location else location
# Call weather API (example using OpenWeatherMap)
url = f"https://api.openweathermap.org/data/2.5/weather"
params = {
'q': city,
'appid': api_key,
'units': 'imperial'
}
response = requests.get(url, params=params, timeout=5)
response.raise_for_status()
data = response.json()
return {
'temperature': data['main']['temp'],
'humidity': data['main']['humidity'],
'condition': data['weather'][0]['description']
}
except Exception as e:
logging.error(f"Failed to fetch weather data: {str(e)}")
return None
🔄 Step 5: Create Automation Function¶
5.1 Device Control Function¶
# File: DeviceControl/__init__.py
import logging
import json
import azure.functions as func
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import CloudToDeviceMethod
def main(req: func.HttpRequest) -> func.HttpResponse:
"""
Send control commands to devices based on anomalies.
"""
logging.info('Processing device control request')
try:
req_body = req.get_json()
device_id = req_body.get('deviceId')
alert_level = req_body.get('alertLevel')
temperature = req_body.get('temperature')
# Determine action based on alert level
action = determine_action(alert_level, temperature)
if action:
# Send command to IoT device
result = send_device_command(device_id, action)
logging.info(f"Command sent to {device_id}: {action}")
return func.HttpResponse(
json.dumps({
"status": "success",
"device": device_id,
"action": action,
"result": result
}),
status_code=200,
mimetype="application/json"
)
else:
return func.HttpResponse(
json.dumps({
"status": "no_action",
"message": "No action required"
}),
status_code=200,
mimetype="application/json"
)
except Exception as e:
logging.error(f"Error in device control: {str(e)}")
return func.HttpResponse(
json.dumps({"status": "error", "message": str(e)}),
status_code=500,
mimetype="application/json"
)
def determine_action(alert_level: str, temperature: float) -> str:
"""Determine appropriate action based on alert."""
if alert_level == 'Critical' and temperature > 90:
return "EMERGENCY_SHUTDOWN"
elif alert_level == 'Critical':
return "REDUCE_LOAD"
elif alert_level == 'Warning':
return "INCREASE_COOLING"
return None
def send_device_command(device_id: str, command: str) -> dict:
"""Send command to IoT device via IoT Hub."""
connection_string = os.environ.get('IOT_HUB_CONNECTION_STRING')
if not connection_string:
logging.warning("IoT Hub connection string not configured")
return {"status": "not_configured"}
try:
registry_manager = IoTHubRegistryManager(connection_string)
# Create device method
method = CloudToDeviceMethod(
method_name="executeCommand",
payload={"command": command},
response_timeout_in_seconds=30,
connect_timeout_in_seconds=30
)
# Invoke method on device
result = registry_manager.invoke_device_method(device_id, method)
return {
"status": "success",
"response": result.payload
}
except Exception as e:
logging.error(f"Failed to send device command: {str(e)}")
return {"status": "error", "error": str(e)}
🧪 Step 6: Testing and Monitoring¶
6.1 Test Function Locally¶
# Start function locally
func start
# Test with curl
curl -X POST http://localhost:7071/api/ProcessAlert `
-H "Content-Type: application/json" `
-d '{
"deviceId": "sensor-001",
"location": "Building-A/Floor-1",
"temperature": 92.5,
"anomalyScore": 0.85,
"alertLevel": "Critical",
"timestamp": "2025-01-15T10:30:00Z"
}'
6.2 Monitor Function Executions¶
# View function logs
az functionapp log tail `
--name $env:STREAM_FUNCTION_APP `
--resource-group $env:STREAM_RG
# Check Application Insights
az monitor app-insights metrics show `
--app $env:STREAM_FUNCTION_APP `
--resource-group $env:STREAM_RG `
--metrics "requests/count"
🎓 Key Concepts Learned¶
Integration Patterns¶
- Alerting: Send notifications for critical events
- Enrichment: Add external data to streams
- Automation: Trigger actions based on analytics
- Orchestration: Coordinate complex workflows
Best Practices¶
- Use appropriate timeout settings (30-60 seconds)
- Implement retry logic for transient failures
- Log all function invocations
- Monitor function performance and costs
- Secure function keys in Key Vault
🚀 Next Steps¶
You've integrated serverless functions! Continue to:
Tutorial 10: Performance Tuning →
📚 Additional Resources¶
Tutorial Progress: 9 of 11 complete | Next: Performance Tuning
Last Updated: January 2025