🎲 Tutorial 2: Data Generators and Simulators
Create realistic IoT data generators to simulate sensor streams for testing Stream Analytics queries. Learn data generation patterns, velocity control, and anomaly injection techniques.
🎯 Learning Objectives
After completing this tutorial, you will be able to:
- ✅ Build IoT data simulators that generate realistic sensor data
- ✅ Control data velocity with configurable generation rates
- ✅ Inject anomalies for testing detection algorithms
- ✅ Send data to Event Hubs programmatically
- ✅ Monitor data flow and verify ingestion
⏱️ Time Estimate: 20 minutes
- Simulator Development: 10 minutes
- Configuration & Testing: 5 minutes
- Validation: 5 minutes
📋 Prerequisites
- Completed Tutorial 01: Environment Setup
- Event Hub namespace and hub created
- Python 3.8+ installed with azure-eventhub package
- Environment variables configured from Tutorial 01
🛠️ Step 1: Create Base Data Generator
1.1 Define Sensor Data Schema
Create a structured schema for IoT sensor data:
# sensor_schema.py
import json
import random
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict


@dataclass
class SensorReading:
    """Schema for IoT sensor readings"""
    deviceId: str
    deviceType: str
    temperature: float
    humidity: float
    pressure: float
    timestamp: str
    location: Dict[str, Any]

    def to_json(self) -> str:
        """Convert to JSON string for Event Hub"""
        return json.dumps({
            'deviceId': self.deviceId,
            'deviceType': self.deviceType,
            'temperature': round(self.temperature, 2),
            'humidity': round(self.humidity, 2),
            'pressure': round(self.pressure, 2),
            'timestamp': self.timestamp,
            'location': self.location
        })

    @staticmethod
    def generate_normal_reading(device_id: str, device_type: str) -> 'SensorReading':
        """Generate a normal sensor reading"""
        return SensorReading(
            deviceId=device_id,
            deviceType=device_type,
            temperature=random.uniform(65.0, 85.0),   # Normal range
            humidity=random.uniform(30.0, 70.0),      # Normal range
            pressure=random.uniform(980.0, 1020.0),   # Normal range
            timestamp=datetime.now(timezone.utc).isoformat(),  # UTC, ISO 8601
            location={
                'building': f'Building-{random.randint(1, 5)}',
                'floor': random.randint(1, 10),
                'room': f'Room-{random.randint(100, 999)}'
            }
        )
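To see what a consumer receives, a serialized reading can be parsed back into Python types. The following is a minimal sketch that assumes a payload shaped like the schema above; the sample values and the `parse_reading` helper are illustrative and not part of the tutorial files.

```python
import json
from datetime import datetime, timezone

# Hypothetical payload with the same fields as the sensor schema
payload = json.dumps({
    "deviceId": "sensor-001",
    "deviceType": "TempHumiditySensor",
    "temperature": 72.46,
    "humidity": 48.1,
    "pressure": 1012.9,
    "timestamp": datetime(2025, 1, 15, 14, 30, tzinfo=timezone.utc).isoformat(),
    "location": {"building": "Building-1", "floor": 3, "room": "Room-204"},
})


def parse_reading(raw: str) -> dict:
    """Parse a JSON payload and convert the timestamp to an aware datetime."""
    data = json.loads(raw)
    data["timestamp"] = datetime.fromisoformat(data["timestamp"])
    return data


reading = parse_reading(payload)
print(reading["deviceId"], reading["timestamp"].isoformat())
```

Because the timestamp carries an explicit UTC offset, downstream consumers do not have to guess the time zone of the device.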
1.2 Create Simple Data Generator
Build a basic data generator for continuous streaming:
# simple_generator.py
import os
import sys
import time

from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventHubError
from sensor_schema import SensorReading


class SimpleDataGenerator:
    """Basic IoT data generator for Event Hubs"""

    def __init__(self, connection_string: str, eventhub_name: str):
        self.connection_string = connection_string
        self.eventhub_name = eventhub_name
        self.producer = None
        self.is_running = False
        self._device_cursor = 0  # round-robin position in the device list

    def connect(self):
        """Initialize Event Hub producer connection"""
        try:
            self.producer = EventHubProducerClient.from_connection_string(
                conn_str=self.connection_string,
                eventhub_name=self.eventhub_name
            )
            print(f"✅ Connected to Event Hub: {self.eventhub_name}")
        except EventHubError as e:
            print(f"❌ Connection failed: {e}")
            raise

    def generate_batch(self, device_ids: list, batch_size: int = 10) -> list:
        """Generate a batch of sensor readings"""
        readings = []
        for _ in range(batch_size):
            # Rotate through the devices so every device emits readings,
            # not only the first batch_size entries of the list
            device_id = device_ids[self._device_cursor % len(device_ids)]
            self._device_cursor += 1
            reading = SensorReading.generate_normal_reading(
                device_id=device_id,
                device_type='TempHumiditySensor'
            )
            readings.append(reading)
        return readings

    def send_batch(self, readings: list):
        """Send batch of readings to Event Hub; return the number of events sent"""
        try:
            event_data_batch = self.producer.create_batch()
            for reading in readings:
                event_data_batch.add(EventData(reading.to_json()))
            self.producer.send_batch(event_data_batch)
            return len(readings)
        except EventHubError as e:
            print(f"❌ Send failed: {e}")
            return 0

    def run(self, device_count: int = 5, events_per_second: int = 10, duration_seconds: int = 60):
        """Run data generator for specified duration"""
        device_ids = [f"sensor-{str(i).zfill(3)}" for i in range(1, device_count + 1)]
        print("\n🚀 Starting data generation...")
        print(f"   Devices: {device_count}")
        print(f"   Rate: {events_per_second} events/second")
        print(f"   Duration: {duration_seconds} seconds")
        print(f"   Total Events: {events_per_second * duration_seconds}\n")
        self.connect()
        self.is_running = True
        total_sent = 0
        start_time = time.time()
        interval = 1.0 / events_per_second
        next_send = start_time
        try:
            while self.is_running and (time.time() - start_time) < duration_seconds:
                readings = self.generate_batch(device_ids, batch_size=1)
                sent = self.send_batch(readings)
                total_sent += sent
                # Report only after a successful send, so failures do not spam the log
                if sent and total_sent % 100 == 0:
                    elapsed = time.time() - start_time
                    rate = total_sent / elapsed if elapsed > 0 else 0
                    print(f"📊 Sent: {total_sent} events | Rate: {rate:.1f} events/sec")
                # Sleep until the next scheduled slot so send latency
                # does not push the rate below the target
                next_send += interval
                time.sleep(max(0.0, next_send - time.time()))
        except KeyboardInterrupt:
            print("\n⚠️ Generation stopped by user")
        finally:
            self.close()
        elapsed = time.time() - start_time
        average_rate = total_sent / elapsed if elapsed > 0 else 0
        print("\n✅ Generation complete!")
        print(f"   Total sent: {total_sent} events")
        print(f"   Elapsed time: {elapsed:.1f} seconds")
        print(f"   Average rate: {average_rate:.1f} events/sec")

    def close(self):
        """Close Event Hub connection"""
        if self.producer:
            self.producer.close()
            print("🔌 Connection closed")


# Usage example
if __name__ == "__main__":
    # Load connection string from environment
    connection_string = os.environ.get("STREAM_EH_SEND_CONN")
    eventhub_name = os.environ.get("STREAM_EH_NAME")
    if not connection_string or not eventhub_name:
        print("❌ Error: Environment variables not set")
        print("   Please complete Tutorial 01 first")
        sys.exit(1)
    generator = SimpleDataGenerator(connection_string, eventhub_name)
    generator.run(device_count=5, events_per_second=10, duration_seconds=60)
Expected output (exact rates and timings will vary):
🚀 Starting data generation...
Devices: 5
Rate: 10 events/second
Duration: 60 seconds
Total Events: 600
✅ Connected to Event Hub: sensordata
📊 Sent: 100 events | Rate: 10.1 events/sec
📊 Sent: 200 events | Rate: 10.0 events/sec
📊 Sent: 300 events | Rate: 10.0 events/sec
📊 Sent: 400 events | Rate: 10.0 events/sec
📊 Sent: 500 events | Rate: 10.0 events/sec
📊 Sent: 600 events | Rate: 10.0 events/sec
🔌 Connection closed
✅ Generation complete!
Total sent: 600 events
Elapsed time: 60.2 seconds
Average rate: 10.0 events/sec
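Rate control comes down to how long the loop waits between sends. The sketch below shows deadline-based pacing, which keeps the average rate near the target even when each send takes time. `paced_loop` and its `send` callback are hypothetical names used only for illustration.

```python
import time
from typing import Callable


def paced_loop(send: Callable[[], None], events_per_second: float, total_events: int) -> float:
    """Call send() total_events times at a target rate; return the achieved rate."""
    interval = 1.0 / events_per_second
    start = time.monotonic()
    for i in range(total_events):
        send()
        # Wait for the slot of the next event, measured from the start time,
        # so time spent inside send() is absorbed instead of accumulating
        next_slot = start + (i + 1) * interval
        time.sleep(max(0.0, next_slot - time.monotonic()))
    elapsed = time.monotonic() - start
    return total_events / elapsed


achieved = paced_loop(lambda: None, events_per_second=50, total_events=10)
print(f"achieved rate: {achieved:.1f} events/sec")
```

Sleeping a constant interval after every send would instead add the send latency to each cycle, so the achieved rate would always fall short of the target.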
🎯 Step 2: Advanced Generator with Patterns
2.1 Create Pattern-Based Generator
Implement realistic data patterns (daily cycles, trends):
# advanced_generator.py
import math
import random
from datetime import datetime, timezone
from typing import Optional

from sensor_schema import SensorReading


class AdvancedDataGenerator:
    """Generator with realistic patterns and trends"""

    def __init__(self):
        self.base_temperature = 72.0
        self.base_humidity = 50.0
        self.base_pressure = 1013.25

    def apply_daily_cycle(self, base_value: float, amplitude: float, hour: int) -> float:
        """Apply sinusoidal daily pattern (peak at 2pm, low at 2am)"""
        # sin() peaks a quarter period (6 hours) after its zero crossing,
        # so the zero crossing sits at hour 8: hour 14 = peak, hour 2 = trough
        phase = (hour - 8) * (2 * math.pi / 24)
        variation = amplitude * math.sin(phase)
        return base_value + variation

    def apply_trend(self, base_value: float, trend_rate: float, minutes_elapsed: int) -> float:
        """Apply gradual trend over time (trend_rate is in units per hour)"""
        hours_elapsed = minutes_elapsed / 60.0
        return base_value + (trend_rate * hours_elapsed)

    def apply_noise(self, value: float, noise_level: float = 0.5) -> float:
        """Add uniform random noise in [-noise_level, +noise_level]"""
        noise = random.uniform(-noise_level, noise_level)
        return value + noise

    def generate_realistic_reading(
        self,
        device_id: str,
        device_type: str,
        current_hour: Optional[int] = None,
        minutes_elapsed: int = 0
    ) -> SensorReading:
        """Generate reading with realistic patterns"""
        if current_hour is None:
            current_hour = datetime.now().hour  # local hour drives the daily cycle

        # Apply daily cycle (temperature higher in afternoon)
        temp = self.apply_daily_cycle(self.base_temperature, amplitude=8.0, hour=current_hour)
        temp = self.apply_trend(temp, trend_rate=0.1, minutes_elapsed=minutes_elapsed)
        temp = self.apply_noise(temp, noise_level=1.0)

        # Humidity inversely related to temperature (negative amplitude)
        humidity = self.apply_daily_cycle(self.base_humidity, amplitude=-10.0, hour=current_hour)
        humidity = self.apply_noise(humidity, noise_level=2.0)

        # Pressure has longer-term variations
        pressure = self.apply_trend(self.base_pressure, trend_rate=-0.2, minutes_elapsed=minutes_elapsed)
        pressure = self.apply_noise(pressure, noise_level=5.0)

        return SensorReading(
            deviceId=device_id,
            deviceType=device_type,
            temperature=max(60.0, min(90.0, temp)),  # Clamp to realistic range
            humidity=max(20.0, min(80.0, humidity)),
            pressure=max(950.0, min(1050.0, pressure)),
            timestamp=datetime.now(timezone.utc).isoformat(),
            location={
                'building': f'Building-{random.randint(1, 5)}',
                'floor': random.randint(1, 10),
                'room': f'Room-{random.randint(100, 999)}'
            }
        )
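A sine wave reaches its maximum a quarter period after its zero crossing, so a 24-hour cycle that should peak at 2 pm needs its zero crossing at 8 am. The sketch below checks this numerically; `daily_cycle` and its `peak_hour` parameter are hypothetical names for a stand-alone version of the cycle, not the tutorial's class method.

```python
import math


def daily_cycle(base: float, amplitude: float, hour: int, peak_hour: int = 14) -> float:
    """Sinusoid with a 24-hour period that peaks at peak_hour."""
    # Shift by 6 hours (a quarter period) so sin() reaches +1 at peak_hour
    phase = (hour - (peak_hour - 6)) * (2 * math.pi / 24)
    return base + amplitude * math.sin(phase)


values = {h: daily_cycle(72.0, 8.0, h) for h in range(24)}
peak = max(values, key=values.get)
trough = min(values, key=values.get)
print(f"peak at {peak}:00 ({values[peak]:.1f}), trough at {trough}:00 ({values[trough]:.1f})")
```

Passing a negative amplitude flips the curve, which is how an inversely correlated signal such as humidity can reuse the same function.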
2.2 Implement Anomaly Injection
Create controlled anomalies for testing detection:
# anomaly_injector.py
import random

from sensor_schema import SensorReading


class AnomalyInjector:
    """Inject various anomaly types into sensor data (readings are modified in place)"""

    @staticmethod
    def spike_anomaly(reading: SensorReading, magnitude: float = 3.0) -> SensorReading:
        """Create sudden spike in values"""
        reading.temperature += magnitude * 10
        reading.humidity += magnitude * 5
        return reading

    @staticmethod
    def dropout_anomaly(reading: SensorReading) -> SensorReading:
        """Simulate sensor malfunction (all values read zero)"""
        reading.temperature = 0.0
        reading.humidity = 0.0
        reading.pressure = 0.0
        return reading

    @staticmethod
    def drift_anomaly(reading: SensorReading, drift_amount: float = 5.0) -> SensorReading:
        """Sensor drift (calibration issue), modeled as a fixed temperature offset"""
        reading.temperature += drift_amount
        return reading

    @staticmethod
    def noise_anomaly(reading: SensorReading, noise_factor: float = 5.0) -> SensorReading:
        """Excessive noise in readings"""
        reading.temperature += random.uniform(-noise_factor, noise_factor)
        reading.humidity += random.uniform(-noise_factor, noise_factor)
        reading.pressure += random.uniform(-noise_factor * 2, noise_factor * 2)
        return reading

    @staticmethod
    def should_inject_anomaly(probability: float = 0.05) -> bool:
        """Determine if anomaly should be injected (default 5% chance)"""
        return random.random() < probability

    @staticmethod
    def inject_random_anomaly(reading: SensorReading, probability: float = 0.05) -> SensorReading:
        """Randomly inject one of several anomaly types"""
        if not AnomalyInjector.should_inject_anomaly(probability):
            return reading
        anomaly_type = random.choice(['spike', 'dropout', 'noise', 'drift'])
        if anomaly_type == 'spike':
            return AnomalyInjector.spike_anomaly(reading)
        elif anomaly_type == 'dropout':
            return AnomalyInjector.dropout_anomaly(reading)
        elif anomaly_type == 'noise':
            return AnomalyInjector.noise_anomaly(reading)
        elif anomaly_type == 'drift':
            return AnomalyInjector.drift_anomaly(reading)
        return reading
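When a detection query is under test, it helps if the same readings are flagged on every run. One possible approach, sketched here under the assumption that reproducibility matters more than true randomness, is to draw the injection decisions from a seeded private generator. `injection_flags` is a hypothetical helper, not part of the tutorial files.

```python
import random


def injection_flags(n: int, probability: float, seed: int) -> list:
    """Return n booleans; True marks a reading chosen for anomaly injection."""
    rng = random.Random(seed)  # private generator, global random state is untouched
    return [rng.random() < probability for _ in range(n)]


flags = injection_flags(10_000, probability=0.05, seed=42)
observed = sum(flags) / len(flags)
print(f"observed anomaly rate: {observed:.3f}")
```

With a fixed seed the flagged positions are identical between runs, so a failed detection can be replayed against the same input.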
📊 Step 3: Production-Grade Generator
3.1 Complete Generator with All Features
Create a comprehensive generator script that combines patterns, anomaly injection, and command-line configuration:
# production_generator.py
import argparse
import os
import sys

from simple_generator import SimpleDataGenerator
from advanced_generator import AdvancedDataGenerator
from anomaly_injector import AnomalyInjector
from sensor_schema import SensorReading


class ProductionDataGenerator(SimpleDataGenerator):
    """Production-grade data generator with all features"""

    def __init__(self, connection_string: str, eventhub_name: str, use_patterns: bool = True,
                 inject_anomalies: bool = True, anomaly_rate: float = 0.05):
        super().__init__(connection_string, eventhub_name)
        self.use_patterns = use_patterns
        self.inject_anomalies = inject_anomalies
        self.anomaly_rate = anomaly_rate
        self.advanced_gen = AdvancedDataGenerator() if use_patterns else None
        self.minutes_elapsed = 0   # simulated clock: advances one minute per batch
        self.anomaly_count = 0
        self.generated_count = 0
        self._device_cursor = 0    # round-robin position in the device list

    def generate_batch(self, device_ids: list, batch_size: int = 10) -> list:
        """Generate batch with patterns and anomalies"""
        readings = []
        for _ in range(batch_size):
            # Rotate through the devices so every device emits readings
            device_id = device_ids[self._device_cursor % len(device_ids)]
            self._device_cursor += 1

            # Generate reading with or without patterns
            if self.use_patterns:
                reading = self.advanced_gen.generate_realistic_reading(
                    device_id=device_id,
                    device_type='TempHumiditySensor',
                    minutes_elapsed=self.minutes_elapsed
                )
            else:
                reading = SensorReading.generate_normal_reading(
                    device_id=device_id,
                    device_type='TempHumiditySensor'
                )

            # Decide here and then force the injection, so every anomaly type is
            # counted (a temperature-difference check misses noise and drift)
            if self.inject_anomalies and AnomalyInjector.should_inject_anomaly(self.anomaly_rate):
                reading = AnomalyInjector.inject_random_anomaly(reading, probability=1.0)
                self.anomaly_count += 1
                print(f"⚠️ Anomaly injected for {device_id}")
            readings.append(reading)

        self.generated_count += len(readings)
        self.minutes_elapsed += 1
        return readings


def main():
    """Main entry point with CLI arguments"""
    parser = argparse.ArgumentParser(description='IoT Data Generator for Stream Analytics')
    parser.add_argument('--devices', type=int, default=10, help='Number of IoT devices')
    parser.add_argument('--rate', type=int, default=10, help='Events per second')
    parser.add_argument('--duration', type=int, default=300, help='Duration in seconds')
    parser.add_argument('--patterns', action='store_true', help='Use realistic patterns')
    parser.add_argument('--anomalies', action='store_true', help='Inject anomalies')
    parser.add_argument('--anomaly-rate', type=float, default=0.05, help='Anomaly probability (0.0-1.0)')
    args = parser.parse_args()

    # Load configuration
    connection_string = os.environ.get("STREAM_EH_SEND_CONN")
    eventhub_name = os.environ.get("STREAM_EH_NAME")
    if not connection_string or not eventhub_name:
        print("❌ Error: Environment variables not set")
        sys.exit(1)

    # Create and run generator
    generator = ProductionDataGenerator(
        connection_string=connection_string,
        eventhub_name=eventhub_name,
        use_patterns=args.patterns,
        inject_anomalies=args.anomalies,
        anomaly_rate=args.anomaly_rate
    )
    generator.run(
        device_count=args.devices,
        events_per_second=args.rate,
        duration_seconds=args.duration
    )

    # Print statistics, based on readings actually generated rather than the nominal total
    generated = generator.generated_count
    observed_rate = generator.anomaly_count / generated * 100 if generated else 0.0
    print("\n📈 Statistics:")
    print(f"   Readings generated: {generated}")
    print(f"   Anomalies injected: {generator.anomaly_count}")
    print(f"   Anomaly rate: {observed_rate:.2f}%")


if __name__ == "__main__":
    main()
3.2 Run Production Generator
Execute with various configurations:
# Basic generation (no patterns or anomalies)
python production_generator.py --devices 5 --rate 10 --duration 60
# With realistic patterns
python production_generator.py --devices 10 --rate 20 --duration 300 --patterns
# With anomaly injection (5% rate)
python production_generator.py --devices 10 --rate 20 --duration 300 --patterns --anomalies --anomaly-rate 0.05
# High-volume testing
python production_generator.py --devices 50 --rate 100 --duration 600 --patterns --anomalies
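The `--anomaly-rate` flag is documented as a probability between 0.0 and 1.0, but a plain `type=float` accepts any number. A small sketch of how the range could be enforced at parse time; the `probability` type function is a hypothetical addition, not part of the tutorial script.

```python
import argparse


def probability(text: str) -> float:
    """argparse type: accept a float in [0.0, 1.0] and reject anything else."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"not a number: {text!r}")
    if not 0.0 <= value <= 1.0:
        raise argparse.ArgumentTypeError(f"must be between 0.0 and 1.0, got {value}")
    return value


parser = argparse.ArgumentParser()
parser.add_argument('--anomaly-rate', type=probability, default=0.05)

# Parse an explicit argument list here so the sketch runs without a command line
args = parser.parse_args(['--anomaly-rate', '0.2'])
print(args.anomaly_rate)
```

With this in place, a value such as `--anomaly-rate 5` stops with a usage error instead of silently marking every reading as an anomaly.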
✅ Step 4: Verification and Monitoring
4.1 Monitor Event Hub Metrics
Check data ingestion with the Azure CLI. The commands below use PowerShell syntax ($env: variables and backtick line continuation):
# View incoming messages over last 10 minutes (start time in UTC)
az monitor metrics list `
  --resource "/subscriptions/$(az account show --query id -o tsv)/resourceGroups/$env:STREAM_RG/providers/Microsoft.EventHub/namespaces/$env:STREAM_EH_NAMESPACE" `
  --metrics IncomingMessages `
  --start-time (Get-Date).ToUniversalTime().AddMinutes(-10).ToString("yyyy-MM-ddTHH:mm:ssZ") `
  --interval PT1M `
  --output table
# Check for throttling or errors
az monitor metrics list `
  --resource "/subscriptions/$(az account show --query id -o tsv)/resourceGroups/$env:STREAM_RG/providers/Microsoft.EventHub/namespaces/$env:STREAM_EH_NAMESPACE" `
  --metrics ThrottledRequests `
  --start-time (Get-Date).ToUniversalTime().AddMinutes(-10).ToString("yyyy-MM-ddTHH:mm:ssZ") `
  --interval PT1M `
  --output table
4.2 Sample Data Validation
Create a script to read and validate the generated data:
# validate_data.py
import json
import os
import sys

from azure.eventhub import EventHubConsumerClient


def validate_event(event_data):
    """Validate event schema and values"""
    try:
        data = json.loads(event_data.body_as_str())

        # Check required fields
        required_fields = ['deviceId', 'deviceType', 'temperature', 'humidity', 'pressure', 'timestamp', 'location']
        for field in required_fields:
            if field not in data:
                return False, f"Missing field: {field}"

        # Validate ranges
        if not (0 <= data['temperature'] <= 150):
            return False, f"Temperature out of range: {data['temperature']}"
        if not (0 <= data['humidity'] <= 100):
            return False, f"Humidity out of range: {data['humidity']}"
        if not (800 <= data['pressure'] <= 1200):
            return False, f"Pressure out of range: {data['pressure']}"
        return True, "Valid"
    except Exception as e:
        return False, str(e)


def on_event(partition_context, event):
    valid, message = validate_event(event)
    status = "✅" if valid else "❌"
    # Parse defensively so a malformed event is reported instead of crashing the callback
    try:
        device_id = json.loads(event.body_as_str()).get('deviceId', 'unknown')
    except Exception:
        device_id = 'unknown'
    print(f"{status} Device: {device_id} - {message}")
    partition_context.update_checkpoint(event)


# Read events from the start of each partition; press Ctrl+C to stop
connection_string = os.environ.get("STREAM_EH_LISTEN_CONN")
eventhub_name = os.environ.get("STREAM_EH_NAME")
if not connection_string or not eventhub_name:
    print("❌ Error: Environment variables not set")
    sys.exit(1)

consumer_client = EventHubConsumerClient.from_connection_string(
    conn_str=connection_string,
    consumer_group="$Default",
    eventhub_name=eventhub_name
)
print("📊 Validating recent events...\n")
try:
    with consumer_client:
        consumer_client.receive(
            on_event=on_event,
            starting_position="-1"  # "-1" = start of the partition; use "@latest" for new events only
        )
except KeyboardInterrupt:
    print("\n⚠️ Validation stopped by user")
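The validation rules can also be exercised without an Event Hub connection by feeding JSON strings directly. The sketch below uses the same required fields and ranges; `validate_payload` is a hypothetical stand-alone variant, and the sample payloads are made up.

```python
import json

REQUIRED = ['deviceId', 'deviceType', 'temperature', 'humidity', 'pressure', 'timestamp', 'location']
RANGES = {'temperature': (0, 150), 'humidity': (0, 100), 'pressure': (800, 1200)}


def validate_payload(raw: str):
    """Return (is_valid, message) for one JSON payload string."""
    try:
        data = json.loads(raw)
    except ValueError as e:
        return False, f"Malformed JSON: {e}"
    if not isinstance(data, dict):
        return False, "Payload is not a JSON object"
    for field in REQUIRED:
        if field not in data:
            return False, f"Missing field: {field}"
    for field, (low, high) in RANGES.items():
        value = data[field]
        # bool is a subclass of int, so exclude it explicitly
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if not is_number or not (low <= value <= high):
            return False, f"{field} out of range: {value}"
    return True, "Valid"


good = json.dumps({
    'deviceId': 'sensor-001', 'deviceType': 'TempHumiditySensor',
    'temperature': 72.5, 'humidity': 45.0, 'pressure': 1013.2,
    'timestamp': '2025-01-15T14:30:00+00:00', 'location': {'floor': 3},
})
dropout = good.replace('1013.2', '0.0')  # pressure reads zero, as in a dropout anomaly

for name, raw in [('good', good), ('dropout', dropout), ('malformed', '{"deviceId": ')]:
    print(name, validate_payload(raw))
```

A dropout reading fails the pressure check, which is the intended outcome: injected anomalies should show up as validation failures.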
🎓 Key Concepts Learned
Data Generation Patterns
- Simple generation: Random values within ranges
- Realistic patterns: Daily cycles, trends, correlations
- Anomaly injection: Controlled anomalies for testing
- Velocity control: Configurable event rates
Event Hub Integration
- Producer patterns: Batch sending, error handling
- Connection management: Proper initialization and cleanup
- Partition strategies: How data is distributed
- Monitoring: Tracking ingestion metrics
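On partition strategies: events that share a partition key are routed to the same partition, which preserves per-device ordering. The azure-eventhub producer accepts a key when a batch is created (`create_batch(partition_key=...)`), so readings have to be grouped by device first. The sketch below shows only that grouping step in plain Python; `group_by_device` is a hypothetical helper and the sample readings are made up.

```python
from collections import defaultdict


def group_by_device(readings: list) -> dict:
    """Group reading dicts by deviceId, keeping arrival order inside each group."""
    groups = defaultdict(list)
    for reading in readings:
        groups[reading["deviceId"]].append(reading)
    return dict(groups)


readings = [
    {"deviceId": "sensor-001", "temperature": 71.2},
    {"deviceId": "sensor-002", "temperature": 69.8},
    {"deviceId": "sensor-001", "temperature": 71.9},
]
groups = group_by_device(readings)
for device_id, group in groups.items():
    # With a connected producer, each group would go into its own batch, e.g.:
    #   batch = producer.create_batch(partition_key=device_id)
    print(device_id, len(group))
```

Sending without a key lets the service spread events across partitions, which balances load but gives up ordering per device.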
Testing Scenarios
- Normal operations: Baseline data for query development
- High volume: Load testing and performance validation
- Anomaly detection: Testing alert and detection logic
- Edge cases: Null values, out-of-range data, malformed events
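The edge cases listed above can be produced deliberately to test how a query handles bad input. A minimal sketch, assuming payloads shaped like the tutorial's sensor schema; `edge_case_payloads` and the sample values are illustrative.

```python
import json


def edge_case_payloads(device_id: str = "sensor-001") -> dict:
    """Return named JSON strings covering common bad-input scenarios."""
    base = {
        "deviceId": device_id,
        "deviceType": "TempHumiditySensor",
        "temperature": 72.0,
        "humidity": 50.0,
        "pressure": 1013.25,
        "timestamp": "2025-01-15T14:30:00+00:00",
        "location": {"building": "Building-1", "floor": 1, "room": "Room-101"},
    }
    return {
        "null_value": json.dumps({**base, "temperature": None}),
        "out_of_range": json.dumps({**base, "humidity": 140.0}),
        "missing_field": json.dumps({k: v for k, v in base.items() if k != "pressure"}),
        "malformed": json.dumps(base)[:-10],  # truncated JSON that cannot be parsed
    }


cases = edge_case_payloads()
for name, raw in cases.items():
    print(f"{name}: {raw[:60]}")
```

Each string can be wrapped in an `EventData` and sent like a normal reading to see how downstream queries react.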
🚀 Next Steps
Your data generator is ready to produce realistic streaming data! Continue to:
Tutorial 03: Creating Stream Analytics Job →
In the next tutorial, you'll:
- Create your first Stream Analytics job
- Configure inputs from Event Hubs
- Set up outputs to various destinations
- Write your first streaming query
🔧 Troubleshooting
Issue: Connection Timeout
Symptoms: "EventHubError: Connection timeout"
Solution:
# Check Event Hub firewall rules (PowerShell syntax)
az eventhubs namespace network-rule list `
  --namespace-name $env:STREAM_EH_NAMESPACE `
  --resource-group $env:STREAM_RG
# Add your IP if needed
az eventhubs namespace network-rule add `
  --namespace-name $env:STREAM_EH_NAMESPACE `
  --resource-group $env:STREAM_RG `
  --ip-address "YOUR_IP_ADDRESS"
Issue: Throttling Errors
Symptoms: "QuotaExceededException: Message size quota exceeded"
Solution:
# Option 1 (Python): reduce the event rate
generator.run(device_count=5, events_per_second=5, duration_seconds=60)
# Option 2 (Azure CLI, PowerShell syntax): upgrade the Event Hub tier or capacity
az eventhubs namespace update `
  --name $env:STREAM_EH_NAMESPACE `
  --resource-group $env:STREAM_RG `
  --sku Standard `
  --capacity 2
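A further option for occasional throttling is to retry failed sends with increasing delays rather than lowering the rate for the whole run. The SDK applies its own retry policy as well, so the following is only a generic sketch of exponential backoff with jitter; `send_with_backoff` and its parameters are hypothetical names.

```python
import random
import time


def send_with_backoff(send, max_attempts: int = 5, base_delay: float = 0.5,
                      max_delay: float = 8.0, sleep=time.sleep):
    """Call send(); on failure wait base_delay * 2**attempt (capped, jittered) and retry."""
    for attempt in range(max_attempts):
        try:
            return send()
        except Exception:  # in real code, catch the SDK's error type instead
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            delay *= random.uniform(0.5, 1.0)  # jitter avoids synchronized retries
            sleep(delay)


# Demo with a fake sender that fails twice before succeeding
attempts = {"count": 0}

def flaky_send():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("simulated throttling")
    return "sent"

delays = []
result = send_with_backoff(flaky_send, sleep=delays.append)  # record delays instead of sleeping
print(result, attempts["count"], [round(d, 2) for d in delays])
```

The injectable `sleep` parameter keeps the demo instant; with the default `time.sleep`, the waits are real.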
Tutorial Progress: 2 of 11 complete | Next: Stream Analytics Job Creation
Last Updated: January 2025