📊 Monitoring Setup Guide¶
📋 Overview¶
This guide provides comprehensive instructions for setting up monitoring and observability for the Azure Real-Time Analytics platform, ensuring proactive detection of issues and optimal system performance.
📑 Table of Contents¶
- Monitoring Architecture
- Azure Monitor Configuration
- Application Insights Setup
- Custom Metrics
- Alerting Rules
- Dashboards
- Log Analytics
🏗️ Monitoring Architecture¶
Monitoring Stack Overview¶
graph TB
subgraph "Data Sources"
A[Databricks Metrics]
B[Storage Metrics]
C[Kafka Metrics]
D[Application Logs]
E[Custom Metrics]
end
subgraph "Collection Layer"
F[Azure Monitor]
G[Application Insights]
H[Log Analytics]
end
subgraph "Processing"
I[Alert Rules]
J[Metric Aggregation]
K[Log Queries]
end
subgraph "Visualization"
L[Azure Dashboards]
M[Power BI Reports]
N[Grafana]
end
subgraph "Actions"
O[Email Alerts]
P[Teams Notifications]
Q[PagerDuty]
R[Auto-scaling]
end
A --> F
B --> F
C --> F
D --> G
E --> G
F --> I
G --> J
H --> K
I --> O
I --> P
I --> Q
I --> R
J --> L
J --> M
K --> N
Key Metrics Categories¶
| Category | Metrics | Source | Frequency |
|---|---|---|---|
| Infrastructure | CPU, Memory, Disk, Network | Azure Monitor | 1 minute |
| Application | Response time, Error rate, Throughput | App Insights | 30 seconds |
| Business | Events processed, Data quality, SLA | Custom metrics | 5 minutes |
| Security | Failed auth, Access violations, Threats | Sentinel | Real-time |
| Cost | Resource consumption, Budget usage | Cost Management | Hourly |
🔧 Azure Monitor Configuration¶
1. Enable Diagnostic Settings¶
# Enable diagnostics for Databricks
az monitor diagnostic-settings create \
--name "databricks-diagnostics" \
--resource $DATABRICKS_RESOURCE_ID \
--workspace $LOG_ANALYTICS_WORKSPACE_ID \
--logs '[
{
"category": "clusters",
"enabled": true,
"retentionPolicy": {
"enabled": true,
"days": 30
}
},
{
"category": "jobs",
"enabled": true,
"retentionPolicy": {
"enabled": true,
"days": 30
}
}
]' \
--metrics '[
{
"category": "AllMetrics",
"enabled": true,
"retentionPolicy": {
"enabled": true,
"days": 30
}
}
]'
# Enable diagnostics for Storage
az monitor diagnostic-settings create \
--name "storage-diagnostics" \
--resource $STORAGE_RESOURCE_ID \
--workspace $LOG_ANALYTICS_WORKSPACE_ID \
--logs @storage-diagnostic-settings.json \
--metrics @storage-metrics-settings.json
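The two @-referenced files hold the log and metric category arrays, in the same shape as the inline JSON above. A minimal sketch of how storage-diagnostic-settings.json could be generated; the category names here assume the blob service, and other storage services expose different categories:
# generate_storage_diagnostic_settings.py (hypothetical helper)
import json
# StorageRead/StorageWrite/StorageDelete are blob-service log categories;
# adjust for the service the diagnostic setting actually targets
storage_log_settings = [
    {
        "category": category,
        "enabled": True,
        "retentionPolicy": {"enabled": True, "days": 30}
    }
    for category in ("StorageRead", "StorageWrite", "StorageDelete")
]
with open("storage-diagnostic-settings.json", "w") as f:
    json.dump(storage_log_settings, f, indent=2)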
2. Configure Metrics Collection¶
# metrics_collector.py
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import metrics
import time
# Configure Azure Monitor
configure_azure_monitor(
connection_string="InstrumentationKey=your-key;IngestionEndpoint=https://your-endpoint/"
)
# Get meter
meter = metrics.get_meter("realtime-analytics")
# Create custom metrics
events_counter = meter.create_counter(
name="events_processed",
description="Total events processed",
unit="events"
)
latency_histogram = meter.create_histogram(
name="processing_latency",
description="Event processing latency",
unit="ms"
)
error_counter = meter.create_counter(
name="processing_errors",
description="Total processing errors",
unit="errors"
)
# Collect metrics
def collect_metrics():
"""Collect and send custom metrics."""
while True:
# Simulate metric collection
events_counter.add(1000, {"source": "kafka", "environment": "prod"})
latency_histogram.record(250, {"pipeline": "streaming", "stage": "silver"})
        # Pace the simulated workload; the Azure Monitor exporter batches
        # and sends telemetry on its own export interval
        time.sleep(60)
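One way to run this loop without blocking the pipeline is a background daemon thread; a minimal sketch using the collect_metrics function above:
# Run the collection loop in the background; daemon=True lets the
# process exit without waiting for the loop to finish
import threading
collector_thread = threading.Thread(target=collect_metrics, daemon=True)
collector_thread.start()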
3. Resource Monitoring Configuration¶
# monitoring-config.yaml
resources:
databricks:
metrics:
- name: cluster.cpu.percentage
aggregation: Average
threshold: 80
window: 5m
- name: cluster.memory.percentage
aggregation: Average
threshold: 85
window: 5m
- name: job.duration
aggregation: Maximum
threshold: 3600
window: 10m
storage:
metrics:
- name: BlobCapacity
aggregation: Average
threshold: 5000000000000 # 5TB
window: 1h
- name: Transactions
aggregation: Total
threshold: 1000000
window: 5m
- name: Ingress
aggregation: Total
threshold: 100000000000 # 100GB
window: 1h
eventHubs:
metrics:
- name: IncomingMessages
aggregation: Total
threshold: 10000000
window: 5m
- name: OutgoingMessages
aggregation: Total
threshold: 10000000
window: 5m
- name: ThrottledRequests
aggregation: Total
threshold: 100
window: 5m
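This YAML is a convention of this guide rather than a format Azure consumes directly. A sketch of a helper that translates it into az monitor metrics alert commands; it assumes PyYAML is available, and the resource ID and action groups still need to be supplied per service:
# render_alert_commands.py (hypothetical helper)
import yaml

def render_alert_commands(config_path: str, resource_id: str) -> list[str]:
    """Translate monitoring-config.yaml entries into az CLI commands."""
    with open(config_path) as f:
        config = yaml.safe_load(f)
    commands = []
    for service, spec in config["resources"].items():
        for metric in spec["metrics"]:
            agg = metric["aggregation"].lower()  # e.g. "average", "total"
            commands.append(
                f"az monitor metrics alert create "
                f"--name '{service}-{metric['name']}' "
                f"--scopes {resource_id} "
                f"--condition '{agg} {metric['name']} > {metric['threshold']}' "
                f"--window-size {metric['window']} "
                f"--evaluation-frequency 5m"
            )
    return commands

for cmd in render_alert_commands("monitoring-config.yaml", "$RESOURCE_ID"):
    print(cmd)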
📱 Application Insights Setup¶
1. Initialize Application Insights¶
# app_insights_config.py
from applicationinsights import TelemetryClient
from applicationinsights.logging import LoggingHandler
import logging
class MonitoringClient:
    def __init__(self, instrumentation_key: str):
        # Keep the key around; TelemetryClient does not expose it directly
        self.instrumentation_key = instrumentation_key
        self.tc = TelemetryClient(instrumentation_key)
        self.setup_logging()
    def setup_logging(self):
        """Configure logging to Application Insights."""
        handler = LoggingHandler(self.instrumentation_key)
handler.setLevel(logging.INFO)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def track_event(self, name: str, properties: dict = None, measurements: dict = None):
"""Track custom event."""
self.tc.track_event(name, properties, measurements)
def track_metric(self, name: str, value: float, properties: dict = None):
"""Track custom metric."""
self.tc.track_metric(name, value, properties=properties)
def track_exception(self, exception: Exception, properties: dict = None):
"""Track exception."""
self.tc.track_exception(
            type(exception),
            exception,
exception.__traceback__,
properties=properties
)
def flush(self):
"""Flush telemetry."""
self.tc.flush()
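A quick usage sketch of the wrapper; in practice the instrumentation key comes from configuration or Key Vault rather than being inlined:
# Hypothetical usage of MonitoringClient
monitoring = MonitoringClient("your-instrumentation-key")
monitoring.track_event(
    "pipeline_started",
    properties={"pipeline": "streaming", "environment": "prod"}
)
monitoring.track_metric("events_per_second", 1250.0, properties={"source": "kafka"})
monitoring.flush()  # push any buffered telemetry before shutdown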
2. Databricks Integration¶
# databricks_monitoring.py
from pyspark.sql import SparkSession
class DatabricksMonitor:
    def __init__(self, spark: SparkSession, instrumentation_key: str):
        self.spark = spark
        self.app_insights = MonitoringClient(instrumentation_key)
def monitor_streaming_query(self, query):
"""Monitor structured streaming query."""
        def process_metrics(batch_df, batch_id):
            progress = query.lastProgress
            if progress is None:
                return  # no progress is available until the first trigger completes
            metrics = {
                "batch_id": batch_id,
                "input_rows": batch_df.count(),
                "processing_time": progress["durationMs"]["triggerExecution"],
                "input_rate": progress["inputRowsPerSecond"],
                "process_rate": progress["processedRowsPerSecond"]
            }
# Send to Application Insights
self.app_insights.track_event(
"streaming_batch_processed",
properties={"query_name": query.name},
measurements=metrics
)
# Check for issues
if metrics["processing_time"] > 10000: # >10 seconds
self.app_insights.track_event(
"slow_batch_detected",
properties={
"query_name": query.name,
"batch_id": str(batch_id)
},
measurements={"duration_ms": metrics["processing_time"]}
)
return process_metrics
    def monitor_job_execution(self, run_id: int, job_name: str):
        """Monitor a Databricks job run."""
        from databricks.sdk import WorkspaceClient
        w = WorkspaceClient()
        run = w.jobs.get_run(run_id=run_id)
        metrics = {
            "run_id": run_id,
            "job_name": job_name,
            "state": str(run.state.life_cycle_state),
            "start_time": run.start_time,
            "end_time": run.end_time,
            # start_time/end_time are epoch milliseconds, so the difference is already in ms
            "duration_ms": (run.end_time - run.start_time) if run.end_time else None,
            "cluster_id": run.cluster_instance.cluster_id
        }
self.app_insights.track_event(
"databricks_job_completed",
properties=metrics
)
if run.state.state_message:
self.app_insights.track_event(
"databricks_job_error",
properties={
"job_id": job_id,
"error": run.state.state_message
}
)
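For example, after a scheduled run finishes; the run ID below is a placeholder, and in practice it comes from the Jobs API, a job webhook, or the run context:
# Hypothetical invocation from a Databricks notebook, where `spark` already exists
monitor = DatabricksMonitor(spark, instrumentation_key="your-instrumentation-key")
monitor.monitor_job_execution(run_id=123456, job_name="silver-to-gold-aggregation")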
📈 Custom Metrics¶
Business Metrics Collection¶
# business_metrics.py
import asyncio
class BusinessMetricsCollector:
def __init__(self, spark, monitoring_client):
self.spark = spark
self.monitoring = monitoring_client
async def collect_data_quality_metrics(self):
"""Collect data quality metrics."""
quality_metrics = self.spark.sql("""
SELECT
COUNT(*) as total_records,
SUM(CASE WHEN is_valid = true THEN 1 ELSE 0 END) as valid_records,
SUM(CASE WHEN is_duplicate = true THEN 1 ELSE 0 END) as duplicate_records,
SUM(CASE WHEN is_late = true THEN 1 ELSE 0 END) as late_records
FROM silver.validated_events
WHERE event_time >= current_timestamp() - INTERVAL 5 MINUTES
""").collect()[0]
        total = quality_metrics['total_records']
        if total == 0:
            return  # nothing ingested in the window; skip this cycle
        quality_score = quality_metrics['valid_records'] / total * 100
        self.monitoring.track_metric(
            "data_quality_score",
            quality_score,
            properties={"layer": "silver"}
        )
        self.monitoring.track_metric(
            "duplicate_rate",
            quality_metrics['duplicate_records'] / total * 100,
            properties={"layer": "silver"}
        )
async def collect_sla_metrics(self):
"""Collect SLA compliance metrics."""
sla_metrics = self.spark.sql("""
SELECT
percentile_approx(processing_time_ms, 0.99) as p99_latency,
percentile_approx(processing_time_ms, 0.95) as p95_latency,
percentile_approx(processing_time_ms, 0.50) as p50_latency,
AVG(processing_time_ms) as avg_latency
FROM gold.processing_metrics
WHERE timestamp >= current_timestamp() - INTERVAL 1 HOUR
""").collect()[0]
# Check SLA compliance
sla_compliant = sla_metrics['p99_latency'] <= 5000 # 5 second SLA
self.monitoring.track_event(
"sla_compliance_check",
properties={"compliant": str(sla_compliant)},
measurements={
"p99_latency": sla_metrics['p99_latency'],
"p95_latency": sla_metrics['p95_latency'],
"p50_latency": sla_metrics['p50_latency']
}
)
async def run_collection_loop(self):
"""Run metrics collection loop."""
while True:
try:
await asyncio.gather(
self.collect_data_quality_metrics(),
self.collect_sla_metrics()
)
except Exception as e:
self.monitoring.track_exception(e)
await asyncio.sleep(300) # Collect every 5 minutes
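A minimal entry point for the collector, assuming an active Spark session and the MonitoringClient from the Application Insights section:
# Hypothetical entry point; `spark` comes from the surrounding runtime
import asyncio
collector = BusinessMetricsCollector(spark, MonitoringClient("your-instrumentation-key"))
asyncio.run(collector.run_collection_loop())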
🚨 Alerting Rules¶
Alert Configuration¶
{
"alerts": [
{
"name": "High Error Rate",
"description": "Error rate exceeds 1%",
"severity": "Critical",
"query": "customMetrics | where name == 'error_rate' | where value > 1",
"frequency": "PT5M",
"window": "PT5M",
"actions": ["email", "teams", "pagerduty"]
},
{
"name": "Processing Latency SLA",
"description": "P99 latency exceeds 5 seconds",
"severity": "High",
"query": "customMetrics | where name == 'p99_latency' | where value > 5000",
"frequency": "PT5M",
"window": "PT10M",
"actions": ["email", "teams"]
},
{
"name": "Storage Capacity",
"description": "Storage usage exceeds 80%",
"severity": "Warning",
"query": "AzureMetrics | where MetricName == 'BlobCapacity' | where Average > 4000000000000",
"frequency": "PT1H",
"window": "PT1H",
"actions": ["email"]
},
{
"name": "Cluster Auto-scale",
"description": "Cluster CPU exceeds 85% for 10 minutes",
"severity": "Information",
"query": "customMetrics | where name == 'cluster_cpu' | where avg(value) > 85",
"frequency": "PT5M",
"window": "PT10M",
"actions": ["autoscale"]
}
]
}
Alert Action Groups¶
# alert_actions.py
from azure.communication.email import EmailClient
import requests
class AlertActionHandler:
    def __init__(self, connection_string: str):
        self.email_client = EmailClient.from_connection_string(connection_string)
        self.teams_webhook = "https://outlook.office.com/webhook/..."
        self.pagerduty_key = "your-integration-key"
def send_email_alert(self, alert: dict):
"""Send email alert."""
message = {
"content": {
"subject": f"Alert: {alert['name']}",
"plainText": f"Alert triggered: {alert['description']}\nSeverity: {alert['severity']}",
"html": self.format_alert_html(alert)
},
"recipients": {
"to": [{"address": "ops-team@company.com"}]
},
"senderAddress": "alerts@company.com"
}
        poller = self.email_client.begin_send(message)
        poller.result()  # block until the send operation completes
def send_teams_notification(self, alert: dict):
"""Send Teams notification."""
card = {
"@type": "MessageCard",
"@context": "https://schema.org/extensions",
"summary": alert['name'],
"themeColor": self.get_severity_color(alert['severity']),
"sections": [{
"activityTitle": alert['name'],
"activitySubtitle": alert['description'],
"facts": [
{"name": "Severity", "value": alert['severity']},
{"name": "Time", "value": alert['timestamp']},
{"name": "Resource", "value": alert['resource']}
]
}]
}
requests.post(self.teams_webhook, json=card)
def trigger_pagerduty(self, alert: dict):
"""Trigger PagerDuty incident."""
        if alert['severity'] in ['Critical', 'High']:
            # PagerDuty Events API v2 accepts only critical/error/warning/info
            pd_severity = 'critical' if alert['severity'] == 'Critical' else 'error'
            payload = {
                "routing_key": self.pagerduty_key,
                "event_action": "trigger",
                "payload": {
                    "summary": alert['name'],
                    "severity": pd_severity,
"source": "Azure Monitor",
"custom_details": alert
}
}
requests.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload
)
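The actions list in the alert configuration can then be routed to these methods. A sketch of a dispatcher, assuming the caller enriches each alert with the timestamp and resource fields that send_teams_notification expects:
def dispatch_alert(handler: AlertActionHandler, alert: dict):
    """Route an alert to each configured action channel."""
    dispatch_table = {
        "email": handler.send_email_alert,
        "teams": handler.send_teams_notification,
        "pagerduty": handler.trigger_pagerduty,
    }
    for action in alert.get("actions", []):
        send = dispatch_table.get(action)
        if send:  # unknown actions (e.g. "autoscale") are handled elsewhere
            send(alert)

# Hypothetical invocation with an alert shaped like the configuration above
dispatch_alert(
    AlertActionHandler("your-acs-connection-string"),
    {
        "name": "High Error Rate",
        "description": "Error rate exceeds 1%",
        "severity": "Critical",
        "timestamp": "2025-01-29T10:00:00Z",
        "resource": "databricks-prod",
        "actions": ["email", "teams", "pagerduty"],
    },
)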
📊 Dashboards¶
Executive Dashboard Configuration¶
{
"dashboard": {
"name": "Real-Time Analytics Executive Dashboard",
"tiles": [
{
"title": "System Health",
"type": "scorecard",
"query": "customMetrics | where name == 'system_health_score' | summarize avg(value)",
"visualization": "gauge",
"thresholds": [95, 98, 100]
},
{
"title": "Events Per Second",
"type": "metric",
"query": "customMetrics | where name == 'events_per_second' | summarize avg(value) by bin(timestamp, 1m)",
"visualization": "line"
},
{
"title": "Processing Latency",
"type": "metric",
"query": "customMetrics | where name in ('p50_latency', 'p95_latency', 'p99_latency') | summarize avg(value) by name, bin(timestamp, 5m)",
"visualization": "multi-line"
},
{
"title": "Error Rate",
"type": "metric",
"query": "customMetrics | where name == 'error_rate' | summarize avg(value) by bin(timestamp, 5m)",
"visualization": "area",
"alert": true
},
{
"title": "Cost Trend",
"type": "cost",
"query": "costs | summarize sum(PreTaxCost) by bin(UsageDate, 1d)",
"visualization": "bar"
}
]
}
}
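Because these tiles are a logical definition specific to this guide (not the raw Azure dashboard ARM schema), it is worth validating the file before translating it into a portal dashboard or workbook. A minimal check, assuming the JSON above is saved as executive-dashboard.json:
# validate_dashboard.py (hypothetical helper)
import json

REQUIRED_TILE_KEYS = {"title", "type", "query", "visualization"}

with open("executive-dashboard.json") as f:
    dashboard = json.load(f)["dashboard"]

for tile in dashboard["tiles"]:
    missing = REQUIRED_TILE_KEYS - tile.keys()
    if missing:
        raise ValueError(f"Tile '{tile.get('title', '?')}' is missing keys: {missing}")
print(f"{dashboard['name']}: {len(dashboard['tiles'])} tiles OK")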
📝 Log Analytics¶
KQL Queries for Analysis¶
// Top errors in last hour
exceptions
| where timestamp > ago(1h)
| summarize count() by type, message
| order by count_ desc
| take 10
// Slow queries
customMetrics
| where name == "query_duration"
| where value > 10000 // >10 seconds
| project timestamp, query_id = tostring(customDimensions.query_id), duration = value
| order by duration desc
| take 20
// Data quality trends
customMetrics
| where name == "data_quality_score"
| summarize avg(value) by bin(timestamp, 1h), layer = tostring(customDimensions.layer)
| render timechart
// Resource utilization
AzureMetrics
| where TimeGenerated > ago(24h)
| where MetricName in ("cpu_percent", "memory_percent", "disk_percent")
| summarize avg(Average) by bin(TimeGenerated, 5m), Resource, MetricName
| render timechart
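The same queries can also be run programmatically, for example from a scheduled health-check job. A sketch using the azure-monitor-query SDK; the workspace ID is a placeholder, and DefaultAzureCredential assumes an identity with Log Analytics Reader access:
# query_logs.py (hypothetical helper)
from datetime import timedelta
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus

WORKSPACE_ID = "00000000-0000-0000-0000-000000000000"  # your Log Analytics workspace GUID

client = LogsQueryClient(DefaultAzureCredential())
response = client.query_workspace(
    workspace_id=WORKSPACE_ID,
    query="exceptions | where timestamp > ago(1h) | summarize count() by type, message | order by count_ desc | take 10",
    timespan=timedelta(hours=1),
)
if response.status == LogsQueryStatus.SUCCESS:
    for table in response.tables:
        for row in table.rows:
            print(dict(zip(table.columns, row)))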
📚 Related Documentation¶
Last Updated: January 29, 2025
Version: 1.0.0
Maintainer: Platform Monitoring Team