
Financial Services Reference Architecture

Overview

This reference architecture demonstrates an enterprise-grade financial services analytics platform built on Azure Cloud Scale Analytics (CSA), designed to meet stringent regulatory compliance requirements while delivering real-time fraud detection, risk analytics, and regulatory reporting capabilities.

Business Drivers

  • Regulatory Compliance: Meet SOX, Basel III, GDPR, and PCI-DSS requirements
  • Fraud Detection: Real-time transaction monitoring and anomaly detection
  • Risk Management: Credit risk, market risk, and operational risk analytics
  • Customer Analytics: 360-degree customer view with privacy controls
  • Trading Analytics: High-frequency market data processing
  • Regulatory Reporting: Automated compliance reporting (CCAR, Dodd-Frank)
  • Anti-Money Laundering (AML): Transaction pattern analysis and suspicious activity detection

Key Capabilities

  • Sub-second fraud detection on transaction streams
  • Immutable audit trail with cryptographic verification
  • Fine-grained access controls with attribute-based policies
  • Real-time risk calculations and stress testing
  • Automated regulatory report generation
  • Customer PII protection with data masking
  • Multi-region disaster recovery with RPO < 1 minute

Architecture Diagram

graph TB
    subgraph "Data Sources"
        CORE[Core Banking System]
        CARDS[Card Processing]
        ATM[ATM Network]
        MOBILE[Mobile Banking]
        TRADING[Trading Platform]
        MARKET[Market Data Feeds]
        CRM[CRM System]
        EXTERNAL[Credit Bureaus]
    end

    subgraph "Ingestion Layer - Secure Zone"
        EH[Event Hubs<br/>Premium]
        ADF[Data Factory<br/>Managed VNet]
        APIM[API Management<br/>Internal Mode]
        HSM[Key Vault<br/>HSM-backed]
    end

    subgraph "Storage Layer - Encrypted"
        BRONZE[(Bronze Layer<br/>Immutable Storage)]
        SILVER[(Silver Layer<br/>Encrypted + Audit)]
        GOLD[(Gold Layer<br/>Masked Data)]
        AUDIT[(Audit Layer<br/>Blockchain-verified)]
    end

    subgraph "Processing Layer - Isolated"
        SYNAPSE[Synapse Analytics<br/>Managed VNet]
        SPARK[Spark Pools<br/>PE-enabled]
        SQL[Serverless SQL<br/>AAD Auth]
        DATABRICKS[Databricks<br/>Secure Cluster]
    end

    subgraph "ML & Analytics"
        AML_MODEL[Azure ML<br/>Fraud Detection]
        COGNITIVE[Cognitive Services<br/>KYC Verification]
        ANOMALY[Anomaly Detector<br/>Risk Monitoring]
    end

    subgraph "Serving Layer - DMZ"
        COSMOS[Cosmos DB<br/>Multi-region]
        SQL_DB[SQL Database<br/>Encrypted]
        CACHE[Redis Cache<br/>SSL-only]
    end

    subgraph "Presentation Layer"
        PBI[Power BI Premium<br/>RLS enabled]
        WEBAPP[Web Portal<br/>B2C Auth]
        API[REST API<br/>OAuth 2.0]
        REPORTS[Regulatory Reports<br/>SFTP Delivery]
    end

    subgraph "Security & Governance"
        PURVIEW[Microsoft Purview<br/>Data Lineage]
        SENTINEL[Azure Sentinel<br/>SIEM]
        DLP[DLP Policies<br/>Sensitive Data]
        KV[Key Vault<br/>Encryption Keys]
        AAD[Azure AD<br/>PIM + MFA]
    end

    %% Data Flow with Security
    CORE --> |TLS 1.3| EH
    CARDS --> |TLS 1.3| EH
    ATM --> |VPN| EH
    MOBILE --> |mTLS| APIM
    TRADING --> |Dedicated Circuit| ADF
    MARKET --> |Direct Feed| EH
    CRM --> |PE| ADF
    EXTERNAL --> |API Gateway| APIM

    EH --> |PE| BRONZE
    ADF --> |PE| BRONZE
    APIM --> |PE| BRONZE

    BRONZE --> SPARK
    SPARK --> |Encryption| SILVER
    SILVER --> DATABRICKS
    DATABRICKS --> |Masking| GOLD

    GOLD --> SQL
    SQL --> PBI

    GOLD --> AML_MODEL
    AML_MODEL --> COSMOS

    DATABRICKS --> SQL_DB
    SQL_DB --> API

    COSMOS --> API
    CACHE --> API
    API --> WEBAPP

    %% Audit Trail
    BRONZE -.->|Immutable| AUDIT
    SILVER -.->|Immutable| AUDIT
    GOLD -.->|Immutable| AUDIT

    %% Governance
    PURVIEW -.->|Scan| BRONZE
    PURVIEW -.->|Scan| SILVER
    PURVIEW -.->|Scan| GOLD
    SENTINEL -.->|Monitor| EH
    SENTINEL -.->|Monitor| SYNAPSE
    DLP -.->|Enforce| GOLD
    KV -.->|Rotate Keys| BRONZE
    AAD -.->|AuthZ| PBI

    style BRONZE fill:#cd7f32
    style SILVER fill:#c0c0c0
    style GOLD fill:#ffd700
    style AUDIT fill:#ff6b6b
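
To make the ingestion edge above concrete, the sketch below publishes a single card transaction to the Event Hubs Premium namespace with the azure-eventhub SDK. The connection string, hub name, and payload fields are illustrative assumptions; in production the connection string would be resolved from Key Vault and the client would authenticate with a managed identity.

# Minimal ingestion sketch: publish one card transaction to Event Hubs
# (connection string, hub name, and payload fields are illustrative)
import json
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    conn_str="<event-hubs-connection-string>",   # resolve from Key Vault in production
    eventhub_name="card-transactions"
)

transaction = {
    "transaction_id": "txn-000123",
    "card_id": "tok_a1b2c3",          # tokenized PAN, never the raw card number
    "amount": 129.50,
    "merchant_id": "m-778",
    "timestamp": "2024-01-01T12:00:00Z"
}

with producer:
    batch = producer.create_batch()
    batch.add(EventData(json.dumps(transaction)))
    producer.send_batch(batch)        # AMQP over TLS by default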

Azure Service Mapping

| Component | Azure Service | Purpose | Compliance Feature |
|---|---|---|---|
| Stream Ingestion | Event Hubs Premium | Transaction streams with geo-DR | Availability zones, auto-failover |
| Batch Ingestion | Azure Data Factory | Batch loads via managed VNet | Private endpoints, CMK encryption |
| API Gateway | API Management | Internal API with OAuth | JWT validation, rate limiting, IP filtering |
| Data Lake | ADLS Gen2 | Immutable storage with versioning | Legal hold, WORM, audit logs |
| Big Data Processing | Synapse Spark Pools | PII detection and masking | Managed VNet, customer-managed keys |
| Advanced Analytics | Azure Databricks | ML model training with isolation | Customer-managed VNet, SCIM provisioning |
| SQL Analytics | Synapse Serverless SQL | Ad-hoc analysis with AAD auth | Row-level security, column encryption |
| Machine Learning | Azure Machine Learning | Fraud models with explainability | Private endpoints, managed identities |
| Real-Time DB | Cosmos DB | Global distribution with encryption | Multi-region writes, point-in-time restore |
| Relational DB | Azure SQL Database | OLTP with TDE | Always Encrypted, dynamic masking, auditing |
| Caching | Azure Cache for Redis | Session state with SSL | VNet injection, geo-replication |
| Visualization | Power BI Premium | RLS and OLS enforcement | Sensitivity labels, audit logs |
| HSM | Azure Key Vault Premium | FIPS 140-2 Level 3 HSMs | Managed HSM, key rotation |
| Data Governance | Microsoft Purview | Data catalog with lineage | Sensitivity scanning, access policies |
| SIEM | Azure Sentinel | Security monitoring | Threat intelligence, automated playbooks |
| DLP | Microsoft Purview DLP | PII and PCI protection | Content inspection, policy enforcement |
| Identity | Azure AD Premium P2 | PIM, MFA, conditional access | Identity protection, access reviews |
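
The "PII detection and masking" step that Synapse Spark performs between the Silver and Gold layers can be expressed as a small set of column transformations. A minimal sketch, assuming illustrative Delta paths and column names:

# Minimal masking sketch for the Silver -> Gold hop (paths and columns are illustrative)
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("pii-masking").getOrCreate()

silver = spark.read.format("delta").load("/silver/customers")

gold = (silver
    # one-way hash so joins still work without exposing the raw identifier
    .withColumn("customer_id", F.sha2(F.col("customer_id").cast("string"), 256))
    # keep only the last four digits of the card number
    .withColumn("card_number", F.concat(F.lit("****-****-****-"), F.substring("card_number", -4, 4)))
    # redact the local part of the e-mail address
    .withColumn("email", F.regexp_replace("email", r"^[^@]+", "****"))
    .drop("ssn", "date_of_birth"))   # drop attributes the Gold layer never needs

gold.write.format("delta").mode("overwrite").save("/gold/customers_masked")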

Compliance Requirements

SOX (Sarbanes-Oxley)

Requirements:

  • Immutable audit trail of all data changes
  • Segregation of duties
  • Change management controls
  • Access logging and monitoring

Implementation:

# Immutable audit logging with blockchain verification
from azure.storage.blob import BlobServiceClient, ImmutabilityPolicy
from datetime import datetime, timedelta
import hashlib
import json

class SOXAuditLogger:
    def __init__(self, connection_string):
        self.blob_service = BlobServiceClient.from_connection_string(connection_string)
        self.container = self.blob_service.get_container_client("audit-logs")
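        # Context helpers used below (get_client_ip, get_session_id, get_last_hash,
        # is_under_investigation) are environment-specific and omitted here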

    def log_data_change(self, user, action, table, record_id, before_value, after_value):
        """Log data changes with immutability and blockchain verification"""

        audit_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "user": user,
            "action": action,
            "table": table,
            "record_id": record_id,
            "before": before_value,
            "after": after_value,
            "ip_address": self.get_client_ip(),
            "session_id": self.get_session_id()
        }

        # Calculate hash for blockchain
        record_json = json.dumps(audit_record, sort_keys=True)
        audit_record["hash"] = hashlib.sha256(record_json.encode()).hexdigest()
        audit_record["previous_hash"] = self.get_last_hash()

        # Store with immutability policy
        blob_name = f"{datetime.utcnow().strftime('%Y/%m/%d/%H')}/{audit_record['hash']}.json"
        blob_client = self.container.get_blob_client(blob_name)

        blob_client.upload_blob(
            json.dumps(audit_record),
            overwrite=False  # Prevent overwrites
        )

        # Set immutability policy (7 years for SOX)
        immutability_policy = ImmutabilityPolicy(
            expiry_time=datetime.utcnow() + timedelta(days=2555),  # 7 years
            policy_mode="Locked"
        )
        blob_client.set_immutability_policy(immutability_policy)

        # Set legal hold for ongoing investigations
        if self.is_under_investigation(record_id):
            blob_client.set_legal_hold(True)

        return audit_record["hash"]

    def verify_audit_chain(self, from_date, to_date):
        """Verify hash-chain integrity of audit logs in the given date range"""
        # Blob names carry a %Y/%m/%d/%H prefix, so a from_date/to_date filter can be
        # applied via name prefixes; ordering by creation time keeps the chain in write order
        blobs = sorted(self.container.list_blobs(), key=lambda b: b.creation_time)
        previous_hash = None

        for blob in blobs:
            audit_record = json.loads(
                self.container.download_blob(blob.name).readall()
            )

            if previous_hash and audit_record["previous_hash"] != previous_hash:
                raise ValueError(f"Audit chain broken at {blob.name}")

            # Verify hash
            temp_record = audit_record.copy()
            stored_hash = temp_record.pop("hash")
            temp_record.pop("previous_hash")
            calculated_hash = hashlib.sha256(
                json.dumps(temp_record, sort_keys=True).encode()
            ).hexdigest()

            if stored_hash != calculated_hash:
                raise ValueError(f"Hash mismatch in {blob.name}")

            previous_hash = stored_hash

        return True
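
A hypothetical usage of the logger, with the connection string and values shown only for illustration:

# Hypothetical usage of SOXAuditLogger (connection string and values are illustrative)
logger = SOXAuditLogger(connection_string="<storage-connection-string>")

change_hash = logger.log_data_change(
    user="jdoe@bank.example",
    action="UPDATE",
    table="gold.customer_limits",
    record_id="CUST-1042",
    before_value={"credit_limit": 5000},
    after_value={"credit_limit": 7500}
)

# Periodic integrity check, e.g. from a nightly job
assert logger.verify_audit_chain(from_date="2024-01-01", to_date="2024-01-31")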

PCI-DSS (Payment Card Industry)

Requirements:

  • Encrypt cardholder data at rest and in transit
  • Tokenization of primary account numbers (PAN)
  • Network segmentation
  • Quarterly vulnerability scans

Implementation:

# PCI-DSS compliant card tokenization
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient, EncryptionAlgorithm
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
from datetime import datetime
import secrets

class PCITokenizationService:
    def __init__(self, vault_url):
        credential = DefaultAzureCredential()
        self.key_client = KeyClient(vault_url, credential)
        self.secret_client = SecretClient(vault_url, credential)
        self.encryption_key = self.key_client.get_key("card-encryption-key")
        self.crypto_client = CryptographyClient(self.encryption_key, credential)

    def tokenize_pan(self, pan):
        """Replace a PAN with a random surrogate token and store the encrypted PAN in Key Vault"""

        # Validate PAN
        if not self.luhn_check(pan):
            raise ValueError("Invalid PAN")

        # Generate a random, non-reversible surrogate token
        token = secrets.token_hex(16)

        # Encrypt PAN
        encrypted_pan = self.crypto_client.encrypt(
            EncryptionAlgorithm.rsa_oaep_256,
            pan.encode()
        ).ciphertext

        # Store the token -> encrypted PAN mapping as a Key Vault secret
        self.secret_client.set_secret(
            f"pan-token-{token}",
            encrypted_pan.hex(),
            tags={
                "bin": pan[:6],  # Bank Identification Number
                "last4": pan[-4:],
                "token_date": datetime.utcnow().isoformat()
            }
        )

        return {
            "token": token,
            "bin": pan[:6],
            "last4": pan[-4:],
            "expiry_date": self.calculate_expiry()
        }

    def detokenize_pan(self, token):
        """Detokenize PAN (audit logged)"""

        # Log the access (audit helper not shown)
        self.log_pan_access(token)

        # Retrieve encrypted PAN from Key Vault secrets
        secret = self.secret_client.get_secret(f"pan-token-{token}")
        encrypted_pan = bytes.fromhex(secret.value)

        # Decrypt
        decrypted = self.crypto_client.decrypt(
            EncryptionAlgorithm.rsa_oaep_256,
            encrypted_pan
        ).plaintext

        return decrypted.decode()

    @staticmethod
    def luhn_check(pan):
        """Validate PAN using Luhn algorithm"""
        def digits_of(n):
            return [int(d) for d in str(n)]

        digits = digits_of(pan)
        odd_digits = digits[-1::-2]
        even_digits = digits[-2::-2]
        checksum = sum(odd_digits)
        for d in even_digits:
            checksum += sum(digits_of(d * 2))
        return checksum % 10 == 0
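
A short, hypothetical round trip through the service (vault URL and PAN are illustrative test values; detokenization should be restricted to PCI-scoped services and is audit logged):

# Hypothetical round trip (vault URL and PAN are illustrative test values)
tokenizer = PCITokenizationService(vault_url="https://finkeyvault-prod.vault.azure.net")

result = tokenizer.tokenize_pan("4111111111111111")   # standard test PAN
print(result["token"], result["bin"], result["last4"])

# Only PCI-scoped services should ever call this
pan = tokenizer.detokenize_pan(result["token"])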

Basel III (Capital Requirements)

Requirements:

  • Real-time risk-weighted asset (RWA) calculations
  • Stress testing capabilities
  • Credit exposure monitoring
  • Liquidity coverage ratio (LCR) reporting

Implementation:

-- Basel III RWA calculation in Synapse
CREATE VIEW gold.risk_weighted_assets AS
WITH exposure_categories AS (
    SELECT
        exposure_id,
        counterparty_id,
        exposure_amount,
        exposure_type,
        CASE
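            -- NOTE: rating thresholds are shown as plain string comparisons for brevity;
            -- production logic should first map ratings to an ordinal scale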
            -- Sovereign exposures
            WHEN counterparty_type = 'Sovereign' AND credit_rating >= 'AA-' THEN 0.00
            WHEN counterparty_type = 'Sovereign' AND credit_rating >= 'A-' THEN 0.20
            WHEN counterparty_type = 'Sovereign' THEN 1.00

            -- Bank exposures
            WHEN counterparty_type = 'Bank' AND credit_rating >= 'AA-' THEN 0.20
            WHEN counterparty_type = 'Bank' AND credit_rating >= 'A-' THEN 0.50
            WHEN counterparty_type = 'Bank' THEN 1.00

            -- Corporate exposures
            WHEN counterparty_type = 'Corporate' AND credit_rating >= 'AA-' THEN 0.20
            WHEN counterparty_type = 'Corporate' AND credit_rating >= 'A-' THEN 0.50
            WHEN counterparty_type = 'Corporate' AND credit_rating >= 'BBB-' THEN 1.00
            WHEN counterparty_type = 'Corporate' THEN 1.50

            -- Retail exposures
            WHEN exposure_type = 'Retail' THEN 0.75

            -- Residential mortgages
            WHEN exposure_type = 'Residential Mortgage' AND ltv_ratio <= 0.80 THEN 0.35
            WHEN exposure_type = 'Residential Mortgage' THEN 0.75

            -- Commercial real estate
            WHEN exposure_type = 'Commercial RE' THEN 1.00

            ELSE 1.50  -- Default
        END as risk_weight,

        -- Credit Valuation Adjustment (CVA)
        CASE
            WHEN derivative_flag = 1 THEN exposure_amount * 0.075
            ELSE 0
        END as cva_charge

    FROM silver.credit_exposures e
    LEFT JOIN silver.counterparties c ON e.counterparty_id = c.counterparty_id
    LEFT JOIN silver.credit_ratings r ON c.counterparty_id = r.counterparty_id
)
SELECT
    SUM(exposure_amount * risk_weight) as total_rwa,
    SUM(cva_charge) as total_cva,
    SUM(exposure_amount * risk_weight) + SUM(cva_charge) as total_credit_rwa,

    -- Capital requirement (8% of RWA)
    (SUM(exposure_amount * risk_weight) + SUM(cva_charge)) * 0.08 as minimum_capital_requirement,

    -- Capital buffers
    (SUM(exposure_amount * risk_weight) + SUM(cva_charge)) * 0.025 as capital_conservation_buffer,
    (SUM(exposure_amount * risk_weight) + SUM(cva_charge)) * 0.02 as countercyclical_buffer

FROM exposure_categories;

-- Liquidity Coverage Ratio
CREATE VIEW gold.liquidity_coverage_ratio AS
WITH hqla AS (
    SELECT
        SUM(CASE
            WHEN asset_type = 'Cash' THEN market_value * 1.00
            WHEN asset_type = 'Central Bank Reserves' THEN market_value * 1.00
            WHEN asset_type = 'Level 1 Securities' THEN market_value * 1.00
            WHEN asset_type = 'Level 2A Securities' THEN market_value * 0.85
            WHEN asset_type = 'Level 2B Securities' THEN market_value * 0.50
            ELSE 0
        END) as total_hqla
    FROM silver.liquid_assets
    WHERE as_of_date = CAST(GETDATE() AS DATE)
),
net_outflows AS (
    SELECT
        SUM(expected_outflow * outflow_rate) - SUM(expected_inflow * inflow_rate) as net_cash_outflow_30d
    FROM silver.cash_flows
    WHERE flow_date BETWEEN CAST(GETDATE() AS DATE) AND DATEADD(day, 30, CAST(GETDATE() AS DATE))
)
SELECT
    h.total_hqla,
    n.net_cash_outflow_30d,
    h.total_hqla / NULLIF(n.net_cash_outflow_30d, 0) as lcr_ratio,
    CASE
        WHEN h.total_hqla / NULLIF(n.net_cash_outflow_30d, 0) >= 1.00 THEN 'Compliant'
        ELSE 'Non-Compliant'
    END as compliance_status
FROM hqla h
CROSS JOIN net_outflows n;
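
The views above give the baseline position; stress testing can then be run by shocking the inputs and recomputing capital. A minimal pandas sketch, assuming the exposures and their baseline risk weights have already been extracted from the silver layer (column names and shock factors are illustrative assumptions):

# Minimal stress-testing sketch (column names and shock factors are illustrative)
import pandas as pd

def stress_rwa(exposures: pd.DataFrame,
               downgrade_uplift: float = 0.50,
               ead_shock: float = 0.10) -> dict:
    """Apply a simple downgrade/EAD shock and recompute capital requirements."""
    stressed = exposures.copy()
    # Scenario: exposures grow by ead_shock, risk weights rise by downgrade_uplift (capped at 150%)
    stressed["exposure_amount"] = stressed["exposure_amount"] * (1 + ead_shock)
    stressed["risk_weight"] = (stressed["risk_weight"] * (1 + downgrade_uplift)).clip(upper=1.50)

    baseline_rwa = (exposures["exposure_amount"] * exposures["risk_weight"]).sum()
    stressed_rwa = (stressed["exposure_amount"] * stressed["risk_weight"]).sum()
    return {
        "baseline_rwa": baseline_rwa,
        "stressed_rwa": stressed_rwa,
        "baseline_capital": baseline_rwa * 0.08,   # 8% minimum requirement
        "stressed_capital": stressed_rwa * 0.08,
    }

# Example with two exposures
rwa = stress_rwa(pd.DataFrame({
    "exposure_amount": [1_000_000, 250_000],
    "risk_weight": [0.50, 0.75],
}))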

GDPR (Privacy Protection)

Requirements:

  • Right to be forgotten
  • Data portability
  • Privacy by design
  • Consent management

Implementation:

# GDPR compliance implementation
from azure.purview.catalog import PurviewCatalogClient
from azure.identity import DefaultAzureCredential
from datetime import datetime
import json

class GDPRComplianceManager:
    def __init__(self, purview_endpoint, storage_account):
        self.purview_client = PurviewCatalogClient(
            purview_endpoint,
            DefaultAzureCredential()
        )
        self.storage_account = storage_account

    def right_to_be_forgotten(self, customer_id):
        """Erase all customer PII across data estate"""

        # 1. Find all assets containing customer data
        #    (discovery.query takes a single search-request body; result hits are under "value")
        search_response = self.purview_client.discovery.query({
            "keywords": customer_id,
            "filter": {
                "classifications": ["PII", "Customer Data"]
            }
        })
        assets = search_response.get("value", [])

        deletion_log = {
            "customer_id": customer_id,
            "request_date": datetime.utcnow().isoformat(),
            "deleted_records": []
        }

        # 2. Delete from each identified location
        for asset in assets:
            if asset["entityType"] == "azure_datalake_gen2_path":
                self.delete_from_data_lake(asset["qualifiedName"], customer_id)
            elif asset["entityType"] == "azure_sql_table":
                self.delete_from_sql(asset["qualifiedName"], customer_id)
            elif asset["entityType"] == "azure_cosmosdb_collection":
                self.delete_from_cosmos(asset["qualifiedName"], customer_id)

            deletion_log["deleted_records"].append({
                "asset": asset["name"],
                "location": asset["qualifiedName"],
                "deleted_at": datetime.utcnow().isoformat()
            })

        # 3. Create immutable deletion certificate
        self.create_deletion_certificate(deletion_log)

        return deletion_log

    def data_portability(self, customer_id, export_format="json"):
        """Export all customer data in machine-readable format"""

        # Find all customer data
        customer_data = {}

        # Profile data
        customer_data["profile"] = self.export_customer_profile(customer_id)

        # Transaction history
        customer_data["transactions"] = self.export_transactions(customer_id)

        # Account information
        customer_data["accounts"] = self.export_accounts(customer_id)

        # Consents
        customer_data["consents"] = self.export_consents(customer_id)

        # Serialize in the requested machine-readable format
        if export_format == "json":
            return json.dumps(customer_data, indent=2)
        elif export_format == "xml":
            return self.to_xml(customer_data)  # XML serializer helper not shown
        else:
            raise ValueError(f"Unsupported format: {export_format}")

    def consent_management(self, customer_id, consent_type, action):
        """Manage customer consent with audit trail"""

        consent_record = {
            "customer_id": customer_id,
            "consent_type": consent_type,
            "action": action,  # granted, revoked, updated
            "timestamp": datetime.utcnow().isoformat(),
            "ip_address": self.get_client_ip(),
            "user_agent": self.get_user_agent()
        }

        # Store consent with blockchain verification
        self.store_consent_record(consent_record)

        # Update data processing rules
        if action == "revoked":
            self.revoke_processing_rights(customer_id, consent_type)

        return consent_record
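
A hypothetical invocation from a data-subject-request workflow (endpoint, storage account, and customer ID are illustrative):

# Hypothetical data-subject-request workflow (names are illustrative)
gdpr = GDPRComplianceManager(
    purview_endpoint="https://finpurview.purview.azure.com",
    storage_account="findataprod"
)

# GDPR Article 17: erasure request
deletion_log = gdpr.right_to_be_forgotten(customer_id="CUST-1042")

# GDPR Article 20: portability request
export_json = gdpr.data_portability(customer_id="CUST-1042", export_format="json")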

Fraud Detection Use Case

Real-Time Fraud Scoring

# Real-time fraud detection with Azure ML
from azureml.core import Workspace, Model
from azureml.core.webservice import Webservice
import json

class FraudDetectionService:
    def __init__(self, workspace_name, model_name):
        self.ws = Workspace.from_config()
        self.model = Model(self.ws, model_name)
        self.service = Webservice(self.ws, f"{model_name}-service")

    def score_transaction(self, transaction):
        """Score transaction for fraud in real-time (<100ms)"""

        # Feature engineering
        features = self.engineer_features(transaction)

        # Get model prediction (the deployed scoring endpoint is assumed to return
        # a single fraud probability for the submitted record)
        fraud_probability = self.service.run(input_data=json.dumps({
            "data": [features]
        }))

        # Risk-based decision
        if fraud_probability > 0.95:
            decision = "BLOCK"
            action = "block_transaction"
        elif fraud_probability > 0.75:
            decision = "CHALLENGE"
            action = "request_3ds_authentication"
        elif fraud_probability > 0.50:
            decision = "REVIEW"
            action = "flag_for_manual_review"
        else:
            decision = "APPROVE"
            action = "approve_transaction"

        # Log decision for model monitoring
        self.log_decision({
            "transaction_id": transaction["id"],
            "fraud_score": fraud_probability,
            "decision": decision,
            "latency_ms": self.get_latency()
        })

        return {
            "fraud_score": fraud_probability,
            "decision": decision,
            "action": action,
            "reasons": self.explain_prediction(features, fraud_probability)
        }

    def engineer_features(self, transaction):
        """Real-time feature engineering"""

        # Transaction features
        features = {
            "amount": transaction["amount"],
            "merchant_category": transaction["merchant_category_code"],
            "country": transaction["country_code"],
            "channel": transaction["channel"],
        }

        # Velocity features (from cache)
        customer_id = transaction["customer_id"]
        card_id = transaction["card_id"]

        features["txn_count_24h"] = self.get_txn_count(customer_id, hours=24)
        features["txn_count_1h"] = self.get_txn_count(customer_id, hours=1)
        features["amount_sum_24h"] = self.get_amount_sum(customer_id, hours=24)

        # Card features
        features["card_age_days"] = self.get_card_age(card_id)
        features["international_txn_ratio"] = self.get_international_ratio(card_id)

        # Behavioral features
        features["hour_of_day"] = transaction["timestamp"].hour
        features["day_of_week"] = transaction["timestamp"].weekday()
        features["is_unusual_merchant"] = self.check_unusual_merchant(customer_id, transaction["merchant_id"])
        features["is_unusual_location"] = self.check_unusual_location(customer_id, transaction["location"])

        # Network features
        features["device_reputation"] = self.get_device_reputation(transaction["device_id"])
        features["ip_reputation"] = self.get_ip_reputation(transaction["ip_address"])

        return features

    def explain_prediction(self, features, fraud_score):
        """Generate human-readable fraud indicators"""

        reasons = []

        if features["txn_count_1h"] > 5:
            reasons.append("High velocity: Multiple transactions in short time")

        if features["amount"] > 1000 and features["amount_sum_24h"] / features["amount"] < 2:
            reasons.append("Large transaction: Unusual amount for customer")

        if features["is_unusual_location"]:
            reasons.append("Location anomaly: Transaction from new location")

        if features["device_reputation"] < 0.5:
            reasons.append("Device risk: Suspicious device characteristics")

        if features["international_txn_ratio"] < 0.1 and features["country"] != "US":
            reasons.append("International: Rare international transaction")

        return reasons[:3]  # Top 3 reasons
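
The velocity features above are served "from cache"; one way to maintain them is a set of rolling counters in Azure Cache for Redis, sketched below. The key layout, TTLs, and the sliding-expiry approximation are assumptions, not part of the reference design.

# Sketch of rolling velocity counters in Redis (key layout and TTLs are illustrative)
import redis

r = redis.Redis(host="fincache.redis.cache.windows.net", port=6380, ssl=True,
                password="<access-key>")          # SSL-only endpoint

def record_transaction(customer_id: str, amount: float) -> None:
    """Increment per-customer counters for 1h and 24h windows."""
    pipe = r.pipeline()
    for window, ttl in (("1h", 3600), ("24h", 86400)):
        pipe.incr(f"txn_count:{window}:{customer_id}")
        pipe.incrbyfloat(f"amount_sum:{window}:{customer_id}", amount)
        # sliding expiry is a simple approximation of a fixed window
        pipe.expire(f"txn_count:{window}:{customer_id}", ttl)
        pipe.expire(f"amount_sum:{window}:{customer_id}", ttl)
    pipe.execute()

def get_txn_count(customer_id: str, hours: int) -> int:
    value = r.get(f"txn_count:{'1h' if hours == 1 else '24h'}:{customer_id}")
    return int(value) if value else 0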

Fraud Pattern Detection

-- Detect fraud patterns using graph analytics
WITH transaction_network AS (
    SELECT
        t1.transaction_id as txn1,
        t2.transaction_id as txn2,
        t1.card_id,
        t1.merchant_id,
        t1.device_id,
        t1.ip_address,
        ABS(DATEDIFF(second, t1.transaction_time, t2.transaction_time)) as time_diff_seconds
    FROM silver.transactions t1
    JOIN silver.transactions t2
        ON (t1.card_id = t2.card_id
            OR t1.device_id = t2.device_id
            OR t1.ip_address = t2.ip_address)
        AND t1.transaction_id <> t2.transaction_id
        AND t1.transaction_time >= DATEADD(hour, -24, GETDATE())
        AND t2.transaction_time >= DATEADD(hour, -24, GETDATE())
),
suspicious_patterns AS (
    SELECT
        card_id,
        COUNT(DISTINCT merchant_id) as merchant_count,
        COUNT(DISTINCT device_id) as device_count,
        COUNT(DISTINCT ip_address) as ip_count,
        MIN(time_diff_seconds) as min_time_between_txns
    FROM transaction_network
    GROUP BY card_id
    HAVING
        COUNT(DISTINCT device_id) > 3  -- Multiple devices
        OR COUNT(DISTINCT ip_address) > 5  -- Multiple IPs
        OR MIN(time_diff_seconds) < 60  -- Transactions within 1 minute
)
SELECT
    s.*,
    t.transaction_id,
    t.amount,
    t.merchant_name,
    t.country_code,
    f.fraud_label  -- Known fraud cases
FROM suspicious_patterns s
JOIN silver.transactions t ON s.card_id = t.card_id
LEFT JOIN gold.fraud_labels f ON t.transaction_id = f.transaction_id
WHERE t.transaction_time >= DATEADD(hour, -24, GETDATE())
ORDER BY s.device_count DESC, s.ip_count DESC;

Performance & Scaling

High-Frequency Trading Analytics

# Process market data at scale
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("HFT-Analytics") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "400") \
    .getOrCreate()

# Read tick data (billions of records)
tick_data = (spark
    .read
    .format("delta")
    .load("/silver/market_data/ticks")
    .where(col("trade_date") == current_date())
)

# Calculate VWAP by symbol
vwap_window = Window.partitionBy("symbol").orderBy("timestamp") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

vwap_data = tick_data.withColumn(
    "vwap",
    sum(col("price") * col("volume")).over(vwap_window) /
    sum(col("volume")).over(vwap_window)
)

# Calculate tick imbalance
imbalance_window = Window.partitionBy("symbol").orderBy("timestamp") \
    .rowsBetween(-100, 0)

imbalance_data = vwap_data.withColumn(
    "buy_volume_100",
    sum(when(col("side") == "BUY", col("volume")).otherwise(0)).over(imbalance_window)
).withColumn(
    "sell_volume_100",
    sum(when(col("side") == "SELL", col("volume")).otherwise(0)).over(imbalance_window)
).withColumn(
    "imbalance_ratio",
    col("buy_volume_100") / (col("buy_volume_100") + col("sell_volume_100"))
)

# Write to gold layer with Z-ordering
(imbalance_data
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("trade_date", "exchange")
    .save("/gold/market_analytics/vwap_imbalance")
)

# Optimize table
spark.sql("""
    OPTIMIZE delta.`/gold/market_analytics/vwap_imbalance`
    ZORDER BY (symbol, timestamp)
""")

Disaster Recovery

Multi-Region Failover Strategy

# Multi-region DR with automated failover
from azure.mgmt.cosmosdb import CosmosDBManagementClient
from azure.identity import DefaultAzureCredential

class DisasterRecoveryManager:
    def __init__(self, subscription_id, resource_group):
        self.credential = DefaultAzureCredential()
        self.cosmos_mgmt = CosmosDBManagementClient(self.credential, subscription_id)
        self.subscription_id = subscription_id
        self.resource_group = resource_group

    def configure_multi_region(self, account_name):
        """Configure multi-region writes with automatic failover"""

        locations = [
            {"locationName": "East US", "failoverPriority": 0, "isZoneRedundant": True},
            {"locationName": "West US 2", "failoverPriority": 1, "isZoneRedundant": True},
            {"locationName": "UK South", "failoverPriority": 2, "isZoneRedundant": True}
        ]

        self.cosmos_mgmt.database_accounts.begin_create_or_update(
            self.resource_group,
            account_name,
            {
                "location": "East US",
                "locations": locations,
                "enableMultipleWriteLocations": True,
                "enableAutomaticFailover": True,
                "consistencyPolicy": {
                    "defaultConsistencyLevel": "BoundedStaleness",
                    "maxStalenessPrefix": 100,
                    "maxIntervalInSeconds": 5
                },
                "backupPolicy": {
                    "type": "Continuous",
                    "continuousModeProperties": {
                        "tier": "Continuous7Days"
                    }
                }
            }
        )

    def test_failover(self, account_name):
        """Test regional failover"""

        # Trigger manual failover
        self.cosmos_mgmt.database_accounts.begin_failover_priority_change(
            self.resource_group,
            account_name,
            {
                "failoverPolicies": [
                    {"locationName": "West US 2", "failoverPriority": 0},
                    {"locationName": "East US", "failoverPriority": 1},
                    {"locationName": "UK South", "failoverPriority": 2}
                ]
            }
        )

    def point_in_time_restore(self, account_name, restore_timestamp):
        """Restore account to specific point in time"""

        restore_account_name = f"{account_name}-restored"

        self.cosmos_mgmt.database_accounts.begin_create_or_update(
            self.resource_group,
            restore_account_name,
            {
                "location": "East US",
                "createMode": "Restore",
                "restoreParameters": {
                    "restoreMode": "PointInTime",
                    "restoreSource": f"/subscriptions/{subscription_id}/resourceGroups/{self.resource_group}/providers/Microsoft.DocumentDB/databaseAccounts/{account_name}",
                    "restoreTimestampInUtc": restore_timestamp
                }
            }
        )

Cost Optimization

Financial Services Workload Costs

| Component | Configuration | Monthly Cost | Optimization Strategy |
|---|---|---|---|
| Event Hubs Premium | 8 PUs, 90-day retention | $10,800 | Use Standard tier for non-critical streams |
| Synapse Dedicated Pool | DW500c, 8 hours/day | $3,600 | Auto-pause, use serverless for ad-hoc |
| Databricks Premium | 10-node cluster, 12 hours/day | $12,000 | Use spot instances, optimize cluster size |
| Cosmos DB | 50K RU/s, multi-region | $29,200 | Use autoscale, optimize partition keys |
| SQL Database | Business Critical, 16 vCores | $9,600 | Use General Purpose for non-prod |
| Key Vault Premium | HSM-backed keys | $5 per key/month | Consolidate keys where possible |
| Purview | 10 vCore hours/day | $1,200 | Schedule scans during off-peak |
| Total | | ~$66,000/month | |
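
The auto-pause strategy for the dedicated pool can be scripted with the Synapse management SDK and run on a schedule (for example from an Azure Function). A minimal sketch with illustrative resource names:

# Minimal auto-pause sketch using azure-mgmt-synapse (resource names are illustrative)
from azure.identity import DefaultAzureCredential
from azure.mgmt.synapse import SynapseManagementClient

client = SynapseManagementClient(DefaultAzureCredential(), "<subscription-id>")

pool = client.sql_pools.get("finservices-rg", "finsynapse-prod", "finance_dw")
if pool.status == "Online":
    # Pause outside business hours to stop dedicated DWU billing
    client.sql_pools.begin_pause("finservices-rg", "finsynapse-prod", "finance_dw").result()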

Deployment

Bicep Template for Financial Services

// financial-services-infrastructure.bicep

param location string = resourceGroup().location
param environment string = 'prod'
param vnetAddressPrefix string = '10.0.0.0/16'

@secure()
param sqlAdministratorLoginPassword string

// Virtual Network with NSGs
resource vnet 'Microsoft.Network/virtualNetworks@2021-05-01' = {
  name: 'finservices-vnet-${environment}'
  location: location
  properties: {
    addressSpace: {
      addressPrefixes: [vnetAddressPrefix]
    }
    subnets: [
      {
        name: 'synapse-subnet'
        properties: {
          addressPrefix: '10.0.1.0/24'
          privateEndpointNetworkPolicies: 'Disabled'
          privateLinkServiceNetworkPolicies: 'Disabled'
        }
      }
      {
        name: 'databricks-private-subnet'
        properties: {
          addressPrefix: '10.0.2.0/24'
          delegations: [
            {
              name: 'databricks-delegation'
              properties: {
                serviceName: 'Microsoft.Databricks/workspaces'
              }
            }
          ]
        }
      }
    ]
  }
}

// Data Lake with encryption and immutability
resource dataLake 'Microsoft.Storage/storageAccounts@2021-09-01' = {
  name: 'findata${environment}${uniqueString(resourceGroup().id)}'
  location: location
  kind: 'StorageV2'
  sku: {
    name: 'Standard_GRS'  // Geo-redundant
  }
  properties: {
    isHnsEnabled: true
    minimumTlsVersion: 'TLS1_2'
    supportsHttpsTrafficOnly: true
    encryption: {
      requireInfrastructureEncryption: true
      services: {
        blob: {
          enabled: true
          keyType: 'Account'
        }
      }
      keySource: 'Microsoft.Keyvault'
      keyvaultproperties: {
        keyname: 'storage-encryption-key'
        keyvaulturi: keyVault.properties.vaultUri
      }
    }
    immutableStorageWithVersioning: {
      enabled: true
      immutabilityPolicy: {
        allowProtectedAppendWrites: false
        immutabilityPeriodSinceCreationInDays: 2555  // 7 years for SOX
      }
    }
  }
}

// Key Vault Premium with HSM
resource keyVault 'Microsoft.KeyVault/vaults@2021-10-01' = {
  name: 'finkeyvault-${environment}'
  location: location
  properties: {
    sku: {
      family: 'A'
      name: 'premium'  // HSM-backed
    }
    tenantId: subscription().tenantId
    enabledForDeployment: true
    enabledForDiskEncryption: true
    enabledForTemplateDeployment: true
    enableSoftDelete: true
    softDeleteRetentionInDays: 90
    enablePurgeProtection: true
    enableRbacAuthorization: true
    networkAcls: {
      defaultAction: 'Deny'
      bypass: 'AzureServices'
      virtualNetworkRules: [
        {
          id: '${vnet.id}/subnets/synapse-subnet'
        }
      ]
    }
  }
}

// Synapse with Managed VNet
resource synapseWorkspace 'Microsoft.Synapse/workspaces@2021-06-01' = {
  name: 'finsynapse-${environment}'
  location: location
  identity: {
    type: 'SystemAssigned'
  }
  properties: {
    defaultDataLakeStorage: {
      accountUrl: dataLake.properties.primaryEndpoints.dfs
      filesystem: 'synapse'
    }
    managedVirtualNetwork: 'default'
    managedVirtualNetworkSettings: {
      preventDataExfiltration: true
      allowedAadTenantIdsForLinking: [subscription().tenantId]
    }
    encryption: {
      cmk: {
        key: {
          name: 'synapse-encryption-key'
          keyVaultUrl: keyVault.properties.vaultUri
        }
      }
    }
    sqlAdministratorLogin: 'sqladmin'
    sqlAdministratorLoginPassword: sqlAdministratorLoginPassword
  }
}

// Cosmos DB with multi-region writes
resource cosmosDb 'Microsoft.DocumentDB/databaseAccounts@2022-05-15' = {
  name: 'fincosmos-${environment}'
  location: location
  kind: 'GlobalDocumentDB'
  properties: {
    databaseAccountOfferType: 'Standard'
    consistencyPolicy: {
      defaultConsistencyLevel: 'BoundedStaleness'
      maxStalenessPrefix: 100
      maxIntervalInSeconds: 5
    }
    enableMultipleWriteLocations: true
    enableAutomaticFailover: true
    locations: [
      {locationName: 'East US', failoverPriority: 0, isZoneRedundant: true}
      {locationName: 'West US 2', failoverPriority: 1, isZoneRedundant: true}
    ]
    backupPolicy: {
      type: 'Continuous'
      continuousModeProperties: {
        tier: 'Continuous7Days'
      }
    }
    networkAclBypass: 'None'
    publicNetworkAccess: 'Disabled'
  }
}

// Azure Sentinel for SIEM
resource logAnalytics 'Microsoft.OperationalInsights/workspaces@2021-06-01' = {
  name: 'finlogs-${environment}'
  location: location
  properties: {
    sku: {
      name: 'PerGB2018'
    }
    retentionInDays: 730  // 2 years for compliance
  }
}

resource sentinel 'Microsoft.SecurityInsights/onboardingStates@2021-10-01' = {
  name: 'default'
  scope: logAnalytics
  properties: {}
}

Next Steps

  1. Review Healthcare Analytics Architecture
  2. Explore Enterprise Data Warehouse
  3. Implement ML Pipeline Architecture