The job finished in four minutes. No exceptions. No warnings. The Spark UI was clean.

Two weeks later, someone noticed that two reports were telling different stories about the same business metric. One team was working off bad numbers. Nobody knew for how long.

The pipeline hadn’t crashed. It had just quietly done the wrong thing — twice, in two different ways.


What Actually Happened

The setup was straightforward: a CSV file acting as a master reference, and one JSON file per entity carrying supplementary business data. Both sources shared a key business date. Simple enough.

Two bugs were hiding in plain sight.

Bug #1 — Ghost records. Some JSON files referenced UUIDs that didn’t exist in the CSV. No match, no error, no row in the output. The pipeline used a join, and the join silently discarded them. Nobody configured it to complain about unmatched records.
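To see how quietly this happens, here is a plain-Python stand-in for an inner join keyed on entity ID, assuming the pipeline's join was an inner join (PySpark's DataFrame.join defaults to how="inner"). The short IDs and field names are made up for illustration. It sketches the behavior and is not the original pipeline's code.

```python
# Plain-Python stand-in for an inner join on entity ID. PySpark's
# DataFrame.join uses how="inner" by default and behaves the same way:
# a key with no match on the other side produces no output row.

master = {  # master CSV, keyed by (made-up) entity ID
    "a1": {"business_date": "2024-03-01"},
    "b2": {"business_date": "2024-03-01"},
}
payloads = {  # JSON payloads, keyed the same way
    "a1": {"revenue": 100},
    "zz": {"revenue": 250},  # ghost: no matching master row
}


def inner_join(left: dict, right: dict) -> dict:
    """Keep only keys present on both sides, merging their fields."""
    return {key: {**left[key], **right[key]} for key in left.keys() & right.keys()}


joined = inner_join(master, payloads)
print(joined)  # {'a1': {'business_date': '2024-03-01', 'revenue': 100}}

# Both "zz" (ghost payload) and "b2" (master row without a payload) are gone,
# and nothing was raised. Finding them takes an explicit anti-join:
ghosts = payloads.keys() - master.keys()
print(ghosts)  # {'zz'}
```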

Bug #2 — Diverging dates. The same business date appeared in both the CSV and the JSON. They were supposed to be identical. Sometimes they weren’t. Downstream transformations picked one or the other depending on which table they queried. Half the reports were right. Half weren’t.

Both bugs had the same root cause: the pipeline trusted its inputs completely.


The Real Problem: You’re Trusting Sources You Shouldn’t

A Delta schema will tell you a column is a StringType. It won’t tell you the string is a valid UUID. It won’t tell you that UUID exists anywhere else in your system. And it certainly won’t tell you that a date in file A matches the same date in file B.

Schema enforcement is not data validation. It’s type enforcement. There’s a gap between “this column contains strings” and “these strings are meaningful, consistent, and referentially sound” — and that gap is where your pipeline will silently corrupt your outputs.
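A quick way to see that gap with nothing but the standard library: every value below passes an “is it a string?” check, which is roughly all a StringType column promises, but only one of them parses as a UUID. The sample values are invented for illustration.

```python
from uuid import UUID

# Invented sample values for an entity_id column
values = ["3f2b6c1e-8a4d-4f7a-9b1e-2c5d7e9f0a11", "N/A", "3f2b6c1e"]

# Type check: every value is a string, roughly all a StringType column promises
assert all(isinstance(v, str) for v in values)


def is_valid_uuid(value: str) -> bool:
    """Semantic check: does the string actually parse as a UUID?"""
    try:
        UUID(value)
    except ValueError:
        return False
    return True


valid = [v for v in values if is_valid_uuid(v)]
invalid = [v for v in values if not is_valid_uuid(v)]
print(invalid)  # ['N/A', '3f2b6c1e']
```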

The gate doesn’t belong at the transformation layer. It belongs at the entry point.


Validate at the Gate

Here’s the pattern I now use on pipelines with multiple input sources. The principle: parse and validate before you transform. If the inputs don’t pass, the job raises. No partial loads, no silent data loss.

Step 1 — Model your expected inputs with Pydantic

from pydantic import BaseModel
from uuid import UUID
from datetime import date


class MasterRecord(BaseModel):
    entity_id: UUID
    business_date: date
    # ... other fields


class EntityPayload(BaseModel):
    entity_id: UUID
    business_date: date
    # ... other fields

Pydantic handles type coercion and raises a ValidationError immediately on malformed data. The CSV string “2024-03-01” becomes a date; a string that doesn’t parse as a UUID fails here, not three steps later in a Spark join that silently drops the row.

Step 2 — Validate referential integrity before the join

def validate_referential_integrity(
    master_ids: set[str],
    payloads: list[EntityPayload],
) -> None:
    ghost_ids = {
        str(p.entity_id)
        for p in payloads
        if str(p.entity_id) not in master_ids
    }

    if ghost_ids:
        raise ValueError(
            f"Referential integrity violation: {len(ghost_ids)} entity ID(s) "
            f"in JSON payloads not found in master CSV.\n"
            f"Unknown IDs: {sorted(ghost_ids)}"
        )

No ghost records reach the join. The pipeline tells you exactly what’s missing, and stops.
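This check only runs in one direction: it catches payload IDs missing from the master CSV, not master entities that have no JSON payload. Whether that second case is an error depends on your data, since the master file may legitimately list entities without supplementary data. A minimal sketch that computes both differences with set arithmetic and leaves the second one as a policy switch; IntegrityReport, compare_id_sets, enforce and allow_orphans are hypothetical names, not part of the original pipeline.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntegrityReport:
    ghost_ids: frozenset[str]   # in JSON payloads, missing from the master CSV
    orphan_ids: frozenset[str]  # in the master CSV, with no JSON payload


def compare_id_sets(master_ids: set[str], payload_ids: set[str]) -> IntegrityReport:
    """Compute both directions of the referential check with set differences."""
    return IntegrityReport(
        ghost_ids=frozenset(payload_ids - master_ids),
        orphan_ids=frozenset(master_ids - payload_ids),
    )


def enforce(report: IntegrityReport, allow_orphans: bool = True) -> None:
    """Ghost IDs always fail; master entities without payloads fail only if disallowed."""
    if report.ghost_ids:
        raise ValueError(f"{len(report.ghost_ids)} ghost ID(s): {sorted(report.ghost_ids)}")
    if report.orphan_ids and not allow_orphans:
        raise ValueError(
            f"{len(report.orphan_ids)} master ID(s) without a payload: "
            f"{sorted(report.orphan_ids)}"
        )


report = compare_id_sets({"a1", "b2", "c3"}, {"a1", "c3", "zz"})
print(sorted(report.ghost_ids))   # ['zz']
print(sorted(report.orphan_ids))  # ['b2']
```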

Step 3 — Validate cross-source consistency

def validate_date_consistency(
    master_records: list[MasterRecord],
    payloads: list[EntityPayload],
) -> None:
    payload_map = {str(p.entity_id): p for p in payloads}
    conflicts = []

    for record in master_records:
        entity_id = str(record.entity_id)
        if entity_id not in payload_map:
            continue  # master entity with no JSON payload: nothing to compare

        payload = payload_map[entity_id]
        if record.business_date != payload.business_date:
            conflicts.append(
                f"  {entity_id}: CSV={record.business_date}, "
                f"JSON={payload.business_date}"
            )

    if conflicts:
        raise ValueError(
            f"Cross-source date inconsistency detected "
            f"({len(conflicts)} record(s)):\n" + "\n".join(conflicts)
        )
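One more silent failure can hide in this step: the dict comprehension that builds payload_map keeps only the last payload for a given entity ID, so two JSON files for the same entity overwrite each other without a word. The setup assumes one file per entity; this sketch checks that assumption with collections.Counter before the map is built. Payload and validate_unique_ids are hypothetical, and Payload stands in for EntityPayload with only the fields the check reads.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class Payload:
    """Hypothetical stand-in for EntityPayload with only the fields used here."""
    entity_id: str
    business_date: date


def validate_unique_ids(payloads: list[Payload]) -> None:
    """Fail if more than one payload claims the same entity ID."""
    counts = Counter(p.entity_id for p in payloads)
    duplicates = sorted(eid for eid, n in counts.items() if n > 1)
    if duplicates:
        raise ValueError(f"Duplicate entity ID(s) across JSON payloads: {duplicates}")


payloads = [
    Payload("a1", date(2024, 3, 1)),
    Payload("a1", date(2024, 3, 2)),  # a second file for the same entity
    Payload("b2", date(2024, 3, 1)),
]

# Without the check, the dict comprehension quietly keeps the last "a1":
payload_map = {p.entity_id: p for p in payloads}
print(payload_map["a1"].business_date)  # 2024-03-02

# With it, the batch is rejected before payload_map is built:
# validate_unique_ids(payloads)
#   -> ValueError: Duplicate entity ID(s) across JSON payloads: ['a1']
```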

Step 4 — Wire it into your pipeline entry point

def run_pipeline(csv_path: str, json_dir: str) -> None:
    # Load raw inputs. load_csv, load_json_files and spark_transform are
    # pipeline-specific helpers that aren't shown here.
    raw_master = load_csv(csv_path)
    raw_payloads = load_json_files(json_dir)

    # Parse: Pydantic raises ValidationError on the first malformed record
    master_records = [MasterRecord(**row) for row in raw_master]
    payloads = [EntityPayload(**data) for data in raw_payloads]

    # Validate before doing anything else
    master_ids = {str(r.entity_id) for r in master_records}
    validate_referential_integrity(master_ids, payloads)
    validate_date_consistency(master_records, payloads)

    # Only now do we hand off to Spark
    spark_transform(master_records, payloads)

The validation layer runs in plain Python on the driver, before Spark does anything. As long as the inputs fit comfortably in driver memory, it’s fast, it’s readable, and it fails with a message that tells you exactly what’s wrong.
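The pipeline leaves load_csv and load_json_files undefined. Here is a minimal stdlib sketch of what they might look like, assuming a CSV with a header row and exactly one JSON object per file in a flat directory; both are assumptions, and real loaders may differ. Note that csv.DictReader yields every value as a string, which is why the Pydantic parsing step matters: it is what turns “2024-03-01” into a date and rejects anything that can’t be coerced.

```python
import csv
import json
import tempfile
from pathlib import Path


def load_csv(csv_path: str) -> list[dict[str, str]]:
    """Read the master CSV into one dict per row; every value is a string."""
    # utf-8-sig also strips a leading byte-order mark if a spreadsheet added one
    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        return list(csv.DictReader(f))


def load_json_files(json_dir: str) -> list[dict]:
    """Read every *.json file in json_dir, assuming one JSON object per file."""
    payloads = []
    for path in sorted(Path(json_dir).glob("*.json")):  # sorted for a stable order
        with path.open(encoding="utf-8") as f:
            payloads.append(json.load(f))
    return payloads


# Usage against throwaway files
with tempfile.TemporaryDirectory() as tmp:
    csv_file = Path(tmp) / "master.csv"
    csv_file.write_text(
        "entity_id,business_date\n3f2b6c1e-8a4d-4f7a-9b1e-2c5d7e9f0a11,2024-03-01\n",
        encoding="utf-8",
    )
    json_dir = Path(tmp) / "payloads"
    json_dir.mkdir()
    (json_dir / "entity.json").write_text(
        json.dumps({"entity_id": "3f2b6c1e-8a4d-4f7a-9b1e-2c5d7e9f0a11",
                    "business_date": "2024-03-01"}),
        encoding="utf-8",
    )
    rows = load_csv(str(csv_file))
    docs = load_json_files(str(json_dir))

print(rows)  # [{'entity_id': '3f2b6c1e-8a4d-4f7a-9b1e-2c5d7e9f0a11', 'business_date': '2024-03-01'}]
```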


No Partial Loads. No Exceptions.

Some teams prefer a “log and continue” approach — record the bad rows, skip them, keep processing. I understand the appeal. The job finishes. You get something.

Here’s the problem: you lose the ability to reason about your outputs. If a downstream report is wrong, you need to know whether the source data was clean. “We skipped 47 rows due to validation errors” buried in a log somewhere is not a guarantee. It’s a liability.

Reject the batch. Fix the source. Reload clean.

That’s the contract your pipeline should make with its consumers. The alternative is a system where “success” doesn’t mean what it should.
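One refinement worth considering: in run_pipeline, the first failing check raises before the others run, so a batch with both ghost IDs and date conflicts takes two fix-and-reload cycles to diagnose fully. This sketch runs every check, collects the failures, and still rejects the batch as a whole. run_gate and the example check functions are hypothetical stand-ins, not the article’s validators.

```python
from typing import Callable


def run_gate(checks: dict[str, Callable[[], None]]) -> None:
    """Run every named check, collect all failures, then reject the batch once."""
    failures: list[str] = []
    for name, check in checks.items():
        try:
            check()
        except ValueError as exc:  # each validator signals failure with ValueError
            failures.append(f"{name}: {exc}")
    if failures:
        raise ValueError("Batch rejected:\n" + "\n".join(failures))


# Hypothetical checks standing in for the real validators
def non_empty_check() -> None:
    pass


def ghost_check() -> None:
    raise ValueError("1 entity ID(s) not found in master CSV")


def date_check() -> None:
    raise ValueError("2 record(s) with conflicting business_date")


try:
    run_gate({
        "referential_integrity": ghost_check,
        "date_consistency": date_check,
        "non_empty": non_empty_check,
    })
except ValueError as exc:
    message = str(exc)

print(message)
# Batch rejected:
# referential_integrity: 1 entity ID(s) not found in master CSV
# date_consistency: 2 record(s) with conflicting business_date
```

In run_pipeline, the real validators would go in wrapped as lambdas, for example "referential_integrity": lambda: validate_referential_integrity(master_ids, payloads). The batch is still all-or-nothing; you just see every problem in one report.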


What This Would Have Caught

With this gate in place:

  • The ghost UUIDs raise a ValueError on the first run. The data team investigates the source system. The upstream bug gets fixed.
  • The diverging dates raise a ValueError with the exact entity IDs and both conflicting values. One source is authoritative. You pick it explicitly, not by accident based on whichever table a query happens to hit.
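Picking the authoritative source explicitly can be as simple as dropping the non-authoritative copy when records are combined. Even after the gate confirms the dates agree, keeping a single column means no downstream query can pick the wrong one. This sketch assumes the CSV wins for business_date; merge_entity is a hypothetical helper working on plain dicts.

```python
from datetime import date


def merge_entity(master_row: dict, payload: dict) -> dict:
    """Combine a master row with its JSON payload into a single record.

    Assumption: the CSV is authoritative for business_date, so the JSON copy
    is dropped here instead of being carried downstream as a second column.
    """
    supplementary = {k: v for k, v in payload.items() if k != "business_date"}
    # Master fields are applied last, so they also win on any other overlapping key
    return {**supplementary, **master_row}


merged = merge_entity(
    {"entity_id": "a1", "business_date": date(2024, 3, 1)},
    {"entity_id": "a1", "business_date": date(2024, 3, 2), "revenue": 100},
)
print(merged["business_date"])  # 2024-03-01
```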

Neither bug reaches the transformation layer. Neither bug reaches a report.

The pipeline runs in four minutes and the data is right. That’s what success should look like.


Cross-reference

If you’re not already using Pydantic for your pipeline configuration — separate concern, same principle — this article covers that pattern.