Skip to content

Metadata Governance

Home | Best Practices | Cross-Cutting | Metadata Governance

Status Category

Best practices for metadata management and governance in Azure data platforms.


Overview

Metadata governance ensures data assets are discoverable, understandable, and properly managed across your analytics platform.


Metadata Categories

Technical Metadata

| Category | Examples | Storage |
|----------|----------|---------|
| Schema | Column names, types, constraints | Unity Catalog / Purview |
| Lineage | Source-to-target mappings | Azure Purview |
| Statistics | Row counts, null percentages | Delta Lake |
| Partitions | Partition keys, file counts | Hive Metastore |

Business Metadata

| Category | Examples | Storage |
|----------|----------|---------|
| Glossary | Business terms, definitions | Azure Purview |
| Classification | PII, Confidential, Public | Azure Purview |
| Ownership | Data steward, technical owner | Azure Purview |
| Quality | Data quality scores | Custom registry |

Azure Purview Integration

Data Source Registration

from azure.purview.catalog import PurviewCatalogClient
from azure.identity import DefaultAzureCredential

# Module-level Purview catalog client shared by the snippets below.
# NOTE(review): the account endpoint is a placeholder; replace it with the
# endpoint of your own Purview account.
PURVIEW_ENDPOINT = "https://purview-account.purview.azure.com"

credential = DefaultAzureCredential()
client = PurviewCatalogClient(endpoint=PURVIEW_ENDPOINT, credential=credential)

# Register an ADLS Gen2 resource set in the Purview catalog
def register_data_lake(
    name: str = "sales_bronze_data",
    qualified_name: str = "https://datalake.dfs.core.windows.net/bronze/sales",
    description: str = "Raw sales data from operational systems",
    owner: str = "data-engineering@company.com",
    classifications: tuple = ("MICROSOFT.PERSONAL.DATA",),
):
    """Create or update an ADLS Gen2 resource-set entity in the catalog.

    Called without arguments, this registers the same example asset as
    before (the bronze sales data); pass arguments to register other assets.

    Args:
        name: Display name of the asset.
        qualified_name: Unique identifier of the asset; here the storage URL.
        description: Free-text description stored on the entity.
        owner: Owner recorded on the entity.
        classifications: Classification type names to attach to the entity.

    Returns:
        Whatever ``client.entity.create_or_update`` returns.
    """
    entity = {
        "typeName": "azure_datalake_gen2_resource_set",
        "attributes": {
            "name": name,
            "qualifiedName": qualified_name,
            "description": description,
            "owner": owner,
        },
        # One {"typeName": ...} object per requested classification.
        "classifications": [
            {"typeName": type_name} for type_name in classifications
        ],
    }
    # The request body wraps the entity under an "entity" key.
    return client.entity.create_or_update(entity={"entity": entity})

Business Glossary

# Create a glossary term
def create_glossary_term(name: str, definition: str, domain: str):
    """Create a glossary term with status "Approved".

    The term is anchored to the glossary whose GUID ``get_glossary_guid()``
    returns, and carries one related resource named after ``domain``.

    Args:
        name: Term name.
        definition: Text stored as the term's long description.
        domain: Business domain; used as the resource display name and URL.

    Returns:
        Whatever ``client.glossary.create_glossary_term`` returns.
    """
    glossary_anchor = {"glossaryGuid": get_glossary_guid()}
    domain_resource = {"displayName": domain, "url": f"#/{domain}"}
    payload = dict(
        name=name,
        longDescription=definition,
        anchor=glossary_anchor,
        resources=[domain_resource],
        status="Approved",
    )
    return client.glossary.create_glossary_term(payload)

# Example: define a term for the Sales domain
# (arguments in order: name, definition, domain)
create_glossary_term(
    "Customer Lifetime Value",
    "Total revenue expected from a customer over entire relationship",
    "Sales",
)

Unity Catalog Integration

Catalog Structure

-- Create catalog and schema hierarchy.
-- Every statement uses IF NOT EXISTS so the script can be re-run safely.
CREATE CATALOG IF NOT EXISTS enterprise_data;

CREATE SCHEMA IF NOT EXISTS enterprise_data.sales
COMMENT 'Sales domain data assets';

CREATE SCHEMA IF NOT EXISTS enterprise_data.finance
COMMENT 'Finance domain data assets';

-- Register table with metadata: column comments describe each field,
-- table properties carry governance attributes.
CREATE TABLE IF NOT EXISTS enterprise_data.sales.customers (
    customer_id STRING NOT NULL COMMENT 'Unique customer identifier',
    email STRING COMMENT 'Customer email address - PII',
    created_date DATE COMMENT 'Account creation date'
)
COMMENT 'Master customer dimension table'
TBLPROPERTIES (
    'data_classification' = 'confidential',
    'data_owner' = 'sales-team@company.com',
    -- 2555 days is roughly 7 years (7 * 365)
    'retention_days' = '2555'
);

-- Add column tags.
-- SET TAGS takes key = value pairs, so each marker tag gets an explicit value.
ALTER TABLE enterprise_data.sales.customers
ALTER COLUMN email SET TAGS ('pii' = 'true', 'gdpr_relevant' = 'true');

Data Lineage

# Track lineage in Unity Catalog
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Query lineage through the Data Lineage REST API.
# NOTE(review): WorkspaceClient does not appear to expose a dedicated lineage
# service, so the request goes through the SDK's generic API client. Confirm
# the endpoint and payload against the Databricks version you run.
lineage = w.api_client.do(
    "GET",
    "/api/2.0/lineage-tracking/table-lineage",
    body={
        "table_name": "enterprise_data.sales.customer_360",
        "include_entity_lineage": True,
    },
)

# Entries without a "tableInfo" object (for example, lineage that only
# references notebooks or jobs) are skipped.
for upstream in lineage.get("upstreams", []):
    table_info = upstream.get("tableInfo")
    if table_info:
        print(f"Source: {table_info['name']}")

for downstream in lineage.get("downstreams", []):
    table_info = downstream.get("tableInfo")
    if table_info:
        print(f"Dependent: {table_info['name']}")

Metadata Registry

Custom Registry Schema

-- Metadata registry table
-- One row per registered table, combining technical, business, quality and
-- audit metadata.
-- Keep the column order stable: any INSERT ... SELECT written without an
-- explicit column list maps values to these columns by position.
CREATE TABLE governance.metadata_registry (
    -- Identity: table_id is the lookup key; the auto-populate statement in
    -- this document builds it as '<catalog>.<schema>.<table>'.
    table_id STRING NOT NULL,
    catalog STRING NOT NULL,
    schema_name STRING NOT NULL,
    table_name STRING NOT NULL,

    -- Technical metadata
    row_count BIGINT,
    size_bytes BIGINT,
    partition_columns ARRAY<STRING>,
    last_modified TIMESTAMP,

    -- Business metadata
    domain STRING,
    data_owner STRING,
    data_steward STRING,
    classification STRING,
    retention_policy STRING,

    -- Quality metadata
    -- DECIMAL(5,2) stores values up to 999.99; presumably the scores are
    -- 0-100 percentages -- confirm with the job that writes them.
    quality_score DECIMAL(5,2),
    completeness_score DECIMAL(5,2),
    freshness_hours INT,  -- presumably hours since the last update; confirm

    -- Audit
    registered_at TIMESTAMP,
    updated_at TIMESTAMP
);

-- Auto-populate from information schema.
-- MERGE keeps this statement safe to re-run: tables already present in the
-- registry (matched on table_id) are left untouched, so repeated runs do not
-- create duplicate rows. Columns not listed in the INSERT clause are set to
-- NULL and are filled in later by the metadata collection job.
MERGE INTO governance.metadata_registry AS registry
USING (
    SELECT
        CONCAT(table_catalog, '.', table_schema, '.', table_name) AS table_id,
        table_catalog,
        table_schema,
        table_name
    FROM system.information_schema.tables
    WHERE table_catalog = 'enterprise_data'
      -- Skip the catalog's own information_schema views; they are not data assets.
      AND table_schema <> 'information_schema'
) AS discovered
ON registry.table_id = discovered.table_id
WHEN NOT MATCHED THEN INSERT (
    table_id,
    catalog,
    schema_name,
    table_name,
    registered_at,
    updated_at
) VALUES (
    discovered.table_id,
    discovered.table_catalog,
    discovered.table_schema,
    discovered.table_name,
    current_timestamp(),
    current_timestamp()
);

Automated Metadata Collection

Statistics Collection

def collect_table_metadata(table_path: str) -> dict:
    """Collect comprehensive metadata for one Delta table.

    Args:
        table_path: Either a storage location (any string containing "/",
            such as an ``abfss://`` URL or a mount path) or a catalog table
            name such as ``catalog.schema.table``.

    Returns:
        A dict with the keys ``row_count``, ``column_count``, ``size_bytes``,
        ``columns`` (one ``{"name", "type", "nullable"}`` dict per field),
        ``last_operation``, ``last_modified`` and ``version``.

    Relies on ``spark``, ``DeltaTable`` and ``get_table_size`` being available
    in the calling environment.
    """
    # Storage locations contain a "/"; catalog table names do not. Dispatching
    # here lets callers pass registry names as well as paths.
    if "/" in table_path:
        df = spark.read.format("delta").load(table_path)
        delta_table = DeltaTable.forPath(spark, table_path)
    else:
        df = spark.table(table_path)
        delta_table = DeltaTable.forName(spark, table_path)

    # Basic statistics
    # NOTE(review): get_table_size is defined elsewhere and receives the same
    # identifier; confirm it accepts table names as well as paths.
    stats = {
        "row_count": df.count(),
        "column_count": len(df.columns),
        "size_bytes": get_table_size(table_path)
    }

    # Schema info
    stats["columns"] = [
        {
            "name": field.name,
            "type": str(field.dataType),
            "nullable": field.nullable
        }
        for field in df.schema.fields
    ]

    # Delta-specific metadata: history(1) yields only the most recent commit.
    latest_commit = delta_table.history(1).collect()[0]
    stats["last_operation"] = latest_commit.operation
    stats["last_modified"] = latest_commit.timestamp
    stats["version"] = latest_commit.version

    return stats

Scheduled Metadata Refresh

# Databricks job for metadata refresh
def refresh_metadata_registry():
    """Refresh the metadata of every table listed in the registry.

    Intended to run as a daily job. Each registry row is processed
    independently: a failure while collecting or storing one table's metadata
    is reported through ``log_error`` and the loop moves on to the next table.
    """
    registry_sql = """
        SELECT table_id, catalog, schema_name, table_name
        FROM governance.metadata_registry
    """
    registry_rows = spark.sql(registry_sql).collect()

    for row in registry_rows:
        # NOTE(review): this is a three-level table name, not a storage path;
        # confirm collect_table_metadata accepts table names.
        table_path = f"{row.catalog}.{row.schema_name}.{row.table_name}"
        try:
            collected = collect_table_metadata(table_path)
            update_registry(row.table_id, collected)
        except Exception as exc:
            log_error(f"Failed to refresh {table_path}: {exc}")

Data Discovery

Search Interface

def search_data_assets(query: str, filters: dict = None) -> list:
    """Search data assets by keyword and filters.

    User-supplied values are bound through named parameter markers instead of
    being interpolated into the SQL text, so quotes or SQL fragments in
    ``query`` or ``filters`` cannot alter the statement.

    Args:
        query: Keyword matched case-insensitively as a substring of
            ``table_name`` or ``domain``. Falsy values disable the keyword
            condition. ``%`` and ``_`` keep their LIKE wildcard meaning.
        filters: Optional dict; the keys ``"domain"`` and ``"classification"``
            add exact-match conditions when their values are truthy.

    Returns:
        The matching rows from ``governance.metadata_registry`` as a list.

    NOTE(review): requires a runtime where ``spark.sql`` accepts ``args`` with
    plain Python values (PySpark 3.5+); confirm for your Databricks Runtime.
    """
    sql_parts = ["""
        SELECT
            table_id,
            table_name,
            domain,
            data_owner,
            classification,
            quality_score
        FROM governance.metadata_registry
        WHERE 1=1
    """]
    args = {}

    if query:
        sql_parts.append("""
            AND (
                LOWER(table_name) LIKE :pattern
                OR LOWER(domain) LIKE :pattern
            )
        """)
        args["pattern"] = f"%{query.lower()}%"

    if filters:
        # Only these two column names are ever placed in the SQL text; the
        # caller controls the bound values, never the SQL itself.
        for column in ("domain", "classification"):
            value = filters.get(column)
            if value:
                sql_parts.append(f" AND {column} = :{column}")
                args[column] = value

    return spark.sql("".join(sql_parts), args=args).collect()


Last Updated: January 2025