📊 Tutorial 4: Basic Query Development

Master fundamental Stream Analytics Query Language (SAQL) operations including SELECT projections, WHERE filtering, aggregations, and GROUP BY operations for real-time data transformation.

🎯 Learning Objectives

After completing this tutorial, you will be able to:

  • Write SELECT statements to project and transform streaming data
  • Apply WHERE clauses for conditional filtering
  • Perform aggregations (AVG, MIN, MAX, COUNT, SUM)
  • Use GROUP BY for data summarization
  • Handle data types and perform conversions
  • Test queries with sample data before deploying

⏱️ Time Estimate: 30 minutes

  • SELECT & WHERE Queries: 10 minutes
  • Aggregation Queries: 10 minutes
  • Advanced Filtering: 10 minutes

📋 Prerequisites

  • Completed Tutorial 03: Job Creation
  • Stream Analytics job running
  • Data flowing from Event Hub to outputs
  • Basic SQL knowledge

📊 Step 1: SELECT Statement Fundamentals

1.1 Simple Projection

Select specific fields from the incoming stream:

-- Select only essential fields
SELECT
    deviceId,
    timestamp,
    temperature,
    humidity
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;

1.2 Column Aliasing

Rename columns for clarity:

-- Rename columns with meaningful aliases
SELECT
    deviceId AS SensorID,
    location AS DeviceLocation,
    timestamp AS ReadingTime,
    temperature AS TempFahrenheit,
    humidity AS HumidityPercent,
    EventProcessedUtcTime AS ProcessedAt  -- Event Hub input metadata; System.Timestamp() here would just repeat the event time
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;

1.3 Calculated Fields

Create derived columns with expressions:

-- Add calculated fields
SELECT
    deviceId,
    temperature,
    humidity,
    -- Convert Fahrenheit to Celsius (float literals keep the division in floating point)
    ROUND((temperature - 32) * 5.0 / 9.0, 2) AS temperatureCelsius,
    -- Simplified comfort index (not the NWS heat index formula)
    ROUND(temperature + (0.5 * humidity), 2) AS heatIndex,
    -- Categorize temperature
    CASE
        WHEN temperature < 60 THEN 'Cold'
        WHEN temperature BETWEEN 60 AND 75 THEN 'Comfortable'
        WHEN temperature > 75 THEN 'Hot'
        ELSE 'Unknown'  -- reached only when temperature is NULL
    END AS temperatureCategory
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;
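Before deploying, you can compute the expected values for a few sample readings on your machine. The Python sketch below mirrors the three calculated fields in the query above. The function names and the sample event are hypothetical. Python's `round()` resolves ties with banker's rounding on binary floats, so it may differ from SAQL's `ROUND` in the last decimal place.

```python
def to_celsius(temp_f: float) -> float:
    """Mirror of ROUND((temperature - 32) * 5.0 / 9.0, 2)."""
    return round((temp_f - 32) * 5 / 9, 2)


def simple_heat_index(temp_f: float, humidity: float) -> float:
    """Mirror of ROUND(temperature + (0.5 * humidity), 2); a simplified index only."""
    return round(temp_f + 0.5 * humidity, 2)


def temperature_category(temp_f):
    """Mirror of the CASE expression; None stands in for a NULL temperature."""
    if temp_f is None:
        return "Unknown"
    if temp_f < 60:
        return "Cold"
    if 60 <= temp_f <= 75:  # BETWEEN is inclusive on both ends
        return "Comfortable"
    return "Hot"  # temperature > 75


sample = {"deviceId": "sensor-001", "temperature": 77.0, "humidity": 40.0}
print(to_celsius(sample["temperature"]))                               # 25.0
print(simple_heat_index(sample["temperature"], sample["humidity"]))   # 97.0
print(temperature_category(sample["temperature"]))                     # Hot
```

Boundary values are the most useful test inputs. Because `BETWEEN` is inclusive, exactly 60 and 75 both fall in 'Comfortable'.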

🔍 Step 2: WHERE Clause Filtering

2.1 Simple Filters

Filter events based on conditions:

-- Only process events with temperature above 75°F
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    humidity,
    'High Temperature Alert' AS alertType
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
WHERE
    temperature > 75;

2.2 Multiple Conditions

Combine multiple filter criteria:

-- Filter for critical conditions
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    humidity,
    vibration,
    status
INTO
    SqlOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
WHERE
    (temperature > 80 OR humidity < 30)
    AND status != 'offline'
    AND vibration > 1.0;
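Missing fields interact with compound filters in a subtle way. This sketch assumes Stream Analytics follows standard SQL three-valued logic, in which a comparison against NULL is UNKNOWN and WHERE keeps a row only when the predicate is TRUE. Under that assumption, an event with no `status` is dropped by `status != 'offline'` even when it is hot and vibrating. The helper names and sample events are hypothetical.

```python
import operator
from typing import Callable, Optional

# None plays the role of SQL NULL / UNKNOWN throughout this sketch.


def compare(value, op: Callable, other) -> Optional[bool]:
    """A comparison involving NULL yields UNKNOWN (None)."""
    if value is None:
        return None
    return op(value, other)


def sql_and(a: Optional[bool], b: Optional[bool]) -> Optional[bool]:
    if a is False or b is False:
        return False
    if a is None or b is None:
        return None
    return True


def sql_or(a: Optional[bool], b: Optional[bool]) -> Optional[bool]:
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False


def passes_critical_filter(event: dict) -> bool:
    """(temperature > 80 OR humidity < 30) AND status != 'offline' AND vibration > 1.0"""
    hot = compare(event.get("temperature"), operator.gt, 80)
    dry = compare(event.get("humidity"), operator.lt, 30)
    online = compare(event.get("status"), operator.ne, "offline")
    shaking = compare(event.get("vibration"), operator.gt, 1.0)
    # WHERE keeps a row only when the whole predicate is TRUE (not UNKNOWN)
    return sql_and(sql_and(sql_or(hot, dry), online), shaking) is True


events = [
    {"deviceId": "sensor-001", "temperature": 85, "humidity": 50, "status": "normal", "vibration": 1.5},
    {"deviceId": "sensor-002", "temperature": 85, "humidity": 50, "status": None, "vibration": 1.5},
    {"deviceId": "sensor-003", "temperature": None, "humidity": 25, "status": "warning", "vibration": 1.2},
]
print([e["deviceId"] for e in events if passes_critical_filter(e)])  # ['sensor-001', 'sensor-003']
```

To keep events with a missing status, the condition can be written as `(status IS NULL OR status != 'offline')`.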

2.3 String Filtering

Filter based on text patterns:

-- Filter devices by location
SELECT
    deviceId,
    location,
    temperature,
    humidity
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
WHERE
    location LIKE 'Building-A%'
    OR deviceId IN ('sensor-001', 'sensor-002', 'sensor-003');
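A prefix pattern such as `'Building-A%'` also matches longer names like `'Building-AB/...'`. The sketch below translates the `%` and `_` wildcards into a regular expression so you can check which sample locations a pattern accepts. It ignores bracket classes and `ESCAPE`, and it matches case-sensitively, which is an assumption about the service's collation. The location values are hypothetical.

```python
import re


def like_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a LIKE pattern that uses only % and _ into a regex.
    Bracket character classes and ESCAPE are not handled in this sketch."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")   # any run of characters, including none
        elif ch == "_":
            parts.append(".")    # exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.DOTALL)


BUILDING_A = like_to_regex("Building-A%")
WATCHLIST = {"sensor-001", "sensor-002", "sensor-003"}


def passes_location_filter(event: dict) -> bool:
    location = event.get("location")
    in_building = location is not None and BUILDING_A.fullmatch(location) is not None
    return in_building or event.get("deviceId") in WATCHLIST


print(passes_location_filter({"deviceId": "sensor-042", "location": "Building-A/Floor-1"}))   # True
print(passes_location_filter({"deviceId": "sensor-042", "location": "Building-AB/Floor-1"}))  # True
print(passes_location_filter({"deviceId": "sensor-042", "location": "Building-B/Floor-1"}))   # False
```

If locations follow a `Building-X/...` layout, a tighter pattern such as `'Building-A/%'` avoids the accidental match.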

2.4 NULL Handling

Manage missing or null values:

-- Handle NULL values explicitly
SELECT
    deviceId,
    COALESCE(location, 'Unknown Location') AS location,
    COALESCE(temperature, 0.0) AS temperature,  -- 0.0 is a placeholder; don't average it downstream
    COALESCE(humidity, 0.0) AS humidity,
    CASE
        WHEN temperature IS NULL THEN 'Missing Data'  -- flags rows whose temperature was replaced
        ELSE status
    END AS deviceStatus
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;
-- To drop incomplete records instead, add WHERE temperature IS NOT NULL;
-- the temperature COALESCE and the 'Missing Data' branch then never apply.
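A placeholder value is not neutral once the output is aggregated. This sketch assumes `AVG` skips NULLs, as it does in standard SQL, and compares that behavior with averaging after `COALESCE(temperature, 0.0)`. The readings are made up.

```python
from statistics import mean

readings = [70.0, None, 74.0]  # None stands in for a NULL temperature

# AVG ignores NULLs, so averaging the raw column uses only real readings.
avg_ignoring_nulls = mean(t for t in readings if t is not None)

# COALESCE(temperature, 0.0) turns the gap into a 0.0 °F reading.
avg_with_placeholder = mean(0.0 if t is None else t for t in readings)

print(avg_ignoring_nulls)    # 72.0
print(avg_with_placeholder)  # 48.0
```

Because of this, placeholders work best for display-oriented outputs. Leave the column NULL, or filter those rows out, before computing statistics.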

📈 Step 3: Aggregation Functions

3.1 Basic Aggregations

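Calculate summary statistics. On a stream, aggregates are computed per time window, so the GROUP BY clause includes one (here a 1-minute tumbling window), and System.Timestamp() returns each window's end time: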

-- Calculate basic statistics across all devices
SELECT
    COUNT(*) AS totalEvents,
    AVG(temperature) AS avgTemperature,
    MIN(temperature) AS minTemperature,
    MAX(temperature) AS maxTemperature,
    AVG(humidity) AS avgHumidity,
    AVG(vibration) AS avgVibration,
    System.Timestamp() AS windowEnd
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
GROUP BY
    TumblingWindow(minute, 1);
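Tumbling windows are contiguous, fixed-size, and non-overlapping, so every event falls into exactly one window. This sketch makes two further assumptions: windows are aligned to the Unix epoch, and an event stamped exactly on a boundary belongs to the window that ends there. It labels each window by its end time, which is the value `System.Timestamp()` reports. The function names and events are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window_end(ts: datetime, size: timedelta) -> datetime:
    """End of the fixed-size, epoch-aligned window containing ts.
    An event exactly on a boundary is assigned to the window ending there."""
    n, remainder = divmod(ts - EPOCH, size)
    if remainder:  # strictly inside a window: it ends at the next boundary
        n += 1
    return EPOCH + n * size


def tumbling_aggregate(events, size):
    """Group events into tumbling windows and compute the temperature statistics."""
    windows = defaultdict(list)
    for event in events:
        windows[tumbling_window_end(event["timestamp"], size)].append(event["temperature"])
    return {
        end: {
            "totalEvents": len(temps),
            "avgTemperature": sum(temps) / len(temps),
            "minTemperature": min(temps),
            "maxTemperature": max(temps),
        }
        for end, temps in sorted(windows.items())
    }


start = datetime(2025, 1, 15, 10, 0, tzinfo=timezone.utc)
events = [
    {"timestamp": start + timedelta(seconds=30), "temperature": 70.0},
    {"timestamp": start + timedelta(seconds=60), "temperature": 72.0},  # on the 10:01 boundary
    {"timestamp": start + timedelta(seconds=70), "temperature": 80.0},
]
result = tumbling_aggregate(events, timedelta(minutes=1))
for window_end, stats in result.items():
    print(window_end.isoformat(), stats)
```

This produces one output row per non-empty window, which matches what the query emits for each minute.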

3.2 Grouped Aggregations

Group data by dimensions:

-- Aggregations per device
SELECT
    deviceId,
    location,
    COUNT(*) AS eventCount,
    AVG(temperature) AS avgTemp,
    MIN(temperature) AS minTemp,
    MAX(temperature) AS maxTemp,
    STDEV(temperature) AS tempStdDev,
    System.Timestamp() AS windowEnd
INTO
    SqlOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
GROUP BY
    deviceId,
    location,
    TumblingWindow(minute, 5);
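Grouping by `deviceId` and `location` produces one row per key per window. The sketch below reproduces that for a single 5-minute window. It assumes Stream Analytics' `STDEV` is the sample standard deviation (n - 1 denominator), as in T-SQL, where `STDEVP` is the population version. It also assumes a single reading yields NULL. The device and location values are hypothetical.

```python
from collections import defaultdict
from statistics import pstdev, stdev


def per_device_stats(readings):
    """readings: (deviceId, location, temperature) tuples from one 5-minute window."""
    groups = defaultdict(list)
    for device_id, location, temp in readings:
        groups[(device_id, location)].append(temp)
    out = {}
    for key, temps in groups.items():
        out[key] = {
            "eventCount": len(temps),
            "avgTemp": sum(temps) / len(temps),
            # Sample standard deviation; None (NULL) when there is only one reading
            "tempStdDev": stdev(temps) if len(temps) > 1 else None,
        }
    return out


window = [
    ("sensor-001", "Building-A/Floor-1", 70.0),
    ("sensor-001", "Building-A/Floor-1", 72.0),
    ("sensor-001", "Building-A/Floor-1", 74.0),
    ("sensor-002", "Building-B/Floor-1", 68.0),
]
stats = per_device_stats(window)
for key, row in stats.items():
    print(key, row)
# For comparison, the population standard deviation of sensor-001's readings:
print(pstdev([70.0, 72.0, 74.0]))
```

With only a handful of readings per window, the choice between sample and population deviation noticeably changes the result (2.0 versus about 1.63 here).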

3.3 Advanced Aggregations with HAVING

Filter aggregated results:

-- Find devices with high average temperature
SELECT
    deviceId,
    location,
    AVG(temperature) AS avgTemperature,
    COUNT(*) AS readingCount,
    System.Timestamp() AS windowEnd
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
GROUP BY
    deviceId,
    location,
    TumblingWindow(minute, 5)
HAVING
    AVG(temperature) > 75
    AND COUNT(*) >= 10;  -- Require at least 10 readings so sparse windows don't trigger alerts
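HAVING runs after the groups are aggregated, whereas WHERE removes individual events before grouping. Putting `temperature > 75` in WHERE would therefore change the average itself instead of selecting hot devices. The sketch applies the two HAVING conditions to pre-grouped readings from one window, with the minimum written as `>=` so that exactly N readings qualify. The helper name and data are hypothetical.

```python
def apply_having(groups, min_avg=75.0, min_readings=10):
    """groups maps (deviceId, location) -> temperatures from one 5-minute window.
    HAVING runs after aggregation: AVG(temperature) > min_avg AND COUNT(*) >= min_readings."""
    kept = {}
    for key, temps in groups.items():
        avg = sum(temps) / len(temps)
        if avg > min_avg and len(temps) >= min_readings:
            kept[key] = {"avgTemperature": avg, "readingCount": len(temps)}
    return kept


groups = {
    ("sensor-001", "Building-A/Floor-1"): [78.0] * 12,  # hot, enough readings -> kept
    ("sensor-002", "Building-A/Floor-2"): [80.0] * 10,  # exactly 10 readings -> kept with >=
    ("sensor-003", "Building-B/Floor-1"): [70.0] * 20,  # plenty of readings, but not hot
    ("sensor-004", "Building-B/Floor-2"): [90.0] * 3,   # hot, but too few readings
}
print(sorted(device for device, _ in apply_having(groups)))  # ['sensor-001', 'sensor-002']
```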

🔢 Step 4: Data Type Operations

4.1 Type Conversions

Convert between data types:

-- Demonstrate type conversions (SAQL has no int type; integers are bigint)
SELECT
    deviceId,
    CAST(timestamp AS datetime) AS eventDateTime,
    CAST(temperature AS bigint) AS tempInteger,
    CAST(ROUND(humidity, 0) AS bigint) AS humidityInt,
    TRY_CAST(deviceId AS bigint) AS deviceIdNum,  -- NULL when conversion fails (e.g. 'sensor-001')
    CONCAT(deviceId, '-', location) AS fullIdentifier,
    SUBSTRING(location, 1, 10) AS buildingCode     -- first 10 characters; start position is 1-based
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;
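Two details are worth checking against sample data. `TRY_CAST` returns NULL rather than failing the job, and `SUBSTRING` counts from 1 rather than 0. The sketch models both under those assumptions, which match T-SQL. It handles only start positions of 1 or more, and the function names are hypothetical.

```python
from typing import Optional


def try_cast_bigint(value) -> Optional[int]:
    """Like TRY_CAST(x AS bigint): the converted value, or None (NULL) on failure."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def sql_substring(text: str, start: int, length: int) -> str:
    """SUBSTRING with a 1-based start position (start >= 1 only in this sketch)."""
    return text[start - 1:start - 1 + length]


print(try_cast_bigint("sensor-001"))                  # None
print(try_cast_bigint("42"))                          # 42
print(sql_substring("Building-A/Floor-2", 1, 10))     # Building-A
```

With device IDs such as `'sensor-001'`, `deviceIdNum` will therefore always be NULL.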

4.2 String Operations

Manipulate text data:

-- String manipulation examples (assumes locations of the form 'Building-X/...')
SELECT
    deviceId,
    location,
    -- Extract building name; CHARINDEX returns 0 when '/' is absent, so guard the length
    CASE
        WHEN CHARINDEX('/', location) > 0
            THEN SUBSTRING(location, 1, CHARINDEX('/', location) - 1)
        ELSE location
    END AS building,
    -- Convert to uppercase
    UPPER(status) AS statusUpper,
    -- String length
    LEN(location) AS locationLength,
    -- Replace text
    REPLACE(location, 'Building-', 'Bldg-') AS shortLocation,
    -- Concatenation
    CONCAT(deviceId, ' @ ', location) AS deviceInfo
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;
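`CHARINDEX` returns a 1-based position, or 0 when the separator is missing. For a location without '/', the expression `SUBSTRING(location, 1, CHARINDEX('/', location) - 1)` would therefore receive a length of -1. The sketch below models the function and a guarded extraction so you can test sample locations. The helper names and location values are hypothetical.

```python
def charindex(needle: str, haystack: str) -> int:
    """1-based position of needle in haystack, or 0 when absent (CHARINDEX-style)."""
    return haystack.find(needle) + 1


def building_from_location(location: str) -> str:
    pos = charindex("/", location)
    if pos > 0:
        return location[: pos - 1]  # SUBSTRING(location, 1, pos - 1)
    return location  # no '/': fall back to the whole value instead of a length of -1


print(building_from_location("Building-A/Floor-2"))  # Building-A
print(building_from_location("Lobby"))               # Lobby
```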

4.3 Date/Time Operations

Work with timestamps:

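-- Date and time operations (timestamp assumed to be UTC)
SELECT
    deviceId,
    timestamp,
    DATEPART(hour, timestamp) AS hourOfDay,
    DATEPART(day, timestamp) AS dayOfMonth,
    DATEPART(weekday, timestamp) AS dayOfWeek,
    DATEADD(hour, -5, timestamp) AS timestampEST,  -- Fixed UTC-5 offset; ignores daylight saving (EDT is UTC-4)
    -- In a non-windowed query System.Timestamp() equals the TIMESTAMP BY value, so measure
    -- delay against the Event Hub metadata column EventProcessedUtcTime instead
    DATEDIFF(second, timestamp, EventProcessedUtcTime) AS processingDelaySeconds
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp;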
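Pipeline delay is the gap between the device's own timestamp and a later one that the pipeline records. Event Hub inputs expose such timestamps as `EventEnqueuedUtcTime` and `EventProcessedUtcTime`. The sketch computes that gap and shows the pitfall of a fixed -5 hour offset in summer. The timestamps are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def delay_seconds(event_time: datetime, later_time: datetime) -> float:
    """Seconds between the device's event time and a later pipeline timestamp."""
    return (later_time - event_time).total_seconds()


FIXED_UTC_MINUS_5 = timezone(timedelta(hours=-5), "UTC-05:00")

event_time = datetime(2025, 7, 1, 14, 0, 0, tzinfo=timezone.utc)     # hypothetical device timestamp
enqueued_time = datetime(2025, 7, 1, 14, 0, 3, tzinfo=timezone.utc)  # hypothetical enqueue time

print(delay_seconds(event_time, enqueued_time))        # 3.0
# Fixed offset gives 9 o'clock, but New York is on EDT (UTC-4) in July, i.e. 10 o'clock
print(event_time.astimezone(FIXED_UTC_MINUS_5).hour)  # 9
```

If local wall-clock time matters, convert downstream with a time-zone-aware library rather than a hard-coded offset.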

🎯 Step 5: Complete Query Examples

5.1 Temperature Monitoring Dashboard

Create aggregated metrics for monitoring:

-- Real-time temperature monitoring dashboard
SELECT
    deviceId,
    location,
    -- Time window identifier
    System.Timestamp() AS windowEnd,
    -- Count metrics
    COUNT(*) AS totalReadings,
    -- Temperature metrics
    AVG(temperature) AS avgTemperature,
    MIN(temperature) AS minTemperature,
    MAX(temperature) AS maxTemperature,
    STDEV(temperature) AS tempStdDev,
    -- Humidity metrics
    AVG(humidity) AS avgHumidity,
    -- Status summary
    SUM(CASE WHEN status = 'normal' THEN 1 ELSE 0 END) AS normalCount,
    SUM(CASE WHEN status = 'warning' THEN 1 ELSE 0 END) AS warningCount,
    SUM(CASE WHEN status = 'critical' THEN 1 ELSE 0 END) AS criticalCount,
    -- Alert flag
    CASE
        WHEN AVG(temperature) > 80 THEN 1
        WHEN MIN(temperature) < 55 THEN 1
        ELSE 0
    END AS alertFlag
INTO
    SqlOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
GROUP BY
    deviceId,
    location,
    TumblingWindow(minute, 5);
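The `SUM(CASE WHEN ... THEN 1 ELSE 0 END)` pattern counts the rows that match each status, and `alertFlag` combines two aggregates. The sketch below computes the same columns for one device's readings in one window, so you can predict dashboard rows from sample data. The function name and readings are hypothetical.

```python
def summarize_window(readings):
    """readings: (temperature, status) pairs for one device in one 5-minute window."""
    temps = [t for t, _ in readings]
    statuses = [s for _, s in readings]
    avg_t = sum(temps) / len(temps)
    return {
        "totalReadings": len(readings),
        "avgTemperature": avg_t,
        "normalCount": statuses.count("normal"),      # SUM(CASE WHEN status = 'normal' ...)
        "warningCount": statuses.count("warning"),
        "criticalCount": statuses.count("critical"),
        # alertFlag: average above 80 or any reading below 55
        "alertFlag": 1 if avg_t > 80 or min(temps) < 55 else 0,
    }


print(summarize_window([(82.0, "warning"), (84.0, "critical"), (80.0, "normal")]))
print(summarize_window([(70.0, "normal"), (54.0, "normal")])["alertFlag"])  # 1
print(summarize_window([(70.0, "normal"), (72.0, "normal")])["alertFlag"])  # 0
```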

5.2 Anomaly Pre-Filter

Identify potential anomalies for deeper analysis:

-- Pre-filter potential anomalies
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    humidity,
    vibration,
    status,
    -- Deviation from an assumed 72°F baseline
    ABS(temperature - 72.0) AS tempDeviation,
    -- Categorize severity so every condition in the WHERE clause gets a non-Normal label
    CASE
        WHEN vibration > 2.0 OR status = 'critical' THEN 'Critical'
        WHEN vibration > 1.0
            OR temperature > 85 OR temperature < 60
            OR humidity > 80 OR humidity < 20
            OR status = 'warning' THEN 'Warning'
        ELSE 'Normal'
    END AS severityLevel,
    -- Add processing metadata
    System.Timestamp() AS detectedAt
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
WHERE
    -- Filter for anomalous conditions
    (temperature > 85 OR temperature < 60)
    OR (humidity > 80 OR humidity < 20)
    OR vibration > 1.0
    OR status IN ('warning', 'critical');
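Before routing output to a downstream analysis, you can check which sample events the pre-filter lets through. This sketch mirrors the WHERE clause above and assumes no fields are missing. It does not model NULL handling. The function name and events are hypothetical.

```python
def is_anomaly_candidate(e: dict) -> bool:
    """Mirror of the pre-filter WHERE clause (assumes no NULL fields)."""
    return (
        (e["temperature"] > 85 or e["temperature"] < 60)
        or (e["humidity"] > 80 or e["humidity"] < 20)
        or e["vibration"] > 1.0
        or e["status"] in ("warning", "critical")
    )


samples = [
    {"deviceId": "sensor-001", "temperature": 72, "humidity": 45, "vibration": 0.4, "status": "normal"},
    {"deviceId": "sensor-002", "temperature": 72, "humidity": 15, "vibration": 0.4, "status": "normal"},
    {"deviceId": "sensor-003", "temperature": 72, "humidity": 45, "vibration": 0.4, "status": "critical"},
    {"deviceId": "sensor-004", "temperature": 90, "humidity": 45, "vibration": 0.4, "status": "normal"},
]
print([s["deviceId"] for s in samples if is_anomaly_candidate(s)])
# ['sensor-002', 'sensor-003', 'sensor-004']
```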

5.3 Device Health Summary

Track device health per building:

-- Device health summary per building (assumes every location contains '/', e.g. 'Building-A/Floor-2')
SELECT
    SUBSTRING(location, 1, CHARINDEX('/', location) - 1) AS building,
    COUNT(DISTINCT deviceId) AS deviceCount,
    AVG(temperature) AS avgTemperature,
    AVG(humidity) AS avgHumidity,
    AVG(vibration) AS avgVibration,
    -- Health score bucket: 100, 75, 50, or 25 (first matching branch wins)
    CASE
        WHEN AVG(vibration) < 0.5 AND AVG(temperature) BETWEEN 68 AND 75 THEN 100
        WHEN AVG(vibration) < 1.0 AND AVG(temperature) BETWEEN 65 AND 80 THEN 75
        WHEN AVG(vibration) < 1.5 THEN 50
        ELSE 25
    END AS healthScore,
    -- Status distribution (counts readings, not distinct devices)
    SUM(CASE WHEN status = 'normal' THEN 1 ELSE 0 END) AS normalReadings,
    SUM(CASE WHEN status != 'normal' THEN 1 ELSE 0 END) AS issueReadings,
    System.Timestamp() AS windowEnd
INTO
    SqlOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
GROUP BY
    SUBSTRING(location, 1, CHARINDEX('/', location) - 1),
    TumblingWindow(minute, 10);
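The health score is a `CASE` over two averages in which the first matching branch wins. That means a building with low vibration but an out-of-range temperature falls through to a lower bucket. This sketch mirrors the branches so you can check the score for sample averages. The function name is hypothetical.

```python
def health_score(avg_vibration: float, avg_temperature: float) -> int:
    """Mirror of the healthScore CASE; branches are checked in order."""
    if avg_vibration < 0.5 and 68 <= avg_temperature <= 75:
        return 100
    if avg_vibration < 1.0 and 65 <= avg_temperature <= 80:
        return 75
    if avg_vibration < 1.5:
        return 50
    return 25


print(health_score(0.3, 70.0))  # 100
print(health_score(0.3, 78.0))  # 75: calm, but too warm for the top bucket
print(health_score(0.4, 85.0))  # 50: calm, but outside both temperature ranges
print(health_score(2.0, 70.0))  # 25
```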

🧪 Step 6: Testing Queries

6.1 Update Job with New Query

Deploy a trimmed version of the temperature monitoring query from 5.1 with the Azure CLI:

# Save the temperature monitoring query
$query = @"
-- Temperature Monitoring Dashboard Query
SELECT
    deviceId,
    location,
    System.Timestamp() AS windowEnd,
    COUNT(*) AS totalReadings,
    AVG(temperature) AS avgTemperature,
    MIN(temperature) AS minTemperature,
    MAX(temperature) AS maxTemperature,
    AVG(humidity) AS avgHumidity,
    SUM(CASE WHEN status = 'normal' THEN 1 ELSE 0 END) AS normalCount,
    SUM(CASE WHEN status != 'normal' THEN 1 ELSE 0 END) AS alertCount
INTO
    SqlOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
GROUP BY
    deviceId,
    location,
    TumblingWindow(minute, 5);
"@

$query | Out-File -FilePath "monitoring-query.sql" -Encoding UTF8

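# Stop the job (the query can only be updated while the job is stopped)
az stream-analytics job stop `
    --job-name $env:STREAM_JOB `
    --resource-group $env:STREAM_RG

# Update query; quote "@file" so PowerShell doesn't treat @ as the splatting operator
az stream-analytics transformation update `
    --job-name $env:STREAM_JOB `
    --resource-group $env:STREAM_RG `
    --name "Transformation" `
    --saql "@monitoring-query.sql"

# Restart job. JobStartTime emits output from the moment the job starts;
# LastOutputEventTime would resume from the previous run's last output instead.
az stream-analytics job start `
    --job-name $env:STREAM_JOB `
    --resource-group $env:STREAM_RG `
    --output-start-mode JobStartTime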

6.2 Validate Query Results

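Query the SQL Database to verify the aggregations (this assumes SqlOutput writes to a SensorReadings table whose columns match the query output):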

$connectionString = "Server=tcp:$($env:STREAM_SQL_SERVER).database.windows.net,1433;Initial Catalog=$($env:STREAM_SQL_DB);User ID=$($env:STREAM_SQL_USER);Password=$($env:STREAM_SQL_PASSWORD);Encrypt=True;"

$validationQuery = @"
SELECT TOP 10
    deviceId,
    location,
    windowEnd,
    totalReadings,
    ROUND(avgTemperature, 2) AS avgTemp,
    normalCount,
    alertCount
FROM SensorReadings
ORDER BY windowEnd DESC;
"@

Invoke-Sqlcmd -ConnectionString $connectionString -Query $validationQuery | Format-Table

✅ Validation Exercises

Exercise 1: Filter High-Temperature Events

Write a query to select only events where temperature exceeds 80°F:

💡 Solution
SELECT
    deviceId,
    location,
    timestamp,
    temperature,
    'High Temp Alert' AS alertType
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
WHERE
    temperature > 80;

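Exercise 2: Calculate a 5-Minute Average

Compute the average temperature per device over consecutive, non-overlapping 5-minute windows (a true moving average needs hopping or sliding windows, covered in Tutorial 05):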

💡 Solution
SELECT
    deviceId,
    AVG(temperature) AS avgTemperature,
    COUNT(*) AS sampleCount,
    System.Timestamp() AS windowEnd
INTO
    SqlOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
GROUP BY
    deviceId,
    TumblingWindow(minute, 5);

Exercise 3: Multi-Condition Filter

Select events from Building-A with temperature > 75 OR humidity < 30:

💡 Solution
SELECT
    deviceId,
    location,
    temperature,
    humidity
INTO
    BlobOutput
FROM
    EventHubInput TIMESTAMP BY timestamp
WHERE
    location LIKE 'Building-A%'
    AND (temperature > 75 OR humidity < 30);

🎓 Key Concepts Learned

SAQL Fundamentals

  • SELECT: Project and transform columns
  • WHERE: Filter individual events before aggregation
  • GROUP BY: Aggregate data by dimensions
  • HAVING: Filter aggregated results

Functions Covered

  • Aggregates: AVG, MIN, MAX, COUNT, SUM, STDEV
  • String: SUBSTRING, CHARINDEX, CONCAT, UPPER, REPLACE, LEN
  • Date/Time: DATEPART, DATEADD, DATEDIFF
  • Conversion: CAST, TRY_CAST
  • NULL handling: COALESCE

Best Practices

  • Use TIMESTAMP BY to define event time
  • Filter early with WHERE to reduce processing
  • Use HAVING for post-aggregation filters
  • Handle NULL values explicitly
  • Add meaningful column aliases

🚀 Next Steps

You've mastered basic SAQL queries! Continue to:

Tutorial 05: Windowing Functions →

In the next tutorial, you'll learn:

  • Tumbling windows for fixed-interval aggregations
  • Hopping windows for overlapping analysis
  • Sliding windows for continuous metrics
  • Session windows for activity-based grouping


Tutorial Progress: 4 of 11 complete | Next: Windowing Functions

Last Updated: January 2025