Schema Drift Detection

Lesson 13 — Defensive data validation

Why Schema Drift Matters

Government data sources change column names, add or remove columns, and retype columns between releases — often without notice. If your pipeline assumes a fixed schema, it silently breaks or loads garbage. A renamed column becomes all NULLs; a retyped column truncates values or throws cast errors deep in the warehouse layer where the root cause is hard to trace.

The pipeline uses proactive drift detection to catch these changes before they reach the warehouse. Every dataset load compares the incoming schema against the last known-good schema and flags any differences. This turns silent corruption into a loud, actionable failure at the earliest possible point.

Detecting Column Changes

The core detection function compares two schema snapshots — each a dict[str, str] mapping column name to data type — and returns a list of SchemaChange objects classified as "added", "removed", or "retyped".

from collections import namedtuple

# Minimal stand-in for the pipeline's SchemaChange type; the real pipeline
# may use a dataclass, but a namedtuple keeps the example self-contained.
SchemaChange = namedtuple("SchemaChange", ["kind", "column"])

def detect_schema_drift(schema_a, schema_b):
    """Compare two schema snapshots and return a list of changes."""
    changes = []
    all_cols = set(schema_a.keys()) | set(schema_b.keys())
    for col in sorted(all_cols):
        if col in schema_a and col not in schema_b:
            changes.append(SchemaChange("removed", col))
        elif col not in schema_a and col in schema_b:
            changes.append(SchemaChange("added", col))
        elif schema_a[col].upper() != schema_b[col].upper():
            changes.append(SchemaChange("retyped", col))
    return changes

The comparison is case-insensitive on type names because some sources report VARCHAR while the staging layer normalizes to varchar. An empty change list means the schema is stable — the most common outcome within a single source release cycle.
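A quick sketch of the function in action (the function body is repeated from above so the example runs standalone; the column names and types are illustrative). Note that a rename surfaces as a removed/added pair rather than a distinct change type:

```python
from collections import namedtuple

SchemaChange = namedtuple("SchemaChange", ["kind", "column"])

def detect_schema_drift(schema_a, schema_b):
    changes = []
    for col in sorted(set(schema_a) | set(schema_b)):
        if col in schema_a and col not in schema_b:
            changes.append(SchemaChange("removed", col))
        elif col not in schema_a and col in schema_b:
            changes.append(SchemaChange("added", col))
        elif schema_a[col].upper() != schema_b[col].upper():
            changes.append(SchemaChange("retyped", col))
    return changes

# Hypothetical release-over-release snapshots: "tot_emp" is renamed
# to "total_employment" and "a_mean" is retyped from NUMERIC to INTEGER.
prior = {"occ_code": "VARCHAR", "tot_emp": "INTEGER", "a_mean": "NUMERIC"}
incoming = {"occ_code": "varchar", "total_employment": "INTEGER", "a_mean": "INTEGER"}

for change in detect_schema_drift(prior, incoming):
    print(change.kind, change.column)
# retyped a_mean
# removed tot_emp
# added total_employment
```

The case-insensitive type comparison means the VARCHAR/varchar difference on occ_code is correctly ignored.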

Row Count Shift Detection

Schema drift catches structural changes, but a source can also change in volume without altering its schema. A 20% threshold on row count shift catches major structural changes — such as BLS adding a new geography level or dropping an occupation group — while ignoring normal year-to-year variation.

from collections import namedtuple

# Minimal stand-in for the pipeline's ValidationResult type.
ValidationResult = namedtuple("ValidationResult", ["passed", "message"])

ROW_COUNT_SHIFT_THRESHOLD_PCT = 20.0

def detect_row_count_shift(prior_count, current_count,
                           threshold_pct=ROW_COUNT_SHIFT_THRESHOLD_PCT):
    """Flag row count changes that exceed the threshold percentage."""
    if prior_count == 0:
        # No baseline: pass only if the source is still empty.
        return ValidationResult(current_count == 0, "No prior row count to compare")
    pct_change = abs(current_count - prior_count) / prior_count * 100
    return ValidationResult(
        passed=pct_change < threshold_pct,
        message=f"Row count changed {pct_change:.1f}%"
    )

For example, OEWS national data has roughly 830 occupations per year. If a new release suddenly reports 1,000 rows, the 20% threshold fires. This could mean BLS added a new occupation classification level — something the parser needs to handle explicitly rather than silently ingesting.
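The arithmetic behind that example, as a quick check:

```python
# OEWS example: ~830 occupations in the prior release, 1,000 rows incoming.
pct_change = abs(1000 - 830) / 830 * 100
print(f"{pct_change:.1f}%")  # 20.5% -- just above the 20% threshold, so the check fires
```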

Measure Delta Detection

Beyond row counts, the pipeline checks for large relative changes in key measures. This catches wage corrections, employment revisions, and other data-level changes that don't affect the schema or row count but would produce misleading trend analysis if ingested without review.

Check             Threshold    Catches
Row count shift   20%          New geography levels, dropped occupation groups
Measure delta     15%          Wage corrections, employment revisions
Schema drift      Any change   Column renames, type changes, removed fields

The measure delta check identifies the top-N measures with the largest relative change between releases. A 15% swing in mean annual wage for a major occupation group almost certainly indicates a methodology change or data error rather than a real labor market shift.
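The lesson describes this check without showing code. A minimal sketch of one way it could work, assuming measures arrive as dicts mapping a measure key to a value — the top_measure_deltas name and the wage figures are illustrative, not the pipeline's actual API:

```python
def top_measure_deltas(prior, current, top_n=5):
    """Rank keys shared by both releases by relative change, largest first."""
    deltas = []
    for key in prior.keys() & current.keys():
        if prior[key] == 0:
            continue  # no baseline for a relative change
        pct = abs(current[key] - prior[key]) / abs(prior[key]) * 100
        deltas.append((key, round(pct, 1)))
    deltas.sort(key=lambda kv: kv[1], reverse=True)
    return deltas[:top_n]

# Illustrative mean-annual-wage values for two occupation codes.
prior = {"15-1252 a_mean": 120000, "29-1141 a_mean": 86000}
current = {"15-1252 a_mean": 138000, "29-1141 a_mean": 89000}

print(top_measure_deltas(prior, current, top_n=1))
# [('15-1252 a_mean', 15.0)] -- a 15% swing, large enough to flag for review
```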

Publication Gates

All validation checks feed into a single publication gate that decides whether data flows from staging into the warehouse. The gate is deliberately simple: pass everything or block everything.

def check_publication_gate(validation_results):
    """Binary gate: all validations must pass for data to flow."""
    failures = [r for r in validation_results if not r.passed]
    if failures:
        return f"BLOCKED: {len(failures)} validation(s) failed"
    return "ALLOWED: All validations passed"

Key principle: The gate is binary — either all validations pass and data flows to the warehouse, or any failure blocks publication. Raw data is always preserved regardless — you can fix the parser and re-run without re-downloading.
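For instance, feeding the gate a mixed batch of results (a minimal sketch, assuming ValidationResult is a simple passed/message pair; the gate function is repeated from above so the example runs standalone):

```python
from collections import namedtuple

ValidationResult = namedtuple("ValidationResult", ["passed", "message"])

def check_publication_gate(validation_results):
    """Binary gate: all validations must pass for data to flow."""
    failures = [r for r in validation_results if not r.passed]
    if failures:
        return f"BLOCKED: {len(failures)} validation(s) failed"
    return "ALLOWED: All validations passed"

results = [
    ValidationResult(True, "Schema stable"),
    ValidationResult(False, "Row count changed 20.5%"),
    ValidationResult(True, "Measure deltas within threshold"),
]
print(check_publication_gate(results))  # BLOCKED: 1 validation(s) failed
```

One failing check is enough to block the whole release, no matter how many others passed.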

This fail-fast approach means schema drift never silently corrupts the warehouse. When a validation fails, the pipeline logs exactly which check failed and why, giving the developer a clear starting point for investigation. The raw data remains in the landing layer, so once the parser is updated to handle the new schema, a simple re-run loads everything correctly.