🔧 Tutorial 3: Stream Analytics Job Creation¶
Create your first Azure Stream Analytics job with proper input/output configuration. Learn the fundamentals of Stream Analytics Query Language (SAQL) and test queries with live data.
🎯 Learning Objectives¶
After completing this tutorial, you will be able to:
- ✅ Create Stream Analytics job with optimal settings
- ✅ Configure Event Hub input with timestamp and partition settings
- ✅ Set up multiple outputs including Blob Storage and SQL Database
- ✅ Write basic SAQL queries for data transformation
- ✅ Test queries with sample data and live streams
- ✅ Monitor job metrics and troubleshoot issues
⏱️ Time Estimate: 35 minutes¶
- Job Creation & Configuration: 15 minutes
- Input/Output Setup: 10 minutes
- Query Development: 10 minutes
📋 Prerequisites¶
- Completed Tutorial 01: Environment Setup
- Completed Tutorial 02: Data Generator Setup
- Data generators actively sending events to Event Hub
- Storage Account created
🛠️ Step 1: Create Stream Analytics Job¶
1.1 Create Job via Azure CLI¶
Create a Stream Analytics job in your resource group:
# Create Stream Analytics job
az stream-analytics job create `
--name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--location $env:STREAM_LOCATION `
--output-error-policy "Drop" `
--events-outoforder-policy "Adjust" `
--events-outoforder-max-delay 10 `
--events-late-arrival-max-delay 5 `
--data-locale "en-US" `
--compatibility-level "1.2" `
--tags "Environment=Tutorial" "Purpose=Learning"
# Verify job creation
az stream-analytics job show `
--name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--query "{Name:name, State:jobState, CreatedDate:createdDate}" `
--output table
Expected Output:
Name State CreatedDate
------------------------ ------- -----------------------
streamtutorial-asa-1234 Created 2025-01-15T10:30:00Z
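The out-of-order and late-arrival settings above decide what happens when an event's timestamp lags behind the stream's current watermark. The following Python sketch illustrates the intent of the two policies with simplified semantics; it is not the engine's exact algorithm:

```python
# Illustrative sketch of out-of-order handling, NOT the exact ASA algorithm.
# "Adjust": clamp a too-old timestamp forward to the tolerated minimum.
# "Drop": discard the event entirely.

def handle_out_of_order(events, max_delay_seconds, policy="Adjust"):
    """events: list of (event_id, timestamp_seconds), in arrival order."""
    watermark = float("-inf")  # highest event time seen so far
    output = []
    for event_id, ts in events:
        watermark = max(watermark, ts)
        limit = watermark - max_delay_seconds
        if ts >= limit:
            output.append((event_id, ts))
        elif policy == "Adjust":
            output.append((event_id, limit))  # timestamp adjusted forward
        # policy == "Drop": event is silently discarded
    return output

events = [("a", 100), ("b", 120), ("c", 105)]  # "c" is 15s behind the watermark
print(handle_out_of_order(events, max_delay_seconds=10, policy="Adjust"))
print(handle_out_of_order(events, max_delay_seconds=10, policy="Drop"))
```

With a 10-second tolerance, event "c" is either moved forward to timestamp 110 ("Adjust") or removed ("Drop"), which is why a larger `--events-outoforder-max-delay` preserves more data at the cost of added latency.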
1.2 Configure Streaming Units¶
Streaming Units (SUs) are the compute capacity allocated to a job. They are set on the job's transformation rather than on the job resource itself, so the allocation happens when you create the transformation with `--streaming-units 1` in Step 4. After completing that step, you can verify the allocation:
# Verify streaming units (run after completing Step 4)
az stream-analytics transformation show `
    --job-name $env:STREAM_JOB `
    --resource-group $env:STREAM_RG `
    --name "Transformation"
💡 Streaming Units: Start with 1 SU for development. Scale up to 3, 6, or more SUs for production based on throughput requirements.
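For initial sizing, a fully utilized SU is often quoted as handling on the order of 1 MB/s of input. The sketch below turns that rule of thumb into arithmetic; treat the result as a starting point to benchmark, not a guarantee:

```python
# Back-of-the-envelope SU estimate (rule of thumb only; benchmark before production).
import math

def estimate_streaming_units(events_per_second, avg_event_bytes,
                             mb_per_second_per_su=1.0):
    """Ceil the estimated input throughput against an assumed per-SU capacity."""
    throughput_mb = events_per_second * avg_event_bytes / (1024 * 1024)
    return max(1, math.ceil(throughput_mb / mb_per_second_per_su))

# e.g. 2,000 events/s at ~1 KB each is roughly 2 MB/s of input
print(estimate_streaming_units(2000, 1024))
```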
📥 Step 2: Configure Event Hub Input¶
2.1 Create Input Configuration File¶
Create JSON configuration for Event Hub input:
# Create input configuration
$inputConfig = @{
    properties = @{
        type = "Stream"
        datasource = @{
            type = "Microsoft.ServiceBus/EventHub"
            properties = @{
                serviceBusNamespace = $env:STREAM_EH_NAMESPACE
                eventHubName = $env:STREAM_EH_NAME
                consumerGroupName = "`$Default"
                authenticationMode = "ConnectionString"
                sharedAccessPolicyName = "ListenPolicy"
                sharedAccessPolicyKey = (az eventhubs eventhub authorization-rule keys list `
                    --name ListenPolicy `
                    --eventhub-name $env:STREAM_EH_NAME `
                    --namespace-name $env:STREAM_EH_NAMESPACE `
                    --resource-group $env:STREAM_RG `
                    --query primaryKey `
                    --output tsv)
            }
        }
        serialization = @{
            type = "Json"
            properties = @{
                encoding = "UTF8"
            }
        }
    }
} | ConvertTo-Json -Depth 10
# Save to file
$inputConfig | Out-File -FilePath "eventhub-input.json" -Encoding UTF8
# Create input
az stream-analytics input create `
    --job-name $env:STREAM_JOB `
    --resource-group $env:STREAM_RG `
    --name "EventHubInput" `
    --properties @eventhub-input.json
2.2 Configure Timestamp Settings¶
For Event Hub inputs there is no timestamp property on the input configuration itself. Event time is assigned in the query with the TIMESTAMP BY clause, which you will apply in Step 4; if TIMESTAMP BY is omitted, Stream Analytics falls back to the event's arrival time (EventEnqueuedUtcTime). Note that the out-of-order and late-arrival policies configured in Step 1.1 only matter when TIMESTAMP BY references a payload field, such as the generator's timestamp field.
2.3 Test Input Connection¶
Verify Event Hub input is configured correctly:
# Test input connectivity
az stream-analytics input test `
--job-name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--input-name "EventHubInput"
# Check input status
az stream-analytics input show `
--job-name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--input-name "EventHubInput" `
--query "{Name:name, Type:properties.type, Source:properties.datasource.type}" `
--output table
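The queries in Step 4 assume each event arriving on this input is a UTF-8 encoded JSON object with the fields below. This is a hypothetical sample matching the Tutorial 02 generator's schema; exact values will differ:

```python
import json

# Hypothetical sensor event with the fields referenced by the Step 4 query.
event = {
    "deviceId": "device-001",
    "location": "Factory Floor A",
    "timestamp": "2025-01-15T10:45:00Z",
    "temperature": 22.4,
    "humidity": 55.1,
    "pressure": 1013.2,
    "vibration": 0.02,
    "status": "OK",
    "eventType": "telemetry",
}

# Serialized the way the input's serialization settings expect: UTF-8 JSON
payload = json.dumps(event).encode("utf-8")
print(json.loads(payload)["deviceId"])
```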
📤 Step 3: Configure Outputs¶
3.1 Create Blob Storage Output¶
Configure output for raw data archival:
# Create Blob Storage output configuration
$blobOutputConfig = @{
    properties = @{
        datasource = @{
            type = "Microsoft.Storage/Blob"
            properties = @{
                storageAccounts = @(
                    @{
                        accountName = $env:STREAM_SA
                        accountKey = $env:STREAM_SA_KEY
                    }
                )
                container = "rawdata"
                pathPattern = "{date}/{time}"
                dateFormat = "yyyy-MM-dd"
                timeFormat = "HH"
            }
        }
        serialization = @{
            type = "Json"
            properties = @{
                encoding = "UTF8"
                format = "LineSeparated"
            }
        }
    }
} | ConvertTo-Json -Depth 10 | Out-File -FilePath "blob-output.json" -Encoding UTF8
# Create Blob output
az stream-analytics output create `
    --job-name $env:STREAM_JOB `
    --resource-group $env:STREAM_RG `
    --name "BlobOutput" `
    --properties @blob-output.json
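The pathPattern, dateFormat, and timeFormat settings determine where each blob lands. A quick sketch of how {date}/{time} expands, mapping the .NET format tokens yyyy-MM-dd and HH to their Python strftime equivalents:

```python
from datetime import datetime, timezone

def blob_path(event_time, path_pattern="{date}/{time}",
              date_format="%Y-%m-%d", time_format="%H"):
    """Expand an ASA-style blob path pattern. The defaults are the Python
    equivalents of the .NET "yyyy-MM-dd" and "HH" formats used above."""
    return (path_pattern
            .replace("{date}", event_time.strftime(date_format))
            .replace("{time}", event_time.strftime(time_format)))

t = datetime(2025, 1, 15, 10, 45, tzinfo=timezone.utc)
print(blob_path(t))  # → 2025-01-15/10
```

With hourly time buckets, all events processed between 10:00 and 11:00 UTC land under the same `2025-01-15/10` prefix, which keeps downstream batch reads simple.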
3.2 Create Azure SQL Database Output¶
First, create a SQL Database for structured outputs:
# Create SQL Server
$sqlServerName = "streamtutorial-sql-$(Get-Random -Minimum 1000 -Maximum 9999)"
$sqlAdminUser = "sqladmin"
$sqlAdminPassword = "P@ssw0rd$(Get-Random -Minimum 1000 -Maximum 9999)!"
az sql server create `
--name $sqlServerName `
--resource-group $env:STREAM_RG `
--location $env:STREAM_LOCATION `
--admin-user $sqlAdminUser `
--admin-password $sqlAdminPassword
# Configure firewall to allow Azure services
az sql server firewall-rule create `
--server $sqlServerName `
--resource-group $env:STREAM_RG `
--name "AllowAzureServices" `
--start-ip-address 0.0.0.0 `
--end-ip-address 0.0.0.0
# Create database
$sqlDatabaseName = "StreamAnalyticsDB"
az sql db create `
--server $sqlServerName `
--resource-group $env:STREAM_RG `
--name $sqlDatabaseName `
--service-objective Basic `
--max-size 2GB
# Save SQL credentials
[Environment]::SetEnvironmentVariable("STREAM_SQL_SERVER", $sqlServerName, "User")
[Environment]::SetEnvironmentVariable("STREAM_SQL_DB", $sqlDatabaseName, "User")
[Environment]::SetEnvironmentVariable("STREAM_SQL_USER", $sqlAdminUser, "User")
[Environment]::SetEnvironmentVariable("STREAM_SQL_PASSWORD", $sqlAdminPassword, "User")
Write-Host "SQL Database created: $sqlServerName/$sqlDatabaseName"
Create the output table:
# Install SqlServer module if not present
if (!(Get-Module -ListAvailable -Name SqlServer)) {
Install-Module -Name SqlServer -Force -AllowClobber
}
# Create table for sensor readings
$createTableQuery = @"
CREATE TABLE SensorReadings (
    DeviceId NVARCHAR(50) NOT NULL,
    Location NVARCHAR(200) NOT NULL,
    EventTimestamp DATETIME2 NOT NULL,
    Temperature FLOAT NOT NULL,
    Humidity FLOAT NOT NULL,
    Pressure FLOAT NOT NULL,
    Vibration FLOAT NOT NULL,
    Status NVARCHAR(20) NOT NULL,
    EventType NVARCHAR(20) NOT NULL,
    PRIMARY KEY (DeviceId, EventTimestamp)
);

CREATE NONCLUSTERED INDEX IX_SensorReadings_Timestamp
    ON SensorReadings(EventTimestamp DESC);

CREATE NONCLUSTERED INDEX IX_SensorReadings_Status
    ON SensorReadings(Status);
"@
$connectionString = "Server=tcp:$sqlServerName.database.windows.net,1433;Initial Catalog=$sqlDatabaseName;Persist Security Info=False;User ID=$sqlAdminUser;Password=$sqlAdminPassword;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"
Invoke-Sqlcmd -ConnectionString $connectionString -Query $createTableQuery
Write-Host "✅ SQL table created successfully"
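Stream Analytics delivers events to outputs at least once, so a redelivered row can hit the (DeviceId, EventTimestamp) primary key twice; the duplicate insert then fails and is handled by the job's output error policy (Drop, from Step 1.1). This sketch shows the dedup effect the key provides:

```python
# Illustrates why a (DeviceId, EventTimestamp) key matters under
# at-least-once delivery: a redelivered row is rejected, not stored twice.

def insert_rows(rows):
    table, seen = [], set()
    dropped = 0
    for row in rows:
        key = (row["DeviceId"], row["EventTimestamp"])
        if key in seen:
            dropped += 1  # duplicate insert fails; the Drop policy discards it
        else:
            seen.add(key)
            table.append(row)
    return table, dropped

rows = [
    {"DeviceId": "device-001", "EventTimestamp": "2025-01-15T10:45:00", "Temperature": 22.4},
    {"DeviceId": "device-001", "EventTimestamp": "2025-01-15T10:45:00", "Temperature": 22.4},  # redelivery
]
table, dropped = insert_rows(rows)
print(len(table), dropped)  # → 1 1
```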
Configure SQL Database output:
# Create SQL output configuration
$sqlOutputConfig = @{
    properties = @{
        datasource = @{
            type = "Microsoft.Sql/Server/Database"
            properties = @{
                server = "$sqlServerName.database.windows.net"
                database = $sqlDatabaseName
                user = $sqlAdminUser
                password = $sqlAdminPassword
                table = "SensorReadings"
            }
        }
    }
} | ConvertTo-Json -Depth 10 | Out-File -FilePath "sql-output.json" -Encoding UTF8
# Create SQL output
az stream-analytics output create `
    --job-name $env:STREAM_JOB `
    --resource-group $env:STREAM_RG `
    --name "SqlOutput" `
    --properties @sql-output.json
3.3 Verify Output Configuration¶
List all configured outputs:
# List all outputs
az stream-analytics output list `
--job-name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--query "[].{Name:name, Type:properties.datasource.type}" `
--output table
Expected Output:
Name Type
---------- ---------------------------------
BlobOutput Microsoft.Storage/Blob
SqlOutput Microsoft.Sql/Server/Database
📝 Step 4: Write Stream Analytics Query¶
4.1 Create Pass-Through Query¶
Start with a simple query that passes all data through:
-- Basic pass-through query
-- Reads from Event Hub and writes to all outputs
-- Output 1: Send all raw data to Blob Storage
SELECT
deviceId,
location,
timestamp AS eventTimestamp,
temperature,
humidity,
pressure,
vibration,
status,
eventType,
System.Timestamp() AS processingTime
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
-- Output 2: Send structured data to SQL Database
SELECT
deviceId,
location,
CAST(timestamp AS datetime) AS eventTimestamp,
temperature,
humidity,
pressure,
vibration,
status,
eventType
INTO
SqlOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
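To make the two SELECT statements concrete, here is a small Python emulation of the projection each one performs. The stand-in for System.Timestamp() is illustrative; the real function returns the event-time timestamp the engine assigns:

```python
from datetime import datetime, timezone

def to_blob_record(event, processing_time):
    """Mirrors the first SELECT: rename timestamp, add a processing-time column."""
    record = dict(event)
    record["eventTimestamp"] = record.pop("timestamp")
    record["processingTime"] = processing_time.isoformat()
    return record

def to_sql_record(event):
    """Mirrors the second SELECT: same fields, timestamp cast to datetime."""
    record = dict(event)
    record["eventTimestamp"] = datetime.fromisoformat(
        record.pop("timestamp").replace("Z", "+00:00"))
    return record

event = {"deviceId": "device-001", "timestamp": "2025-01-15T10:45:00Z",
         "temperature": 22.4, "status": "OK"}
blob = to_blob_record(event, datetime(2025, 1, 15, 10, 45, 2, tzinfo=timezone.utc))
sql = to_sql_record(event)
print(blob["processingTime"], sql["eventTimestamp"].year)
```

Note that one input stream feeds both projections: every event is duplicated across the two outputs, which matters when you compare input and output metrics in Step 6.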
4.2 Update Job with Query¶
Apply the query to your Stream Analytics job:
# Create query file
$query = @"
-- Stream Analytics Query
-- Tutorial 03: Basic pass-through query with multiple outputs
-- Archive all raw data to Blob Storage
SELECT
deviceId,
location,
timestamp AS eventTimestamp,
temperature,
humidity,
pressure,
vibration,
status,
eventType,
System.Timestamp() AS processingTime
INTO
BlobOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
-- Send structured data to SQL Database
SELECT
deviceId,
location,
CAST(timestamp AS datetime) AS eventTimestamp,
temperature,
humidity,
pressure,
vibration,
status,
eventType
INTO
SqlOutput
FROM
EventHubInput TIMESTAMP BY timestamp;
"@
$query | Out-File -FilePath "stream-query.sql" -Encoding UTF8
# Create the transformation with the query and 1 streaming unit
az stream-analytics transformation create `
    --job-name $env:STREAM_JOB `
    --resource-group $env:STREAM_RG `
    --name "Transformation" `
    --streaming-units 1 `
    --saql @stream-query.sql
▶️ Step 5: Start and Test the Job¶
5.1 Start Stream Analytics Job¶
Begin processing events:
# Start job with JobStartTime to process from now
az stream-analytics job start `
--job-name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--output-start-mode JobStartTime
# Monitor job startup
Write-Host "Starting job... This may take 1-2 minutes."
Start-Sleep -Seconds 60
# Check job state
az stream-analytics job show `
--name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--query "{Name:name, State:jobState, LastOutputTime:lastOutputEventTime}" `
--output table
5.2 Verify Data Flow¶
Ensure data generator is running and check outputs:
# Start data generator in background
Start-Process python -ArgumentList "sensor_data_generator.py" -NoNewWindow
# Wait for data to flow through pipeline
Write-Host "Waiting for data to flow through pipeline..."
Start-Sleep -Seconds 120
# Check SQL Database for records
$verifyQuery = "SELECT COUNT(*) AS RecordCount, MIN(EventTimestamp) AS FirstEvent, MAX(EventTimestamp) AS LastEvent FROM SensorReadings;"
Invoke-Sqlcmd -ConnectionString $connectionString -Query $verifyQuery | Format-Table
Expected Output:
RecordCount FirstEvent LastEvent
----------- ---------------------- ----------------------
240 2025-01-15 10:45:00.000 2025-01-15 10:47:00.000
5.3 Verify Blob Storage Output¶
Check that files are being written to Blob Storage:
# List blobs in rawdata container
az storage blob list `
--account-name $env:STREAM_SA `
--account-key $env:STREAM_SA_KEY `
--container-name "rawdata" `
--query "[].{Name:name, Size:properties.contentLength, LastModified:properties.lastModified}" `
--output table
📊 Step 6: Monitor Job Performance¶
6.1 View Job Metrics¶
Monitor key performance indicators:
# Get job resource ID
$jobResourceId = az stream-analytics job show `
--name $env:STREAM_JOB `
--resource-group $env:STREAM_RG `
--query id `
--output tsv
# View input events metric (metric timestamps must be UTC)
az monitor metrics list `
    --resource $jobResourceId `
    --metric "InputEvents" `
    --start-time (Get-Date).ToUniversalTime().AddMinutes(-10).ToString("yyyy-MM-ddTHH:mm:ssZ") `
    --end-time (Get-Date).ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ssZ") `
    --interval PT1M `
    --aggregation Total `
    --output table
# View output events metric
az monitor metrics list `
    --resource $jobResourceId `
    --metric "OutputEvents" `
    --start-time (Get-Date).ToUniversalTime().AddMinutes(-10).ToString("yyyy-MM-ddTHH:mm:ssZ") `
    --end-time (Get-Date).ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ssZ") `
    --interval PT1M `
    --aggregation Total `
    --output table
6.2 Check for Errors¶
Monitor for any runtime errors:
# View runtime errors (metric timestamps must be UTC)
az monitor metrics list `
    --resource $jobResourceId `
    --metric "Errors" `
    --start-time (Get-Date).ToUniversalTime().AddMinutes(-10).ToString("yyyy-MM-ddTHH:mm:ssZ") `
    --end-time (Get-Date).ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ssZ") `
    --interval PT1M `
    --aggregation Total `
    --output table
# Check watermark delay (event processing latency)
az monitor metrics list `
    --resource $jobResourceId `
    --metric "OutputWatermarkDelaySeconds" `
    --start-time (Get-Date).ToUniversalTime().AddMinutes(-10).ToString("yyyy-MM-ddTHH:mm:ssZ") `
    --end-time (Get-Date).ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ssZ") `
    --interval PT1M `
    --aggregation Maximum `
    --output table
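When reading these metrics, a useful sanity check is comparing input and output totals over the same window. Remember that this tutorial's query writes every event to two outputs, so OutputEvents should track roughly twice InputEvents when the job keeps up. A sketch over hypothetical totals:

```python
# Rough lag check over Azure Monitor totals (hypothetical numbers).
# The pass-through query sends each input event to TWO outputs.

def keeping_up(total_input, total_output, outputs_per_event=2, tolerance=0.1):
    """True if output volume is within tolerance of the expected multiple."""
    expected = total_input * outputs_per_event
    return total_output >= expected * (1.0 - tolerance)

print(keeping_up(1000, 2000))  # healthy: both outputs keeping pace
print(keeping_up(1000, 1200))  # falling behind, or one output failing
```

Sustained shortfall here, together with rising watermark delay, is the usual signal to scale up Streaming Units.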
✅ Validation¶
Validation Checklist¶
- Stream Analytics job created and running
- Event Hub input connected and receiving data
- Blob Storage output writing files
- SQL Database receiving records
- No errors in job metrics
- Processing latency under 10 seconds
Query Sample Data from SQL¶
# Query latest sensor readings
$sampleQuery = @"
SELECT TOP 10
DeviceId,
Location,
EventTimestamp,
Temperature,
Humidity,
Status
FROM SensorReadings
ORDER BY EventTimestamp DESC;
"@
Invoke-Sqlcmd -ConnectionString $connectionString -Query $sampleQuery | Format-Table
🎓 Key Concepts Learned¶
Stream Analytics Architecture¶
- Inputs: Data sources (Event Hubs, IoT Hub, Blob Storage)
- Query: Transformation logic using SAQL
- Outputs: Destinations (Storage, SQL, Power BI, Event Hubs)
- Transformation: Compute resources (Streaming Units)
SAQL Fundamentals¶
- TIMESTAMP BY: Defines event time field
- System.Timestamp(): The event-time timestamp Stream Analytics assigns to each output event (not wall-clock processing time)
- INTO: Directs query results to specific output
- Multiple Outputs: Single query can write to multiple destinations
Job Configuration¶
- Streaming Units: Compute capacity allocation
- Out-of-Order Policy: How to handle late events
- Error Policy: Drop or retry on output errors
- Compatibility Level: Query language feature set
🚀 Next Steps¶
You've created a working Stream Analytics job! Continue to Tutorial 04: Basic Queries, where you'll learn:
- Advanced SELECT statements with filtering
- WHERE clauses for conditional processing
- Aggregations and GROUP BY operations
- Data type conversions and calculations
📚 Additional Resources¶
- Stream Analytics Query Language Reference
- Configure Inputs for Stream Analytics
- Configure Outputs for Stream Analytics
🔧 Troubleshooting¶
Issue: Job Fails to Start¶
Symptoms: Job state shows "Failed" after start attempt
Solution:
# Check for configuration errors
az stream-analytics job show --name $env:STREAM_JOB --resource-group $env:STREAM_RG --query "properties"
# Review job diagnostic logs
az monitor diagnostic-settings list --resource $jobResourceId
Issue: No Data in Outputs¶
Symptoms: Job running but no data in Blob or SQL
Solution:
# Verify input is receiving events
az stream-analytics input test --job-name $env:STREAM_JOB --resource-group $env:STREAM_RG --input-name "EventHubInput"
# Check if data generator is running
# Restart generator if needed
python sensor_data_generator.py
Tutorial Progress: 3 of 11 complete | Next: Basic Queries
Last Updated: January 2025