Skip to content

Edge and Offline Handling

Home | Best Practices | Operational Excellence | Edge Offline Handling

Status Category

Best practices for handling disconnected and edge scenarios in analytics pipelines.


Overview

Edge computing and offline scenarios require special handling to ensure data integrity and eventual consistency when connectivity is intermittent.


Architecture Patterns

Store and Forward

```text
┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│    Edge     │      │   Gateway   │      │    Cloud    │
│   Device    │─────▶│  (Buffer)   │─────▶│   Ingest    │
│             │      │             │      │             │
└─────────────┘      └─────────────┘      └─────────────┘
       │                    │                    │
       ▼                    ▼                    ▼
  Local Store          Queue/Buffer          Event Hubs
```

### Implementation

```python
import sqlite3
import json
from datetime import datetime
from queue import Queue
from threading import Thread

class OfflineDataHandler:
    """Buffer events in a local SQLite database while offline; sync them later.

    Store-and-forward: every event is persisted locally first. While online,
    events are also placed on ``upload_queue``, which a background daemon
    thread drains for immediate best-effort delivery. ``sync_pending()``
    replays anything still unsynced, in insertion order.
    """

    def __init__(self, db_path: str = "offline_buffer.db"):
        self.db_path = db_path
        self.online = False          # flipped by the owner on connectivity changes
        self.upload_queue = Queue()  # events awaiting immediate upload while online
        self._init_db()
        self._start_sync_thread()

    def _init_db(self) -> None:
        """Create the local SQLite buffer table if it does not already exist."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS offline_buffer (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    synced INTEGER DEFAULT 0
                )
            """)
            conn.commit()
        finally:
            # Close even if table creation raises, so the handle is not leaked.
            conn.close()

    def _start_sync_thread(self) -> None:
        """Start the daemon thread that drains the immediate-upload queue."""
        Thread(target=self._sync_worker, daemon=True).start()

    def _sync_worker(self) -> None:
        """Forward queued events as they arrive (runs on the daemon thread).

        Failures are tolerated: the event is already persisted in SQLite with
        synced = 0, so a later sync_pending() call will retry it.
        """
        while True:
            event_type, payload = self.upload_queue.get()
            try:
                self._upload_event(event_type, payload)
            except Exception as e:
                print(f"Immediate upload failed for {event_type}: {e}")

    def _upload_event(self, event_type: str, payload: dict) -> None:
        """Transport hook: deliver one event to the cloud.

        Override (or monkey-patch) this with the real uplink, e.g. IoT Hub
        or Event Hubs. Raises until a transport is attached.
        """
        raise NotImplementedError("attach a transport by overriding _upload_event")

    def store_event(self, event_type: str, payload: dict) -> None:
        """Persist an event locally; queue it for immediate upload if online."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(
                "INSERT INTO offline_buffer (timestamp, event_type, payload) VALUES (?, ?, ?)",
                # Naive UTC ISO string, matching existing rows. NOTE:
                # datetime.utcnow() is deprecated since Python 3.12; switching
                # to datetime.now(timezone.utc) would add a "+00:00" suffix.
                (datetime.utcnow().isoformat(), event_type, json.dumps(payload))
            )
            conn.commit()
        finally:
            conn.close()

        # If online, queue for immediate upload
        if self.online:
            self.upload_queue.put((event_type, payload))

    def sync_pending(self) -> None:
        """Sync all pending (synced = 0) events to the cloud in insertion order.

        Stops at the first failure so ordering is preserved; the failed event
        stays unsynced and is retried on the next call.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            pending = conn.execute(
                "SELECT id, event_type, payload FROM offline_buffer WHERE synced = 0 ORDER BY id"
            ).fetchall()

            # row_id (not `id`): avoid shadowing the builtin.
            for row_id, event_type, payload in pending:
                try:
                    self._upload_event(event_type, json.loads(payload))
                    conn.execute("UPDATE offline_buffer SET synced = 1 WHERE id = ?", (row_id,))
                    conn.commit()
                except Exception as e:
                    print(f"Sync failed for event {row_id}: {e}")
                    break
        finally:
            conn.close()
```
Azure IoT Edge Integration

Edge Module Configuration

{
    "modulesContent": {
        "$edgeAgent": {
            "properties.desired": {
                "modules": {
                    "DataCollector": {
                        "type": "docker",
                        "settings": {
                            "image": "myregistry.azurecr.io/data-collector:1.0",
                            "createOptions": {
                                "HostConfig": {
                                    "Binds": ["/data:/app/data"]
                                }
                            }
                        },
                        "env": {
                            "BUFFER_PATH": {"value": "/app/data/buffer"},
                            "MAX_BUFFER_SIZE_MB": {"value": "500"},
                            "SYNC_INTERVAL_SECONDS": {"value": "60"}
                        }
                    }
                }
            }
        },
        "$edgeHub": {
            "properties.desired": {
                "routes": {
                    "DataToCloud": "FROM /messages/modules/DataCollector/* INTO $upstream",
                    "DataToLocal": "FROM /messages/modules/DataCollector/* INTO BrokeredEndpoint(\"/modules/LocalStore/inputs/data\")"
                },
                "storeAndForwardConfiguration": {
                    "timeToLiveSecs": 7200
                }
            }
        }
    }
}

Offline Detection

import asyncio
from azure.iot.device.aio import IoTHubModuleClient

class EdgeConnectivityManager:
    """Track IoT Edge connectivity and fall back to the local offline buffer.

    Telemetry goes straight to the hub while connected; on send failure or
    while disconnected it is persisted via OfflineDataHandler, and the buffer
    is drained when connectivity returns.
    """

    def __init__(self):
        self.client = None           # IoTHubModuleClient, created in connect()
        self.is_connected = False
        self.offline_handler = OfflineDataHandler()

    async def connect(self):
        """Initialize the IoT Edge module client and register for state changes."""
        self.client = IoTHubModuleClient.create_from_edge_environment()
        self.client.on_connection_state_change = self._on_connection_change
        await self.client.connect()

    def _on_connection_change(self, connected: bool):
        """Handle connectivity state changes reported by the SDK."""
        self.is_connected = connected
        if connected:
            print("Connected - syncing pending data")
            # NOTE(review): create_task assumes this callback fires on the
            # running event-loop thread — confirm against the
            # azure-iot-device SDK documentation.
            asyncio.create_task(self._sync_offline_data())
        else:
            print("Disconnected - switching to offline mode")

    async def _sync_offline_data(self):
        """Drain the offline buffer without blocking the event loop.

        sync_pending() performs blocking SQLite and upload I/O, so run it in
        the default thread-pool executor.
        """
        await asyncio.get_running_loop().run_in_executor(
            None, self.offline_handler.sync_pending
        )

    async def send_telemetry(self, data: dict):
        """Send telemetry, falling back to the offline buffer on any failure."""
        import json  # local import: this snippet's header omits it

        if self.is_connected:
            try:
                await self.client.send_message(json.dumps(data))
            except Exception:
                # Best effort: never lose telemetry — buffer it for later sync.
                self.offline_handler.store_event("telemetry", data)
        else:
            self.offline_handler.store_event("telemetry", data)

Data Integrity

Idempotent Processing

from hashlib import sha256

class IdempotentEventProcessor:
    """Ensure events are processed exactly once (within the dedup TTL window)."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl_seconds = 86400 * 7  # dedup window: 7 days

    def generate_event_id(self, event: dict) -> str:
        """Generate a deterministic event ID from the business key fields.

        Fields are joined in a FIXED order (not sorted): sorting the values
        would make transposed fields collide, e.g. device_id="a",
        event_type="b" would hash identically to device_id="b",
        event_type="a" and silently drop a distinct event.
        """
        key_fields = [
            event.get("device_id", ""),
            event.get("timestamp", ""),
            event.get("event_type", ""),
        ]
        return sha256("|".join(key_fields).encode()).hexdigest()

    def should_process(self, event: dict) -> bool:
        """Return True exactly once per event ID within the TTL.

        SET with NX is atomic in Redis, so concurrent consumers cannot both
        win: only the first writer gets a non-None reply.
        """
        event_id = self.generate_event_id(event)

        # Set only if the key does not already exist; expire after the TTL.
        result = self.redis.set(
            f"processed:{event_id}",
            "1",
            nx=True,
            ex=self.ttl_seconds,
        )

        return result is not None

Conflict Resolution

class ConflictResolver:
    """Pick a winner between two conflicting records during offline sync."""

    STRATEGIES = {
        "last_write_wins": lambda old, new: new,
        "first_write_wins": lambda old, new: old,
        "merge": lambda old, new: {**old, **new},
        "version_check": None  # Custom logic
    }

    def resolve(self, old_record: dict, new_record: dict, strategy: str = "last_write_wins"):
        """Apply the named strategy; unknown names fall back to last_write_wins."""
        # Version checking is procedural, not a simple two-argument lambda.
        if strategy == "version_check":
            return self._version_based_resolution(old_record, new_record)

        pick = self.STRATEGIES.get(strategy) or self.STRATEGIES["last_write_wins"]
        return pick(old_record, new_record)

    def _version_based_resolution(self, old: dict, new: dict) -> dict:
        """Prefer the higher _version; break ties on the _modified timestamp."""
        delta = new.get("_version", 0) - old.get("_version", 0)
        if delta != 0:
            return new if delta > 0 else old
        # Same version: the more recently modified record wins; ties keep old.
        return new if new.get("_modified", "") > old.get("_modified", "") else old

Buffering Strategies

Memory-Efficient Buffer

import json
import os
from collections import deque

class HybridBuffer:
    """FIFO buffer that keeps recent items in memory and spills overflow to disk.

    When the in-memory deque reaches ``memory_limit``, the oldest half is
    written to a JSON overflow file. ``flush()`` returns everything in the
    original insertion order: disk overflow (oldest) first, then memory.
    """

    _OVERFLOW_PREFIX = "overflow_"

    def __init__(self, memory_limit: int = 1000, disk_path: str = "/tmp/buffer"):
        self.memory_limit = memory_limit
        self.disk_path = disk_path
        # No maxlen: the limit is enforced explicitly in append(), and a
        # bounded deque would silently drop the oldest items instead.
        self.memory_buffer = deque()
        self.disk_overflow = False
        # Monotonic, zero-padded sequence so lexicographic file order equals
        # creation order. (utcnow().timestamp() misapplies the local-TZ
        # offset to a naive UTC datetime, float-string names do not sort
        # numerically, and two overflows in one tick would collide.)
        self._seq = 0
        os.makedirs(disk_path, exist_ok=True)

    def append(self, item: dict):
        """Add an item, spilling the oldest half to disk at the memory limit."""
        if len(self.memory_buffer) >= self.memory_limit:
            self._overflow_to_disk()

        self.memory_buffer.append(item)

    def _overflow_to_disk(self):
        """Move the oldest half of the memory buffer into a new overflow file."""
        overflow_count = len(self.memory_buffer) // 2
        if overflow_count == 0:
            # memory_limit < 2: nothing to spill; avoid writing empty files.
            return
        overflow_items = [self.memory_buffer.popleft() for _ in range(overflow_count)]

        filename = os.path.join(
            self.disk_path, f"{self._OVERFLOW_PREFIX}{self._seq:012d}.json"
        )
        self._seq += 1
        with open(filename, 'w') as f:
            json.dump(overflow_items, f)

        self.disk_overflow = True

    def flush(self):
        """Return and clear all buffered items in insertion order.

        Overflow files hold the OLDEST items, so they are read first (oldest
        file first) and the in-memory tail is appended after them.
        """
        items = []

        # Drain disk overflow before the memory tail to preserve FIFO order.
        if self.disk_overflow:
            for filename in sorted(os.listdir(self.disk_path)):
                if not filename.startswith(self._OVERFLOW_PREFIX):
                    continue  # ignore unrelated files in a shared directory
                filepath = os.path.join(self.disk_path, filename)
                with open(filepath, 'r') as f:
                    items.extend(json.load(f))
                os.remove(filepath)
            self.disk_overflow = False

        items.extend(self.memory_buffer)
        self.memory_buffer.clear()
        return items

Monitoring

Offline Metrics

// Monitor offline duration and data loss
// Per device per day: how many offline episodes occurred and how long they
// lasted, by pairing each offline->online transition with the previous row.
OfflineEvents
| where EventType == "connectivity_change"
// NOTE(review): prev() requires a serialized (ordered) row set — insert
// `| sort by DeviceId, Timestamp asc` (sort serializes) before this line,
// otherwise the prev() calls are invalid. Confirm against the Kusto docs.
| extend PreviousState = prev(ConnectionState, 1)
// Keep only rows where the device just transitioned back online.
| where ConnectionState == "online" and PreviousState == "offline"
// NOTE(review): this prev() runs AFTER the filter, so it reads the previous
// surviving (online-transition) row, not the matching offline row — the
// duration computed here looks suspect; verify intent.
| extend OfflineDuration = datetime_diff('minute', Timestamp, prev(Timestamp, 1))
| summarize
    OfflineCount = count(),
    AvgOfflineMinutes = avg(OfflineDuration),
    MaxOfflineMinutes = max(OfflineDuration),
    TotalOfflineMinutes = sum(OfflineDuration)
    by DeviceId, bin(Timestamp, 1d)


Last Updated: January 2025