Retail Analytics Reference Architecture¶
Overview¶
This reference architecture demonstrates a comprehensive retail analytics solution built on Azure Cloud Scale Analytics (CSA) services. It addresses common retail challenges including inventory optimization, sales forecasting, customer analytics, and omnichannel integration while providing real-time insights and advanced analytics capabilities.
Business Drivers¶
- Inventory Optimization: Reduce stockouts and overstock situations through predictive analytics
- Customer 360: Unified view of customer interactions across all channels
- Sales Forecasting: Accurate demand prediction for better planning
- Personalization: Real-time product recommendations and targeted marketing
- Operational Excellence: Store performance analytics and workforce optimization
- Supply Chain Visibility: End-to-end tracking and optimization
Key Capabilities¶
- Real-time point-of-sale (POS) data ingestion
- Customer behavior analytics across online and offline channels
- Inventory level monitoring and forecasting
- Price optimization and markdown analysis
- Market basket analysis and product affinity
- Store performance benchmarking
- Supply chain analytics and supplier performance
Architecture Diagram¶
graph TB
subgraph "Data Sources"
POS[POS Systems]
ECOM[E-Commerce Platform]
MOBILE[Mobile App]
LOYALTY[Loyalty System]
ERP[ERP System]
INVENTORY[Inventory Management]
SUPPLIERS[Supplier Systems]
SOCIAL[Social Media]
end
subgraph "Ingestion Layer"
EH[Azure Event Hubs]
ADF[Azure Data Factory]
APIM[API Management]
end
subgraph "Storage Layer"
BRONZE[(Bronze Layer<br/>Raw Data)]
SILVER[(Silver Layer<br/>Cleansed Data)]
GOLD[(Gold Layer<br/>Business Aggregates)]
end
subgraph "Processing Layer"
SYNAPSE[Azure Synapse Analytics]
SPARK[Spark Pools]
SQL[Serverless SQL]
DATABRICKS[Azure Databricks]
end
subgraph "Analytics & ML"
AML[Azure ML]
COGNITIVE[Cognitive Services]
SYNML[Synapse ML]
end
subgraph "Serving Layer"
COSMOS[Cosmos DB<br/>Operational Store]
CACHE[Redis Cache]
SEARCH[Cognitive Search]
end
subgraph "Presentation Layer"
PBI[Power BI]
WEBAPP[Web Dashboard]
API[REST APIs]
ALERTS[Alert System]
end
subgraph "Governance & Security"
PURVIEW[Microsoft Purview]
KV[Key Vault]
AAD[Azure AD]
MONITOR[Azure Monitor]
end
%% Data Flow
POS --> EH
ECOM --> EH
MOBILE --> EH
LOYALTY --> ADF
ERP --> ADF
INVENTORY --> ADF
SUPPLIERS --> ADF
SOCIAL --> APIM
EH --> BRONZE
ADF --> BRONZE
APIM --> BRONZE
BRONZE --> SPARK
SPARK --> SILVER
SILVER --> DATABRICKS
DATABRICKS --> GOLD
GOLD --> SQL
SQL --> PBI
GOLD --> AML
AML --> COSMOS
SYNML --> COSMOS
DATABRICKS --> SEARCH
COSMOS --> API
CACHE --> API
SEARCH --> WEBAPP
GOLD --> ALERTS
PURVIEW -.-> BRONZE
PURVIEW -.-> SILVER
PURVIEW -.-> GOLD
KV -.-> SYNAPSE
AAD -.-> PBI
MONITOR -.-> SYNAPSE
style BRONZE fill:#cd7f32
style SILVER fill:#c0c0c0
style GOLD fill:#ffd700
Azure Service Mapping¶
| Component | Azure Service | Purpose | Key Features |
|---|---|---|---|
| Stream Ingestion | Azure Event Hubs | Real-time POS and clickstream data | Partition-based scaling, 7-day retention |
| Batch Ingestion | Azure Data Factory | ERP, CRM, inventory batch loads | 90+ connectors, mapping data flows |
| API Gateway | Azure API Management | Third-party integrations | Rate limiting, caching, OAuth |
| Data Lake | Azure Data Lake Storage Gen2 | Medallion architecture storage | Hierarchical namespace, ACLs |
| Big Data Processing | Azure Synapse Spark Pools | Large-scale transformations | Auto-scaling, Delta Lake support |
| Advanced Analytics | Azure Databricks | ML model training, complex analytics | MLflow, collaborative notebooks |
| SQL Analytics | Synapse Serverless SQL | Ad-hoc querying, reporting | Pay-per-query, T-SQL interface |
| Machine Learning | Azure Machine Learning | Demand forecasting, churn prediction | AutoML, MLOps, managed endpoints |
| Cognitive AI | Azure Cognitive Services | Sentiment analysis, image recognition | Pre-trained models, custom vision |
| Operational Store | Azure Cosmos DB | Product recommendations, session state | Global distribution, sub-10ms latency |
| Caching | Azure Cache for Redis | API response caching | Geo-replication, persistence |
| Search | Azure Cognitive Search | Product catalog search | AI enrichment, faceted navigation |
| Visualization | Power BI | Executive dashboards, self-service | Embedded analytics, mobile apps |
| Data Governance | Microsoft Purview | Data catalog, lineage tracking | Auto-discovery, sensitivity labels |
| Monitoring | Azure Monitor | Platform health, performance | Log Analytics, Application Insights |
| Security | Azure Key Vault | Secrets, certificates, keys | RBAC, audit logging |
| Identity | Azure Active Directory | Authentication, authorization | B2C integration, MFA |
Data Flow Architecture¶
Real-Time Path (Hot Path)¶
sequenceDiagram
participant POS as POS Terminal
participant EH as Event Hubs
participant ASA as Stream Analytics
participant COSMOS as Cosmos DB
participant CACHE as Redis Cache
participant API as API Layer
participant APP as Web/Mobile App
POS->>EH: Transaction Event
EH->>ASA: Stream Processing
ASA->>COSMOS: Write Aggregates
ASA->>CACHE: Update Cache
APP->>API: Get Real-time Metrics
API->>CACHE: Check Cache
CACHE-->>API: Return Data
API-->>APP: Display Metrics
Components:
1. Event Capture: POS transactions streamed to Event Hubs
2. Stream Processing: Azure Stream Analytics for windowed aggregations
3. Operational Store: Cosmos DB for low-latency reads
4. Caching: Redis for frequently accessed data
5. API Layer: Low-latency API responses (<100 ms); a cache-aside read sketch follows
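The sketch below illustrates the cache-aside pattern used in the serving path: the API checks Redis first and falls back to Cosmos DB on a miss. The host names, key format, database, and container names are illustrative assumptions, not part of the reference implementation.
```python
# Minimal cache-aside read for the hot path (endpoint, key, and container names are illustrative)
import json
import redis
from azure.cosmos import CosmosClient

redis_client = redis.Redis(host="retail-cache.redis.cache.windows.net",
                           port=6380, password="<access-key>", ssl=True)
cosmos_client = CosmosClient("https://retail-cosmos.documents.azure.com:443/", "<cosmos-key>")
container = cosmos_client.get_database_client("retail").get_container_client("store_metrics")

def get_store_metrics(store_id: str) -> dict:
    cache_key = f"store-metrics:{store_id}"

    # 1. Try the cache first (sub-millisecond reads)
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Fall back to the operational store
    item = container.read_item(item=store_id, partition_key=store_id)

    # 3. Populate the cache with a short TTL so metrics stay near-real-time
    redis_client.setex(cache_key, 60, json.dumps(item, default=str))
    return item
```
A short TTL (60 seconds here) keeps the cache close to the stream-processed aggregates while still absorbing most read traffic.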
Batch Path (Cold Path)¶
flowchart LR
subgraph Sources
S1[ERP System]
S2[Inventory DB]
S3[CRM]
end
subgraph Orchestration
ADF[Data Factory<br/>Pipelines]
end
subgraph Storage
BRONZE[(Bronze<br/>Raw)]
SILVER[(Silver<br/>Cleansed)]
GOLD[(Gold<br/>Aggregated)]
end
subgraph Processing
SPARK[Spark<br/>Transformations]
SQL[SQL<br/>Views]
end
subgraph Analytics
PBI[Power BI<br/>Reports]
ML[ML Models]
end
S1 --> ADF
S2 --> ADF
S3 --> ADF
ADF --> BRONZE
BRONZE --> SPARK
SPARK --> SILVER
SILVER --> SPARK
SPARK --> GOLD
GOLD --> SQL
GOLD --> ML
SQL --> PBI
Components:
1. Extraction: Daily/hourly batch extracts from source systems
2. Bronze Layer: Raw data in original format (Parquet/CSV)
3. Silver Layer: Validated, deduplicated, conformed data (see the cleansing sketch below)
4. Gold Layer: Business-level aggregates and dimensions
5. Serving: SQL views for reporting, ML features for models
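As a companion to the batch flow, here is a minimal bronze-to-silver cleansing sketch in PySpark: it enforces a schema at read time, filters invalid rows, deduplicates replayed transactions, and appends to a Delta table. The paths, schema fields, and dedup key are assumptions for illustration.
```python
# Bronze -> Silver cleansing sketch (paths, schema, and dedup key are illustrative)
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType

transaction_schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", StringType(), True),
    StructField("store_id", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("total_amount", DecimalType(18, 2), True),
    StructField("transaction_date", TimestampType(), True),
])

bronze = (spark.read
    .schema(transaction_schema)          # enforce the expected schema at read time
    .parquet("/retail-analytics/bronze/pos/year=2024/month=01/day=15/"))

silver = (bronze
    .where(F.col("transaction_id").isNotNull())   # basic validation
    .dropDuplicates(["transaction_id"])           # deduplicate replayed events
    .withColumn("year", F.year("transaction_date"))
    .withColumn("month", F.month("transaction_date"))
    .withColumn("day", F.dayofmonth("transaction_date"))
    .withColumn("ingested_at", F.current_timestamp()))

(silver.write
    .format("delta")
    .mode("append")
    .partitionBy("year", "month", "day")
    .save("/retail-analytics/silver/transactions/"))
```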
Data Model¶
Conceptual Model¶
erDiagram
CUSTOMER ||--o{ TRANSACTION : makes
CUSTOMER ||--o{ LOYALTY_EVENT : earns
CUSTOMER {
string customer_id PK
string email
string phone
date registration_date
string segment
decimal lifetime_value
}
TRANSACTION ||--|{ TRANSACTION_LINE : contains
TRANSACTION {
string transaction_id PK
string customer_id FK
datetime transaction_date
string store_id FK
string channel
decimal total_amount
decimal discount_amount
string payment_method
}
TRANSACTION_LINE }o--|| PRODUCT : includes
TRANSACTION_LINE {
string line_id PK
string transaction_id FK
string product_id FK
int quantity
decimal unit_price
decimal line_total
}
PRODUCT }o--|| CATEGORY : belongs_to
PRODUCT {
string product_id PK
string sku
string name
string category_id FK
decimal cost
decimal price
string supplier_id FK
}
CATEGORY {
string category_id PK
string name
string parent_category_id
int hierarchy_level
}
STORE ||--o{ TRANSACTION : processes
STORE ||--o{ INVENTORY : stocks
STORE {
string store_id PK
string name
string region
string format
decimal square_footage
}
INVENTORY }o--|| PRODUCT : tracks
INVENTORY {
string inventory_id PK
string store_id FK
string product_id FK
int quantity_on_hand
int reorder_point
datetime last_updated
}
LOYALTY_EVENT {
string event_id PK
string customer_id FK
datetime event_date
string event_type
int points_earned
int points_redeemed
}
Storage Structure¶
```text
/retail-analytics/
├── bronze/
│   ├── pos/
│   │   └── year=2024/month=01/day=15/*.parquet
│   ├── ecommerce/
│   │   └── year=2024/month=01/day=15/*.parquet
│   ├── inventory/
│   │   └── snapshot_date=2024-01-15/*.parquet
│   └── crm/
│       └── extract_date=2024-01-15/*.parquet
├── silver/
│   ├── transactions/
│   │   └── year=2024/month=01/day=15/*.delta
│   ├── customers/
│   │   └── *.delta (SCD Type 2)
│   ├── products/
│   │   └── *.delta (SCD Type 2)
│   └── inventory_snapshots/
│       └── snapshot_date=2024-01-15/*.delta
└── gold/
    ├── sales_daily/
    │   └── year=2024/month=01/*.delta
    ├── customer_metrics/
    │   └── *.delta
    ├── product_performance/
    │   └── year=2024/month=01/*.delta
    └── inventory_positions/
        └── snapshot_date=2024-01-15/*.delta
```
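The silver customers and products tables are flagged as SCD Type 2 above. The sketch below shows one way to maintain such a table with a Delta MERGE; the column names (is_current, valid_from, valid_to) and the staging table of changed records are illustrative assumptions.
```python
# SCD Type 2 upsert into silver.customers (column and table names are illustrative)
from delta.tables import DeltaTable
from pyspark.sql import functions as F

customers = DeltaTable.forPath(spark, "/retail-analytics/silver/customers/")
# Assumed: a CDC-style feed containing only new or changed customer records
updates = spark.table("staging.customer_changes")

# Step 1: expire the currently-active row for every customer in the change feed
(customers.alias("target")
    .merge(updates.alias("source"),
           "target.customer_id = source.customer_id AND target.is_current = true")
    .whenMatchedUpdate(set={"is_current": "false", "valid_to": "current_timestamp()"})
    .execute())

# Step 2: append the new current versions
new_versions = (updates
    .withColumn("is_current", F.lit(True))
    .withColumn("valid_from", F.current_timestamp())
    .withColumn("valid_to", F.lit(None).cast("timestamp")))

(new_versions.write.format("delta").mode("append").save("/retail-analytics/silver/customers/"))
```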
Key Use Cases¶
1. Real-Time Inventory Monitoring¶
Scenario: Track inventory levels across all stores in real time to prevent stockouts
Implementation:
```python
# Spark Structured Streaming for inventory updates
# (eventhubs_config and transaction_schema are assumed to be defined earlier in the notebook)
from pyspark.sql.functions import col, from_json, current_timestamp, window, sum as sum_
from delta.tables import DeltaTable

# Read POS transaction stream
pos_stream = (spark
    .readStream
    .format("eventhubs")
    .options(**eventhubs_config)
    .load()
)

# Parse and extract inventory impact
inventory_deltas = (pos_stream
    .select(col("body").cast("string").alias("json_data"))
    .select(from_json(col("json_data"), transaction_schema).alias("data"))
    .select("data.*")
    .select(
        col("store_id"),
        col("product_id"),
        col("quantity").cast("int").alias("quantity_sold"),
        current_timestamp().alias("processed_time")
    )
    .groupBy(
        window(col("processed_time"), "5 minutes"),
        col("store_id"),
        col("product_id")
    )
    .agg(sum_("quantity_sold").alias("total_sold"))
)

# Merge each micro-batch into the inventory Delta table
delta_table = DeltaTable.forPath(spark, "/gold/inventory_positions")

def upsert_inventory(batch_df, batch_id):
    (delta_table.alias("target")
        .merge(
            batch_df.alias("updates"),
            "target.store_id = updates.store_id AND target.product_id = updates.product_id"
        )
        .whenMatchedUpdate(set={
            "quantity_on_hand": "target.quantity_on_hand - updates.total_sold",
            "last_updated": "updates.window.end"
        })
        .execute()
    )

(inventory_deltas
    .writeStream
    .outputMode("update")  # required for a streaming aggregation without a watermark
    .option("checkpointLocation", "/checkpoints/inventory_positions")  # illustrative path
    .foreachBatch(upsert_inventory)
    .start()
)
```
2. Customer 360 View¶
Scenario: Unified customer profile combining online and offline interactions
Implementation:
-- Synapse Serverless SQL View
CREATE VIEW gold.customer_360 AS
SELECT
c.customer_id,
c.email,
c.registration_date,
c.segment,
-- Transaction metrics
COUNT(DISTINCT t.transaction_id) as total_transactions,
SUM(t.total_amount) as lifetime_value,
MAX(t.transaction_date) as last_purchase_date,
DATEDIFF(day, MAX(t.transaction_date), GETDATE()) as days_since_purchase,
-- Channel preferences
SUM(CASE WHEN t.channel = 'Online' THEN 1 ELSE 0 END) as online_transactions,
SUM(CASE WHEN t.channel = 'Store' THEN 1 ELSE 0 END) as store_transactions,
SUM(CASE WHEN t.channel = 'Mobile' THEN 1 ELSE 0 END) as mobile_transactions,
    -- Product preferences (distinct categories across the customer's purchases;
    -- T-SQL does not support STRING_AGG(DISTINCT ...), so pre-deduplicate in a subquery)
    (SELECT STRING_AGG(cat.category_id, ',') WITHIN GROUP (ORDER BY cat.category_id)
     FROM (SELECT DISTINCT p.category_id
           FROM silver.transactions t2
           JOIN silver.transaction_lines tl ON t2.transaction_id = tl.transaction_id
           JOIN silver.products p ON tl.product_id = p.product_id
           WHERE t2.customer_id = c.customer_id) AS cat) as preferred_categories,
    -- Loyalty metrics (correlated subqueries avoid double counting from join fan-out)
    (SELECT SUM(le.points_earned) FROM silver.loyalty_events le
     WHERE le.customer_id = c.customer_id) as total_points_earned,
    (SELECT SUM(le.points_redeemed) FROM silver.loyalty_events le
     WHERE le.customer_id = c.customer_id) as total_points_redeemed,
-- Behavioral segments
CASE
WHEN DATEDIFF(day, MAX(t.transaction_date), GETDATE()) <= 30
AND COUNT(DISTINCT t.transaction_id) >= 4 THEN 'Champion'
WHEN DATEDIFF(day, MAX(t.transaction_date), GETDATE()) <= 90
AND SUM(t.total_amount) > 1000 THEN 'Loyal'
WHEN DATEDIFF(day, MAX(t.transaction_date), GETDATE()) > 180 THEN 'At Risk'
ELSE 'Potential'
END as rfm_segment
FROM silver.customers c
LEFT JOIN silver.transactions t ON c.customer_id = t.customer_id
-- transaction_lines, products, and loyalty_events are read through the correlated
-- subqueries above so the transaction-level sums are not inflated by join fan-out
GROUP BY
c.customer_id,
c.email,
c.registration_date,
c.segment
3. Demand Forecasting¶
Scenario: Predict product demand for next 30 days at store level
Implementation:
# Azure ML training pipeline
from azureml.core import Workspace, Dataset, Datastore, Experiment
from azureml.core.compute import ComputeTarget
from azureml.train.automl import AutoMLConfig
from azureml.automl.core.forecasting_parameters import ForecastingParameters
# Connect to workspace
ws = Workspace.from_config()
# Load training data from Synapse
# ('synapse_sql' is an illustrative name for a SQL datastore registered in the workspace)
sql_datastore = Datastore.get(ws, 'synapse_sql')
dataset = Dataset.Tabular.from_sql_query(
    (sql_datastore, """
        SELECT
            product_id,
            store_id,
            sale_date,
            quantity_sold,
            price,
            promotion_flag,
            day_of_week,
            is_holiday,
            temperature,
            precipitation
        FROM gold.sales_daily
        WHERE sale_date >= DATEADD(year, -2, GETDATE())
    """),
    query_timeout=600
)
# Configure AutoML
automl_config = AutoMLConfig(
task='forecasting',
primary_metric='normalized_root_mean_squared_error',
experiment_timeout_minutes=60,
training_data=dataset,
label_column_name='quantity_sold',
n_cross_validations=5,
enable_early_stopping=True,
    forecasting_parameters=ForecastingParameters(
        time_column_name='sale_date',
        forecast_horizon=30,
        target_lags=[1, 7, 14, 28],
        target_rolling_window_size=7,
        freq='D',
        time_series_id_column_names=['product_id', 'store_id']
    ),
compute_target='ml-compute-cluster'
)
# Run experiment
experiment = Experiment(ws, 'demand-forecasting')
run = experiment.submit(automl_config, show_output=True)
best_run, fitted_model = run.get_output()
# Deploy model
from azureml.core.model import Model, InferenceConfig
from azureml.core.webservice import AciWebservice
model = best_run.register_model(
model_name='demand-forecast-model',
model_path='outputs/model.pkl'
)
inference_config = InferenceConfig(
entry_script='score.py',
environment=best_run.get_environment()
)
deployment_config = AciWebservice.deploy_configuration(
cpu_cores=2,
memory_gb=4,
auth_enabled=True,
enable_app_insights=True
)
service = Model.deploy(
workspace=ws,
name='demand-forecast-service',
models=[model],
inference_config=inference_config,
deployment_config=deployment_config
)
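Once the ACI service is up, it can be scored over REST. A minimal sketch follows, assuming the request payload matches what score.py expects; the payload fields below are illustrative.
```python
# Score the deployed demand-forecast service (payload shape is illustrative)
import json
import requests

scoring_uri = service.scoring_uri
api_key = service.get_keys()[0]          # auth_enabled=True issues primary/secondary keys

payload = {
    "data": [
        {"product_id": "PROD_12345", "store_id": "STORE_001", "sale_date": "2024-02-01"}
    ]
}

response = requests.post(
    scoring_uri,
    data=json.dumps(payload),
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    },
    timeout=30
)
response.raise_for_status()
print(response.json())
```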
4. Market Basket Analysis¶
Scenario: Identify product affinity for cross-selling and store layout optimization
Implementation:
# Databricks notebook for association rules
from pyspark.sql.functions import *
from pyspark.ml.fpm import FPGrowth
# Prepare transaction baskets
baskets = (spark.table("silver.transaction_lines")
.join(spark.table("silver.products"), "product_id")
.groupBy("transaction_id")
.agg(collect_set("product_id").alias("items"))
.where(size("items") > 1) # Multi-item baskets only
)
# FP-Growth algorithm
fpGrowth = FPGrowth(
itemsCol="items",
minSupport=0.01, # 1% of transactions
minConfidence=0.3 # 30% confidence
)
model = fpGrowth.fit(baskets)
# Frequent itemsets
frequent_itemsets = model.freqItemsets
frequent_itemsets.write.mode("overwrite").saveAsTable("gold.frequent_itemsets")
# Association rules
rules = model.associationRules
rules.write.mode("overwrite").saveAsTable("gold.association_rules")
# Example: Products to recommend with milk
milk_recommendations = (rules
.where(array_contains("antecedent", "PROD_MILK_001"))
.orderBy(desc("lift"))
.select(
"consequent",
"confidence",
"lift",
"support"
)
.limit(10)
)
milk_recommendations.show()
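The mined rules can also drive basket-time recommendations: keep rules whose full antecedent is already in the shopper's basket and rank the consequents by lift. A minimal sketch, with illustrative basket contents:
```python
# Recommend add-on products for an in-progress basket (basket contents are illustrative)
from pyspark.sql import functions as F

current_basket = ["PROD_MILK_001", "PROD_BREAD_002"]
basket_col = F.array(*[F.lit(p) for p in current_basket])

basket_recommendations = (spark.table("gold.association_rules")
    # rule applies only if every antecedent item is already in the basket
    .where(F.size(F.array_except(F.col("antecedent"), basket_col)) == 0)
    # skip rules whose consequents the shopper already has
    .where(F.size(F.array_except(F.col("consequent"), basket_col)) > 0)
    .orderBy(F.desc("lift"))
    .select("consequent", "confidence", "lift")
    .limit(5))

basket_recommendations.show(truncate=False)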
5. Price Optimization¶
Scenario: Dynamic pricing based on demand elasticity and competitor pricing
Implementation:
# Price elasticity calculation
price_elasticity = spark.sql("""
WITH price_changes AS (
SELECT
product_id,
sale_date,
price,
quantity_sold,
LAG(price) OVER (PARTITION BY product_id ORDER BY sale_date) as prev_price,
LAG(quantity_sold) OVER (PARTITION BY product_id ORDER BY sale_date) as prev_quantity
FROM gold.sales_daily
WHERE promotion_flag = false
)
SELECT
product_id,
AVG(
((quantity_sold - prev_quantity) / prev_quantity) /
((price - prev_price) / prev_price)
) as price_elasticity,
STDDEV(
((quantity_sold - prev_quantity) / prev_quantity) /
((price - prev_price) / prev_price)
) as elasticity_stddev,
COUNT(*) as price_change_events
FROM price_changes
WHERE prev_price IS NOT NULL
AND prev_quantity > 0
AND ABS((price - prev_price) / prev_price) > 0.01 -- >1% price change
GROUP BY product_id
HAVING COUNT(*) >= 10 -- Sufficient data points
""")
# Optimal price recommendation (register the elasticity DataFrame so SQL can join to it)
price_elasticity.createOrReplaceTempView("price_elasticity")

optimal_pricing = spark.sql("""
    WITH current_metrics AS (
        SELECT
            p.product_id,
            p.price as current_price,
            p.cost,
            e.price_elasticity,
            AVG(s.quantity_sold) as avg_daily_quantity,
            MAX(c.competitor_price) as competitor_price
        FROM silver.products p
        JOIN price_elasticity e ON p.product_id = e.product_id
        JOIN gold.sales_daily s ON p.product_id = s.product_id
        LEFT JOIN external.competitor_prices c ON p.sku = c.sku
        WHERE s.sale_date >= date_sub(current_date(), 30)
        GROUP BY p.product_id, p.price, p.cost, e.price_elasticity
    ),
    recommendations AS (
        SELECT
            *,
            CASE
                WHEN price_elasticity < -1.5 THEN current_price * 0.95  -- Elastic: reduce price
                WHEN price_elasticity > -0.5 THEN current_price * 1.05  -- Inelastic: increase price
                ELSE current_price                                      -- Keep current price
            END as recommended_price
        FROM current_metrics
    )
    SELECT
        product_id,
        current_price,
        competitor_price,
        recommended_price,
        (recommended_price - cost) * avg_daily_quantity * 365 as projected_annual_margin
    FROM recommendations
    WHERE recommended_price > cost * 1.1  -- Maintain minimum margin
""")
Performance Optimization¶
Synapse Spark Configuration¶
# Optimized Spark configuration for retail workloads
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Delta Lake optimizations
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
# Partition pruning
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
# Z-ordering for transaction table
spark.sql("""
OPTIMIZE silver.transactions
ZORDER BY (transaction_date, store_id, customer_id)
""")
# Vacuum old versions
spark.sql("VACUUM silver.transactions RETAIN 168 HOURS") # 7 days
Serverless SQL Optimizations¶
-- Partitioning strategy
CREATE EXTERNAL TABLE gold.sales_daily
WITH (
LOCATION = '/gold/sales_daily/',
DATA_SOURCE = DataLakeStorage,
FILE_FORMAT = ParquetFormat
)
AS
SELECT *
FROM OPENROWSET(
    BULK '/gold/sales_daily/year=*/month=*/*.parquet',
    FORMAT = 'PARQUET'
) AS rows
WHERE CAST(rows.filepath(1) AS int) >= 2023;  -- filepath(1) returns the value of the first wildcard (the year folder)
-- Materialized views for common queries (requires a dedicated SQL pool; not supported in serverless)
CREATE MATERIALIZED VIEW gold.sales_summary
WITH (DISTRIBUTION = HASH(store_id))
AS
SELECT
    sale_date,
    store_id,
    category_id,
    SUM(total_amount) as total_sales,
    SUM(quantity_sold) as total_units,
    APPROX_COUNT_DISTINCT(transaction_id) as transaction_count,  -- COUNT(DISTINCT) is not supported in materialized views
    APPROX_COUNT_DISTINCT(customer_id) as unique_customers
FROM gold.sales_daily
GROUP BY sale_date, store_id, category_id;
-- Statistics for query optimization
CREATE STATISTICS sales_date_stats ON gold.sales_daily(sale_date);
CREATE STATISTICS store_stats ON gold.sales_daily(store_id);
CREATE STATISTICS customer_stats ON gold.sales_daily(customer_id);
Cosmos DB Indexing¶
{
"indexingMode": "consistent",
"automatic": true,
"includedPaths": [
{
"path": "/customer_id/?"
},
{
"path": "/product_id/?"
},
{
"path": "/recommendation_score/?"
}
],
"excludedPaths": [
{
"path": "/metadata/*"
},
{
"path": "/_etag/?"
}
],
"compositeIndexes": [
[
{
"path": "/customer_id",
"order": "ascending"
},
{
"path": "/recommendation_score",
"order": "descending"
}
]
]
}
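The policy above can be supplied when the container is created. A minimal sketch with the azure-cosmos Python SDK; the account endpoint, database, container name, and throughput are illustrative assumptions.
```python
# Create the recommendations container with the custom indexing policy (names are illustrative)
from azure.cosmos import CosmosClient, PartitionKey

client = CosmosClient("https://retail-cosmos.documents.azure.com:443/", "<cosmos-key>")
database = client.get_database_client("retail")

indexing_policy = {
    "indexingMode": "consistent",
    "automatic": True,
    "includedPaths": [{"path": "/customer_id/?"},
                      {"path": "/product_id/?"},
                      {"path": "/recommendation_score/?"}],
    "excludedPaths": [{"path": "/metadata/*"}],
    "compositeIndexes": [[
        {"path": "/customer_id", "order": "ascending"},
        {"path": "/recommendation_score", "order": "descending"}
    ]]
}

container = database.create_container_if_not_exists(
    id="recommendations",
    partition_key=PartitionKey(path="/customer_id"),
    indexing_policy=indexing_policy,
    offer_throughput=10000   # matches the 10K RU/s sizing later in this document
)
```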
Security & Compliance¶
Data Classification¶
# Microsoft Purview sensitivity labels
from azure.purview.catalog import PurviewCatalogClient
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
client = PurviewCatalogClient(
endpoint="https://<purview-account>.purview.azure.com",
credential=credential
)
# Apply labels to datasets
sensitivity_labels = {
"silver.customers": "Confidential - PII",
"silver.transactions": "Internal - Financial",
"silver.products": "General",
"gold.customer_360": "Highly Confidential - Customer Data"
}
for table, label in sensitivity_labels.items():
    client.entity.create_or_update(
        entity={
            "entity": {
                "typeName": "azure_datalake_gen2_path",
                "attributes": {
                    "qualifiedName": f"abfss://data@storage.dfs.core.windows.net/{table}",
                    "name": table
                },
                # Classifications sit alongside attributes in the Atlas entity model
                "classifications": [{
                    "typeName": label.replace(" - ", "_")
                }]
            }
        }
    )
PCI-DSS Compliance for Payment Data¶
Requirements:
- Encrypt payment data at rest and in transit
- Tokenize credit card numbers
- Implement access controls and audit logging
- Maintain secure network architecture
Implementation:
# Payment data tokenization
from azure.keyvault.secrets import SecretClient
from hashlib import sha256
import uuid
def tokenize_card_number(card_number, key_vault_client):
"""Replace card number with token, store mapping in Key Vault"""
# Generate token
token = str(uuid.uuid4())
# Hash for verification
card_hash = sha256(card_number.encode()).hexdigest()
# Store in Key Vault (encrypted at rest)
key_vault_client.set_secret(
name=f"card-token-{token}",
value=card_number,
tags={"type": "payment_token", "hash": card_hash}
)
# Return token and last 4 digits for display
return {
"token": token,
"last_four": card_number[-4:],
"card_type": detect_card_type(card_number)
}
# Store only tokens in data lake
transaction_data = {
"transaction_id": "TXN_12345",
"customer_id": "CUST_67890",
"payment_token": "<TOKENIZED-PAYMENT-VALUE>",
"last_four": "4532",
"amount": 149.99
}
RBAC Configuration¶
# Azure role assignments
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.identity import DefaultAzureCredential
credential = DefaultAzureCredential()
auth_client = AuthorizationManagementClient(credential, subscription_id="<subscription-id>")
# Data Engineer role
role_assignments = {
"data-engineers": {
"role": "Storage Blob Data Contributor",
"scope": "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{storage}",
"paths": ["/bronze", "/silver"]
},
"data-analysts": {
"role": "Storage Blob Data Reader",
"scope": "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{storage}",
"paths": ["/gold"]
},
"ml-engineers": {
"role": "Storage Blob Data Contributor",
"scope": "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{storage}",
"paths": ["/gold", "/ml-features"]
}
}
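The mapping above can be applied programmatically. A minimal sketch, assuming a recent azure-mgmt-authorization SDK and an AAD group object ID for the data-engineers group; the scope string and GUID placeholders are illustrative, while the role definition GUID is the built-in Storage Blob Data Contributor role.
```python
# Assign "Storage Blob Data Contributor" to the data-engineers group (IDs are placeholders)
import uuid
from azure.mgmt.authorization.models import RoleAssignmentCreateParameters

scope = "/subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.Storage/storageAccounts/<storage>"
# Built-in role definition ID for Storage Blob Data Contributor
role_definition_id = f"{scope}/providers/Microsoft.Authorization/roleDefinitions/ba92f5b4-2d11-453d-a403-e96b0029c9fe"

auth_client.role_assignments.create(
    scope=scope,
    role_assignment_name=str(uuid.uuid4()),
    parameters=RoleAssignmentCreateParameters(
        role_definition_id=role_definition_id,
        principal_id="<data-engineers-group-object-id>",
        principal_type="Group"
    )
)
```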
Audit Logging¶
-- Synapse audit queries
SELECT
    event_time,
    server_principal_name,
    client_ip,
    database_name,
    object_name,
    statement,
    succeeded,
    duration_milliseconds
FROM sys.fn_get_audit_file(
    'https://<storage>.blob.core.windows.net/audit-logs/*/audit/*.xel',
    DEFAULT,
    DEFAULT
)
WHERE event_time >= DATEADD(day, -7, GETDATE())
    AND action_id IN ('SL', 'UP', 'DL')  -- SELECT, UPDATE, DELETE
    AND class_type = 'DB'                -- database-scoped events
ORDER BY event_time DESC;
Cost Optimization¶
Resource Sizing Recommendations¶
| Workload Type | Service Tier | Configuration | Estimated Monthly Cost |
|---|---|---|---|
| Event Hubs | Standard | 10 TUs, 7-day retention | $2,500 |
| Synapse Spark | Memory Optimized | 3 nodes (16 cores), auto-pause | $3,200 |
| Databricks | Standard | 5-node cluster, 70% spot instances | $4,800 |
| Cosmos DB | Standard | 10K RU/s, 500GB storage | $5,840 |
| Data Lake Gen2 | Hot/Cool tiers | 50TB hot, 200TB cool | $2,100 |
| Azure ML | Standard | 10 compute hours/day | $1,500 |
| Power BI Premium | P1 | 8 v-cores | $4,995 |
| Total | | | ~$25,000/month |
Cost Reduction Strategies¶
# 1. Auto-pause Spark pools when idle
spark_config = {
"auto_pause": {
"enabled": True,
"delay_in_minutes": 15
},
"auto_scale": {
"enabled": True,
"min_nodes": 3,
"max_nodes": 10
}
}
# 2. Use spot instances for non-critical workloads
databricks_cluster = {
"node_type_id": "Standard_D16s_v3",
"num_workers": 5,
"azure_attributes": {
"availability": "SPOT_WITH_FALLBACK_AZURE",
"spot_bid_max_price": -1 # Pay up to on-demand price
}
}
# 3. Partition pruning to reduce data scanning
sales_query = """
SELECT *
FROM gold.sales_daily
WHERE year = 2024
AND month = 1
AND store_id IN ('STORE_001', 'STORE_002')
"""
# Scans only 2 stores in January 2024, not entire dataset
# 4. Serverless SQL for infrequent queries
# Pay only for data processed, not compute time
# 5. Data lifecycle management
lifecycle_policy = {
"rules": [
{
"name": "move-to-cool",
"type": "Lifecycle",
"definition": {
"filters": {
"blobTypes": ["blockBlob"],
"prefixMatch": ["bronze/"]
},
"actions": {
"baseBlob": {
"tierToCool": {"daysAfterModificationGreaterThan": 30},
"tierToArchive": {"daysAfterModificationGreaterThan": 180}
}
}
}
}
]
}
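The lifecycle_policy dict above can be submitted through the storage management SDK. A minimal sketch; the subscription, resource group, and account names are placeholders, and Azure requires the policy to be named "default".
```python
# Apply the lifecycle policy to the data lake account (names are placeholders)
from azure.identity import DefaultAzureCredential
from azure.mgmt.storage import StorageManagementClient

storage_client = StorageManagementClient(DefaultAzureCredential(), "<subscription-id>")

storage_client.management_policies.create_or_update(
    resource_group_name="retail-analytics-rg",
    account_name="retaildatalakeprod",
    management_policy_name="default",         # Azure allows a single policy, named "default"
    properties={"policy": lifecycle_policy}
)
```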
Monitoring & Alerting¶
Key Metrics¶
# Azure Monitor metrics for retail analytics
from azure.monitor.query import MetricsQueryClient, MetricAggregationType
metrics_to_track = {
"event_hubs": [
"IncomingMessages",
"OutgoingMessages",
"ThrottledRequests",
"ActiveConnections"
],
"synapse_spark": [
"SparkJobsRunning",
"SparkJobsSucceeded",
"SparkJobsFailed",
"SparkPoolCPUUtilization"
],
"cosmos_db": [
"TotalRequests",
"TotalRequestUnits",
"ServerSideLatency",
"AvailabilityPercentage"
],
"data_factory": [
"PipelineSucceededRuns",
"PipelineFailedRuns",
"ActivitySucceededRuns",
"TriggerSucceededRuns"
]
}
# Alert rules
alert_rules = {
"high_latency": {
"metric": "ServerSideLatency",
"threshold": 100, # ms
"operator": "GreaterThan",
"severity": "Warning"
},
"low_availability": {
"metric": "AvailabilityPercentage",
"threshold": 99.9,
"operator": "LessThan",
"severity": "Critical"
},
"pipeline_failures": {
"metric": "PipelineFailedRuns",
"threshold": 3,
"operator": "GreaterThan",
"window": "PT15M",
"severity": "Error"
}
}
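MetricsQueryClient (imported above) can pull these metrics for alert evaluation or ad-hoc checks. A minimal sketch querying Cosmos DB server-side latency over the last hour; the resource ID is a placeholder.
```python
# Query Cosmos DB latency for the last hour (resource ID is a placeholder)
from datetime import timedelta
from azure.identity import DefaultAzureCredential

metrics_client = MetricsQueryClient(DefaultAzureCredential())

cosmos_resource_id = (
    "/subscriptions/<sub>/resourceGroups/retail-analytics-rg"
    "/providers/Microsoft.DocumentDB/databaseAccounts/retail-cosmos-prod"
)

result = metrics_client.query_resource(
    cosmos_resource_id,
    metric_names=["ServerSideLatency"],
    timespan=timedelta(hours=1),
    granularity=timedelta(minutes=5),
    aggregations=[MetricAggregationType.AVERAGE]
)

for metric in result.metrics:
    for ts in metric.timeseries:
        for point in ts.data:
            if point.average and point.average > 100:   # same threshold as the alert rule above
                print(f"{point.timestamp}: ServerSideLatency {point.average:.1f} ms")
```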
Custom Application Insights¶
from applicationinsights import TelemetryClient
from applicationinsights.exceptions import enable
# Initialize client
tc = TelemetryClient(instrumentation_key='<key>')
enable(instrumentation_key='<key>')
# Track custom events
tc.track_event('InventoryAlert', {
'product_id': 'PROD_12345',
'store_id': 'STORE_001',
'current_quantity': 5,
'reorder_point': 10,
'alert_type': 'Low Stock'
})
# Track custom metrics
tc.track_metric('DailyRevenue', 45623.50, properties={
'store_id': 'STORE_001',
'date': '2024-01-15'
})
# Track dependencies
tc.track_dependency('CosmosDB', 'GetRecommendations',
duration=45, success=True)
tc.flush()
Deployment¶
Infrastructure as Code (Bicep)¶
// retail-analytics-infrastructure.bicep
param location string = resourceGroup().location
param environment string = 'prod'
// Data Lake Storage
resource dataLake 'Microsoft.Storage/storageAccounts@2021-09-01' = {
name: 'retaildatalake${environment}'
location: location
kind: 'StorageV2'
sku: {
name: 'Standard_LRS'
}
properties: {
isHnsEnabled: true
minimumTlsVersion: 'TLS1_2'
supportsHttpsTrafficOnly: true
}
}
// Synapse Workspace
resource synapseWorkspace 'Microsoft.Synapse/workspaces@2021-06-01' = {
name: 'retail-synapse-${environment}'
location: location
identity: {
type: 'SystemAssigned'
}
properties: {
defaultDataLakeStorage: {
accountUrl: dataLake.properties.primaryEndpoints.dfs
filesystem: 'synapse'
}
managedVirtualNetwork: 'default'
managedVirtualNetworkSettings: {
preventDataExfiltration: true
}
}
}
// Event Hubs Namespace
resource eventHubNamespace 'Microsoft.EventHub/namespaces@2021-11-01' = {
name: 'retail-events-${environment}'
location: location
sku: {
name: 'Standard'
tier: 'Standard'
capacity: 10
}
properties: {
isAutoInflateEnabled: true
maximumThroughputUnits: 20
}
}
// Cosmos DB Account
resource cosmosDb 'Microsoft.DocumentDB/databaseAccounts@2022-05-15' = {
name: 'retail-cosmos-${environment}'
location: location
kind: 'GlobalDocumentDB'
properties: {
databaseAccountOfferType: 'Standard'
consistencyPolicy: {
defaultConsistencyLevel: 'Session'
}
locations: [
{
locationName: location
failoverPriority: 0
}
]
}
}
Deployment Script¶
#!/bin/bash
# deploy-retail-analytics.sh
RESOURCE_GROUP="retail-analytics-rg"
LOCATION="eastus"
ENVIRONMENT="prod"
# Create resource group
az group create \
--name $RESOURCE_GROUP \
--location $LOCATION
# Deploy infrastructure
az deployment group create \
--resource-group $RESOURCE_GROUP \
--template-file retail-analytics-infrastructure.bicep \
--parameters environment=$ENVIRONMENT
# Deploy Synapse artifacts individually (the CLI has no bulk workspace-deploy command;
# use the Synapse workspace deployment task in CI/CD for full artifact promotion)
# Pipeline name and file path below are illustrative
az synapse pipeline create \
--workspace-name retail-synapse-$ENVIRONMENT \
--name daily-transform \
--file @synapse-artifacts/pipelines/daily-transform.json
# Deploy Data Factory pipelines
az datafactory pipeline create \
--resource-group $RESOURCE_GROUP \
--factory-name retail-adf-$ENVIRONMENT \
--name daily-batch-load \
--pipeline @pipelines/daily-batch-load.json
echo "Deployment complete!"
Best Practices¶
Data Quality¶
- Schema Validation: Enforce schemas at ingestion using Delta Lake
- Deduplication: Implement deduplication logic for transaction data
- Referential Integrity: Validate foreign keys between tables
- Data Profiling: Regular profiling to detect anomalies
- Automated Testing: Unit tests for transformation logic
Performance¶
- Partitioning: Partition by date and store_id
- Z-Ordering: Optimize for common query patterns
- Caching: Cache frequently accessed data in Redis
- Indexing: Proper indexes on Cosmos DB collections
- Batch Sizing: Optimize batch sizes for throughput
Operations¶
- CI/CD: Automated deployment pipelines
- Monitoring: Comprehensive monitoring and alerting
- Backup: Regular backups of critical data
- Disaster Recovery: Multi-region failover capability
- Documentation: Maintain up-to-date documentation
Next Steps¶
- Review the IoT Analytics Reference Architecture
- Explore Financial Services Architecture
- Learn about ML Pipeline Architecture
- Implement Cost Optimization Strategies