📊 Tutorial 4: Basic Query Development
Master fundamental Stream Analytics Query Language (SAQL) operations including SELECT projections, WHERE filtering, aggregations, and GROUP BY operations for real-time data transformation.
🎯 Learning Objectives
After completing this tutorial, you will be able to:
- ✅ Write SELECT statements to project and transform streaming data
- ✅ Apply WHERE clauses for conditional filtering
- ✅ Perform aggregations (AVG, MIN, MAX, COUNT, SUM)
- ✅ Use GROUP BY for data summarization
- ✅ Handle data types and perform conversions
- ✅ Test queries with sample data before deploying
⏱️ Time Estimate: 30 minutes
- SELECT & WHERE Queries: 10 minutes
- Aggregation Queries: 10 minutes
- Advanced Filtering: 10 minutes
📋 Prerequisites
- Completed Tutorial 03: Job Creation
- Stream Analytics job running
- Data flowing from Event Hub to outputs
- Basic SQL knowledge
📊 Step 1: SELECT Statement Fundamentals
1.1 Simple Projection
Select specific fields from incoming stream:
-- Select only essential fields
SELECT
deviceId,
timestamp,
temperature,
humidity
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
1.2 Column Aliasing
Rename columns for clarity:
-- Rename columns with meaningful aliases
SELECT
deviceId AS SensorID,
location AS DeviceLocation,
timestamp AS ReadingTime,
temperature AS TempFahrenheit,
humidity AS HumidityPercent,
EventProcessedUtcTime AS ProcessedAt -- System.Timestamp() would repeat the event time set by TIMESTAMP BY
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
1.3 Calculated Fields
Create derived columns with expressions:
-- Add calculated fields
SELECT
deviceId,
temperature,
humidity,
-- Convert Fahrenheit to Celsius
ROUND((temperature - 32) * 5.0 / 9.0, 2) AS temperatureCelsius,
-- Calculate a simplified heat index (temperature plus half the humidity; not a standard meteorological formula)
ROUND(temperature + (0.5 * humidity), 2) AS heatIndex,
-- Categorize temperature
CASE
WHEN temperature < 60 THEN 'Cold'
WHEN temperature BETWEEN 60 AND 75 THEN 'Comfortable'
WHEN temperature > 75 THEN 'Hot'
ELSE 'Unknown'
END AS temperatureCategory
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
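The derived columns above can be sanity-checked offline before the job runs. The following is a minimal Python sketch, not SAQL: `to_celsius` and `temperature_category` are hypothetical helper names that mirror the conversion expression and the `CASE` branches, including the inclusive bounds of `BETWEEN` and the `ELSE` fallback for a missing reading.

```python
from typing import Optional


def to_celsius(temp_f: float) -> float:
    """Mirror ROUND((temperature - 32) * 5 / 9, 2) from the query."""
    # Python's round() may differ from SAQL's ROUND on exact ties.
    return round((temp_f - 32) * 5 / 9, 2)


def temperature_category(temp_f: Optional[float]) -> str:
    """Mirror the CASE expression; branches are evaluated top to bottom."""
    if temp_f is None:
        return "Unknown"      # NULL satisfies no WHEN branch, so ELSE applies
    if temp_f < 60:
        return "Cold"
    if 60 <= temp_f <= 75:    # BETWEEN includes both endpoints
        return "Comfortable"
    if temp_f > 75:
        return "Hot"
    return "Unknown"
```

Checking the boundary values 60 and 75 is the useful part: both land in `Comfortable`, so the three ranges leave no gap for numeric readings.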
🔍 Step 2: WHERE Clause Filtering
2.1 Simple Filters
Filter events based on conditions:
-- Only process events with temperature above 75°F
SELECT
deviceId,
location,
timestamp,
temperature,
humidity,
'High Temperature Alert' AS alertType
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp
WHERE
temperature > 75;
2.2 Multiple Conditions
Combine multiple filter criteria:
-- Filter for critical conditions
SELECT
deviceId,
location,
timestamp,
temperature,
humidity,
vibration,
status
INTO
SqlOutput
FROM
EventHubInput TIMESTAMP BY timestamp
WHERE
(temperature > 80 OR humidity < 30)
AND status != 'offline'
AND vibration > 1.0;
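The parentheses in this filter matter because `AND` binds more tightly than `OR`. A minimal Python sketch of the same predicate, assuming every field is present and non-NULL (the sample event and both function names are hypothetical), shows how the result changes when the parentheses are dropped:

```python
def matches_filter(event: dict) -> bool:
    """Mirror the WHERE clause as written, parentheses included."""
    return (
        (event["temperature"] > 80 or event["humidity"] < 30)
        and event["status"] != "offline"
        and event["vibration"] > 1.0
    )


def matches_without_parentheses(event: dict) -> bool:
    """Same conditions with the parentheses removed: AND is evaluated before OR."""
    return (
        event["temperature"] > 80
        or event["humidity"] < 30
        and event["status"] != "offline"
        and event["vibration"] > 1.0
    )


# Hypothetical event: hot, but the device is offline.
offline_hot = {
    "temperature": 85.0,
    "humidity": 50.0,
    "status": "offline",
    "vibration": 2.0,
}
```

With the parentheses, the offline device is rejected; without them, the temperature condition alone lets the event through.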
2.3 String Filtering
Filter based on text patterns:
-- Filter devices by location
SELECT
deviceId,
location,
temperature,
humidity
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp
WHERE
location LIKE 'Building-A%'
OR deviceId IN ('sensor-001', 'sensor-002', 'sensor-003');
2.4 NULL Handling
Manage missing or null values:
-- Handle NULL values appropriately
SELECT
deviceId,
COALESCE(location, 'Unknown Location') AS location,
COALESCE(temperature, 0.0) AS temperature,
COALESCE(humidity, 0.0) AS humidity,
CASE
WHEN temperature IS NULL THEN 'Missing Data'
ELSE status
END AS deviceStatus
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp
WHERE
deviceId IS NOT NULL; -- Exclude records that cannot be attributed to a device (filtering on temperature here would make the NULL handling above unreachable)
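Substituting a default for a missing measurement has a cost once the values are aggregated. The Python sketch below is an offline illustration, not SAQL: `coalesce` is a hypothetical stand-in for `COALESCE`, `None` stands in for NULL, and the readings are made up. It relies on the standard SQL behaviour that `AVG` ignores NULL inputs.

```python
from statistics import mean


def coalesce(*values):
    """Return the first argument that is not None, like COALESCE."""
    for value in values:
        if value is not None:
            return value
    return None


readings = [70.0, None, 74.0]  # hypothetical temperatures; None stands in for NULL

# AVG skips NULL inputs, so only 70.0 and 74.0 contribute.
avg_ignoring_nulls = mean(r for r in readings if r is not None)

# Replacing NULL with 0.0 first drags the average down.
avg_with_default = mean(coalesce(r, 0.0) for r in readings)
```

Here the average drops from 72.0 to 48.0, which is why a default such as 0.0 suits display fields better than values that feed later aggregations. Note also that `coalesce(0.0, 5.0)` returns 0.0: a zero reading is a value, not NULL.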
📈 Step 3: Aggregation Functions
3.1 Basic Aggregations
Calculate summary statistics:
-- Calculate basic statistics across all devices
SELECT
COUNT(*) AS totalEvents,
AVG(temperature) AS avgTemperature,
MIN(temperature) AS minTemperature,
MAX(temperature) AS maxTemperature,
AVG(humidity) AS avgHumidity,
AVG(vibration) AS avgVibration,
System.Timestamp() AS windowEnd
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp
GROUP BY
TumblingWindow(minute, 1);
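To see which events end up in which one-minute result row, the grouping can be modelled offline. This is a minimal Python sketch under stated assumptions: windows are aligned to whole minutes, each window excludes its start and includes its end, and the sample events and the helper names `window_end`, `aggregate`, and `at` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_end(ts: datetime, size: timedelta) -> datetime:
    """End of the tumbling window containing ts; windows are (start, end]."""
    full_windows, remainder = divmod(ts - EPOCH, size)
    if remainder:  # not exactly on a boundary: round up to the next one
        full_windows += 1
    return EPOCH + full_windows * size


def aggregate(events, size=timedelta(minutes=1)):
    """Group (timestamp, temperature) pairs per window and summarize each group."""
    buckets = defaultdict(list)
    for ts, temperature in events:
        buckets[window_end(ts, size)].append(temperature)
    return {
        end: {
            "totalEvents": len(temps),
            "avgTemperature": sum(temps) / len(temps),
            "minTemperature": min(temps),
            "maxTemperature": max(temps),
        }
        for end, temps in sorted(buckets.items())
    }


def at(hms: str) -> datetime:
    """Hypothetical helper: a UTC timestamp on an arbitrary sample day."""
    return datetime.fromisoformat(f"2025-01-15T{hms}+00:00")


sample = [
    (at("12:00:10"), 70.0),
    (at("12:00:50"), 80.0),
    (at("12:01:00"), 90.0),  # exactly on the boundary: belongs to the window ending 12:01:00
    (at("12:01:30"), 60.0),
]
result = aggregate(sample)
```

The keys of `result` play the role of `windowEnd`: the first three readings share the row ending at 12:01:00, and the last reading forms its own row ending at 12:02:00.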
3.2 Grouped Aggregations
Group data by dimensions:
-- Aggregations per device
SELECT
deviceId,
location,
COUNT(*) AS eventCount,
AVG(temperature) AS avgTemp,
MIN(temperature) AS minTemp,
MAX(temperature) AS maxTemp,
STDEV(temperature) AS tempStdDev,
System.Timestamp() AS windowEnd
INTO
SqlOutput
FROM
EventHubInput TIMESTAMP BY timestamp
GROUP BY
deviceId,
location,
TumblingWindow(minute, 5);
3.3 Advanced Aggregations with HAVING
Filter aggregated results:
-- Find devices with high average temperature
SELECT
deviceId,
location,
AVG(temperature) AS avgTemperature,
COUNT(*) AS readingCount,
System.Timestamp() AS windowEnd
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp
GROUP BY
deviceId,
location,
TumblingWindow(minute, 5)
HAVING
AVG(temperature) > 75
AND COUNT(*) >= 10; -- Require at least 10 readings per window before trusting the average
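`HAVING` and `WHERE` can look interchangeable, but they act at different stages. The Python sketch below contrasts the two on made-up per-device readings for a single window; the function names and sample data are hypothetical, and only the average-temperature condition is modelled.

```python
from statistics import mean

readings = {  # hypothetical temperatures per device within one window
    "sensor-001": [70.0, 90.0, 80.0],
    "sensor-002": [60.0, 65.0, 70.0],
}


def having_avg_above(groups, threshold):
    """HAVING: aggregate every group first, then keep groups whose average passes."""
    averages = {device: mean(temps) for device, temps in groups.items()}
    return {device: avg for device, avg in averages.items() if avg > threshold}


def where_then_avg(groups, threshold):
    """WHERE: drop individual readings first, then aggregate what is left."""
    result = {}
    for device, temps in groups.items():
        kept = [t for t in temps if t > threshold]
        if kept:  # a group with no surviving rows produces no output row
            result[device] = mean(kept)
    return result
```

Both approaches report only `sensor-001` at a threshold of 75, but with different averages: 80.0 when filtering after aggregation and 85.0 when the 70.0 reading is removed beforehand.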
🔢 Step 4: Data Type Operations
4.1 Type Conversions
Convert between data types:
-- Demonstrate type conversions
SELECT
deviceId,
CAST(timestamp AS datetime) AS eventDateTime,
CAST(temperature AS bigint) AS tempInteger,
CAST(ROUND(humidity, 0) AS bigint) AS humidityInt, -- Stream Analytics integers are bigint
TRY_CAST(deviceId AS bigint) AS deviceIdNum, -- Returns NULL if conversion fails
CONCAT(deviceId, '-', location) AS fullIdentifier,
SUBSTRING(location, 1, 10) AS buildingCode
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
4.2 String Operations
Manipulate text data:
-- String manipulation examples
SELECT
deviceId,
location,
-- Extract building name (assumes location contains a '/' separator; CHARINDEX returns 0 when it is absent)
SUBSTRING(location, 1, CHARINDEX('/', location) - 1) AS building,
-- Convert to uppercase
UPPER(status) AS statusUpper,
-- String length
LEN(location) AS locationLength,
-- Replace text
REPLACE(location, 'Building-', 'Bldg-') AS shortLocation,
-- Concatenation
CONCAT(deviceId, ' @ ', location) AS deviceInfo
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
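The building extraction depends on `location` containing a `/`. A small Python sketch makes the edge case explicit, assuming a location format such as `Building-A/Floor-2` (that format, and the helper names `charindex` and `building_of`, are assumptions for illustration). It adds a fallback for values without a separator, which the query itself does not have.

```python
def charindex(needle: str, haystack: str) -> int:
    """1-based position of needle in haystack, or 0 when absent (like CHARINDEX)."""
    return haystack.find(needle) + 1


def building_of(location: str) -> str:
    """Text before the first '/', or the whole value when there is no separator."""
    position = charindex("/", location)
    if position == 0:
        return location  # guard: avoids asking for a substring of length -1
    return location[: position - 1]
```

In the query, the equivalent guard would be a `CASE` expression that checks `CHARINDEX('/', location) > 0` before calling `SUBSTRING`.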
4.3 Date/Time Operations
Work with timestamps:
-- Date and time operations
SELECT
deviceId,
timestamp,
DATEPART(hour, timestamp) AS hourOfDay,
DATEPART(day, timestamp) AS dayOfMonth,
DATEPART(weekday, timestamp) AS dayOfWeek,
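DATEADD(hour, -5, timestamp) AS timestampEST, -- UTC to EST with a fixed UTC-5 offset (assumes timestamp is UTC; ignores daylight saving time)
DATEDIFF(second, timestamp, EventProcessedUtcTime) AS processingDelaySeconds -- System.Timestamp() equals timestamp here and would always yield 0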
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
🎯 Step 5: Complete Query Examples
5.1 Temperature Monitoring Dashboard
Create aggregated metrics for monitoring:
-- Real-time temperature monitoring dashboard
SELECT
deviceId,
location,
-- Time window identifier
System.Timestamp() AS windowEnd,
-- Count metrics
COUNT(*) AS totalReadings,
-- Temperature metrics
AVG(temperature) AS avgTemperature,
MIN(temperature) AS minTemperature,
MAX(temperature) AS maxTemperature,
STDEV(temperature) AS tempStdDev,
-- Humidity metrics
AVG(humidity) AS avgHumidity,
-- Status summary
SUM(CASE WHEN status = 'normal' THEN 1 ELSE 0 END) AS normalCount,
SUM(CASE WHEN status = 'warning' THEN 1 ELSE 0 END) AS warningCount,
SUM(CASE WHEN status = 'critical' THEN 1 ELSE 0 END) AS criticalCount,
-- Alert flag
CASE
WHEN AVG(temperature) > 80 THEN 1
WHEN MIN(temperature) < 55 THEN 1
ELSE 0
END AS alertFlag
INTO
SqlOutput
FROM
EventHubInput TIMESTAMP BY timestamp
GROUP BY
deviceId,
location,
TumblingWindow(minute, 5);
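The `SUM(CASE WHEN ... THEN 1 ELSE 0 END)` pattern is a conditional count, and the alert flag combines two aggregates over the same window. A minimal Python sketch of both for a single device and window, with hypothetical function names and sample values:

```python
from collections import Counter
from statistics import mean


def status_summary(statuses):
    """Conditional counts, like SUM(CASE WHEN status = '...' THEN 1 ELSE 0 END)."""
    counts = Counter(statuses)  # a status that never occurs counts as 0
    return {
        "normalCount": counts["normal"],
        "warningCount": counts["warning"],
        "criticalCount": counts["critical"],
    }


def alert_flag(temperatures):
    """1 when the window average exceeds 80 or any reading falls below 55."""
    return 1 if mean(temperatures) > 80 or min(temperatures) < 55 else 0
```

Statuses outside the three named values, such as `offline`, are counted in `totalReadings` but in none of the three status columns, so the status counts need not add up to the total.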
5.2 Anomaly Pre-Filter
Identify potential anomalies for deeper analysis:
-- Pre-filter potential anomalies
SELECT
deviceId,
location,
timestamp,
temperature,
humidity,
vibration,
status,
-- Calculate deviation from baseline
ABS(temperature - 72.0) AS tempDeviation,
-- Categorize severity
CASE
WHEN vibration > 2.0 THEN 'Critical'
WHEN vibration > 1.0 THEN 'Warning'
WHEN temperature > 85 OR temperature < 60 THEN 'Warning'
ELSE 'Normal'
END AS severityLevel,
-- Add event-time metadata (in a non-windowed query, System.Timestamp() equals the TIMESTAMP BY value)
System.Timestamp() AS detectedAt
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp
WHERE
-- Filter for anomalous conditions
(temperature > 85 OR temperature < 60)
OR (humidity > 80 OR humidity < 20)
OR vibration > 1.0
OR status IN ('warning', 'critical');
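Because `CASE` returns the first matching branch, the order of the severity rules decides the label, and the `WHERE` clause admits some events that no severity rule covers. The Python sketch below mirrors both pieces for non-NULL fields; the function names and sample events are hypothetical.

```python
def passes_prefilter(e: dict) -> bool:
    """Mirror the WHERE clause of the anomaly pre-filter."""
    return (
        (e["temperature"] > 85 or e["temperature"] < 60)
        or (e["humidity"] > 80 or e["humidity"] < 20)
        or e["vibration"] > 1.0
        or e["status"] in ("warning", "critical")
    )


def severity(e: dict) -> str:
    """Mirror the CASE expression: the first matching branch wins."""
    if e["vibration"] > 2.0:
        return "Critical"
    if e["vibration"] > 1.0:
        return "Warning"
    if e["temperature"] > 85 or e["temperature"] < 60:
        return "Warning"
    return "Normal"


# Hypothetical events for illustration.
shaking_and_hot = {"temperature": 90.0, "humidity": 40.0, "vibration": 2.5, "status": "normal"}
humid_only = {"temperature": 70.0, "humidity": 85.0, "vibration": 0.2, "status": "normal"}
```

The second event passes the filter on humidity alone yet is labelled `Normal`, since the severity rules look only at vibration and temperature. If that is not the intent, the `CASE` expression needs branches for humidity and status as well.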
5.3 Device Health Summary
Track overall device health:
-- Device health summary per location
SELECT
SUBSTRING(location, 1, CHARINDEX('/', location) - 1) AS building,
COUNT(DISTINCT deviceId) AS deviceCount,
AVG(temperature) AS avgTemperature,
AVG(humidity) AS avgHumidity,
AVG(vibration) AS avgVibration,
-- Calculate health score (0-100)
CASE
WHEN AVG(vibration) < 0.5 AND AVG(temperature) BETWEEN 68 AND 75 THEN 100
WHEN AVG(vibration) < 1.0 AND AVG(temperature) BETWEEN 65 AND 80 THEN 75
WHEN AVG(vibration) < 1.5 THEN 50
ELSE 25
END AS healthScore,
-- Status distribution (these sums count readings in the window, not distinct devices)
SUM(CASE WHEN status = 'normal' THEN 1 ELSE 0 END) AS normalDevices,
SUM(CASE WHEN status != 'normal' THEN 1 ELSE 0 END) AS issueDevices,
System.Timestamp() AS windowEnd
INTO
SqlOutput
FROM
EventHubInput TIMESTAMP BY timestamp
GROUP BY
SUBSTRING(location, 1, CHARINDEX('/', location) - 1),
TumblingWindow(minute, 10);
🧪 Step 6: Testing Queries
6.1 Update Job with New Query
Apply a simplified version of the temperature monitoring query from Step 5.1:
# Save the temperature monitoring query
$query = @"
-- Temperature Monitoring Dashboard Query
SELECT
deviceId,
location,
System.Timestamp() AS windowEnd,
COUNT(*) AS totalReadings,
AVG(temperature) AS avgTemperature,
MIN(temperature) AS minTemperature,
MAX(temperature) AS maxTemperature,
AVG(humidity) AS avgHumidity,
SUM(CASE WHEN status = 'normal' THEN 1 ELSE 0 END) AS normalCount,
SUM(CASE WHEN status != 'normal' THEN 1 ELSE 0 END) AS alertCount
INTO
SqlOutput
FROM
EventHubInput TIMESTAMP BY timestamp
GROUP BY
deviceId,
location,
TumblingWindow(minute, 5);
"@
$query | Out-File -FilePath "monitoring-query.sql" -Encoding UTF8
# Stop job
az stream-analytics job stop --name $env:STREAM_JOB --resource-group $env:STREAM_RG
# Update query
az stream-analytics transformation update `
--job-name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--name "Transformation" `
--saql "@monitoring-query.sql"
# Restart job
az stream-analytics job start `
--job-name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--output-start-mode JobStartTime
6.2 Validate Query Results
Query SQL Database to verify aggregations:
$connectionString = "Server=tcp:$($env:STREAM_SQL_SERVER).database.windows.net,1433;Initial Catalog=$($env:STREAM_SQL_DB);User ID=$($env:STREAM_SQL_USER);Password=$($env:STREAM_SQL_PASSWORD);Encrypt=True;"
$validationQuery = @"
SELECT TOP 10
deviceId,
location,
windowEnd,
totalReadings,
ROUND(avgTemperature, 2) AS avgTemp,
normalCount,
alertCount
FROM SensorReadings
ORDER BY windowEnd DESC;
"@
Invoke-Sqlcmd -ConnectionString $connectionString -Query $validationQuery | Format-Table
✅ Validation Exercises
Exercise 1: Filter High-Temperature Events
Write a query to select only events where temperature exceeds 80°F:
💡 Solution
Exercise 2: Calculate Moving Average
Compute a 5-minute moving average of temperature per device:
💡 Solution
Exercise 3: Multi-Condition Filter
Select events from Building-A with temperature > 75 OR humidity < 30:
💡 Solution
🎓 Key Concepts Learned
SAQL Fundamentals
- SELECT: Project and transform columns
- WHERE: Filter events before aggregation
- GROUP BY: Aggregate data by dimensions
- HAVING: Filter aggregated results
Functions Mastered
- Aggregates: AVG, MIN, MAX, COUNT, SUM, STDEV
- String: SUBSTRING, CONCAT, UPPER, REPLACE, LEN
- Date/Time: DATEPART, DATEADD, DATEDIFF
- Conversion and NULL handling: CAST, TRY_CAST, COALESCE
Best Practices
- Use TIMESTAMP BY to define event time
- Filter early with WHERE to reduce processing
- Use HAVING for post-aggregation filters
- Handle NULL values explicitly
- Add meaningful column aliases
🚀 Next Steps
You've mastered basic SAQL queries! Continue to:
Tutorial 05: Windowing Functions →
In the next tutorial, you'll learn:
- Tumbling windows for fixed-interval aggregations
- Hopping windows for overlapping analysis
- Sliding windows for continuous metrics
- Session windows for activity-based grouping
📚 Additional Resources
Tutorial Progress: 4 of 11 complete | Next: Windowing Functions
Last Updated: January 2025