The job finished in four minutes. No exceptions. No warnings. The Spark UI was clean.
Two weeks later, someone noticed that two reports were telling different stories about the same business metric. One team was working off bad numbers. Nobody knew for how long.
The pipeline hadn’t crashed. It had just quietly done the wrong thing — twice, in two different ways.
What Actually Happened
The setup was straightforward: a CSV file acting as a master reference, and one JSON file per entity carrying supplementary business data. Both sources shared a key business date. Simple enough.
Two bugs were hiding in plain sight.
Bug #1 — Ghost records. Some JSON files referenced UUIDs that didn’t exist in the CSV. No match, no error, no row in the output. The pipeline used a join, and the join silently discarded them. Nobody configured it to complain about unmatched records.
Bug #2 — Diverging dates. The same business date appeared in both the CSV and the JSON. They were supposed to be identical. Sometimes they weren’t. Downstream transformations picked one or the other depending on which table they queried. Half the reports were right. Half weren’t.
Both bugs had the same root cause: the pipeline trusted its inputs completely.
The Real Problem: You’re Trusting Sources You Shouldn’t
A Delta schema will tell you a column is a StringType. It won’t tell you the string is a valid UUID. It won’t tell you that UUID exists anywhere else in your system. And it certainly won’t tell you that a date in file A matches the same date in file B.
Schema enforcement is not data validation. It’s type enforcement. There’s a gap between “this column contains strings” and “these strings are meaningful, consistent, and referentially sound” — and that gap is where your pipeline will silently corrupt your outputs.
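To make that gap concrete, here is a minimal stdlib sketch. The function name `find_invalid_uuids` and the sample values are hypothetical. Every value below would satisfy a StringType column, but only one of them parses as a UUID. Even that one tells you nothing about whether the ID exists in the master CSV.

```python
from uuid import UUID


def find_invalid_uuids(values: list[str]) -> list[str]:
    """Return the values that are strings but do not parse as UUIDs."""
    invalid = []
    for value in values:
        try:
            UUID(value)
        except ValueError:
            invalid.append(value)
    return invalid


column = [
    "3f2b8c1e-9a4d-4e6f-8b7a-1c2d3e4f5a6b",  # a well-formed UUID
    "N/A",                                     # a string, but not a UUID
    "",                                        # also a string
]
print(find_invalid_uuids(column))  # ['N/A', '']
```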
The validation gate doesn’t belong in the transformation layer. It belongs at the entry point, before anything is transformed.
Validate at the Gate
Here’s the pattern I now use on pipelines with multiple input sources. The principle: parse and validate before you transform. If the inputs don’t pass, the job raises. No partial loads, no silent data loss.
Step 1 — Model your expected inputs with Pydantic
```python
from datetime import date
from uuid import UUID

from pydantic import BaseModel


class MasterRecord(BaseModel):
    entity_id: UUID
    business_date: date
    # ... other fields


class EntityPayload(BaseModel):
    entity_id: UUID
    business_date: date
    # ... other fields
```
Pydantic handles type coercion and raises a ValidationError (a subclass of ValueError) immediately on malformed data. A string that doesn’t parse as a UUID fails here, not three steps later in a Spark join that silently drops the row.
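One caveat: the list comprehensions in the Step 4 entry point stop at the first malformed row. If you would rather see every bad record from a single run, Pydantic v2's `TypeAdapter` can validate a whole list and report each failure with its row index. This is a minimal sketch that assumes Pydantic v2. `EntityPayload` is redefined here, without the elided fields, so the block runs on its own. The helper `parse_payloads` and the sample rows are hypothetical.

```python
from datetime import date
from uuid import UUID

from pydantic import BaseModel, TypeAdapter, ValidationError


class EntityPayload(BaseModel):
    entity_id: UUID
    business_date: date


def parse_payloads(rows: list[dict]) -> list[EntityPayload]:
    """Validate every row; on failure, raise one ValidationError listing all bad rows."""
    adapter = TypeAdapter(list[EntityPayload])
    return adapter.validate_python(rows)


rows = [
    {"entity_id": "3f2b8c1e-9a4d-4e6f-8b7a-1c2d3e4f5a6b", "business_date": "2024-01-31"},
    {"entity_id": "not-a-uuid", "business_date": "2024-01-31"},
    {"entity_id": "3f2b8c1e-9a4d-4e6f-8b7a-1c2d3e4f5a6b", "business_date": "2024-13-01"},
]

try:
    parse_payloads(rows)
except ValidationError as exc:
    # Each error's "loc" is (row index, field name), e.g. (1, 'entity_id')
    for err in exc.errors():
        print(err["loc"], err["msg"])
```

Because ValidationError is still a ValueError, the job fails exactly as before. You just get the full list of problems instead of the first one.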
Step 2 — Validate referential integrity before the join
```python
def validate_referential_integrity(
    master_ids: set[str],
    payloads: list[EntityPayload],
) -> None:
    """Raise if any JSON payload references an entity_id absent from the master CSV."""
    ghost_ids = {
        str(p.entity_id)
        for p in payloads
        if str(p.entity_id) not in master_ids
    }
    if ghost_ids:
        raise ValueError(
            f"Referential integrity violation: {len(ghost_ids)} entity ID(s) "
            "in JSON payloads not found in master CSV.\n"
            f"Unknown IDs: {sorted(ghost_ids)}"
        )
```
No ghost records reach the join. The pipeline tells you exactly what’s missing, and stops.
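Why compare IDs only after Pydantic has parsed them into UUID objects? The same identifier can be written in several ways, and a raw string comparison would flag those variants as ghosts. Here is a minimal stdlib sketch of the same set-difference check. The helpers `normalize` and `find_ghost_ids` and the sample IDs are hypothetical. Every ID passes through `uuid.UUID` first, whose `str()` form is always lowercase and hyphenated.

```python
from uuid import UUID


def normalize(raw_id: str) -> str:
    """Canonical form: lowercase, hyphenated (what str(UUID(...)) returns)."""
    return str(UUID(raw_id))


def find_ghost_ids(master_ids: list[str], payload_ids: list[str]) -> list[str]:
    """Return payload IDs with no match in the master list, after normalization."""
    known = {normalize(i) for i in master_ids}
    return sorted({normalize(i) for i in payload_ids} - known)


master = ["3f2b8c1e-9a4d-4e6f-8b7a-1c2d3e4f5a6b"]
payload = [
    "3F2B8C1E-9A4D-4E6F-8B7A-1C2D3E4F5A6B",  # same ID, upper case
    "3f2b8c1e9a4d4e6f8b7a1c2d3e4f5a6b",      # same ID, no hyphens
    "00000000-0000-0000-0000-000000000001",  # genuinely unknown
]
print(find_ghost_ids(master, payload))
# ['00000000-0000-0000-0000-000000000001']
```

A plain `set(payload) - set(master)` would have reported all three IDs.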
Step 3 — Validate cross-source consistency
```python
def validate_date_consistency(
    master_records: list[MasterRecord],
    payloads: list[EntityPayload],
) -> None:
    """Raise if an entity's business_date differs between the CSV and its JSON payload."""
    master_map = {str(r.entity_id): r for r in master_records}
    conflicts = []
    for payload in payloads:
        entity_id = str(payload.entity_id)
        record = master_map.get(entity_id)
        if record is None:
            continue  # payload ID not in the CSV: already caught by the referential integrity check
        if record.business_date != payload.business_date:
            conflicts.append(
                f"  {entity_id}: CSV={record.business_date}, "
                f"JSON={payload.business_date}"
            )
    if conflicts:
        raise ValueError(
            "Cross-source date inconsistency detected "
            f"({len(conflicts)} record(s)):\n" + "\n".join(conflicts)
        )
```
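A dict-based lookup like the one above can hide one more silent failure. If two rows share an entity_id, the dict comprehension keeps only the last one, and the earlier row is never compared. Here is a small guard, sketched with the standard library, that you could run on both the CSV IDs and the JSON IDs alongside the other checks. The names `find_duplicate_ids` and `validate_unique_ids` are hypothetical.

```python
from collections import Counter


def find_duplicate_ids(ids: list[str]) -> list[str]:
    """Return every ID that appears more than once, sorted for a stable message."""
    counts = Counter(ids)
    return sorted(i for i, n in counts.items() if n > 1)


def validate_unique_ids(ids: list[str], source: str) -> None:
    """Raise, in the same style as the other checks, if any ID is repeated."""
    duplicates = find_duplicate_ids(ids)
    if duplicates:
        raise ValueError(
            f"Duplicate entity ID(s) in {source} ({len(duplicates)}): {duplicates}"
        )


# Last write wins: the first date for "a" silently disappears from the map.
rows = [("a", "2024-01-31"), ("b", "2024-01-31"), ("a", "2024-02-29")]
print({entity_id: d for entity_id, d in rows})  # {'a': '2024-02-29', 'b': '2024-01-31'}
print(find_duplicate_ids([entity_id for entity_id, _ in rows]))  # ['a']
```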
Step 4 — Wire it into your pipeline entry point
```python
def run_pipeline(csv_path: str, json_dir: str) -> None:
    # load_csv, load_json_files and spark_transform are project-specific helpers (not shown)

    # Load and parse: Pydantic raises ValidationError on the first malformed row
    raw_master = load_csv(csv_path)
    raw_payloads = load_json_files(json_dir)
    master_records = [MasterRecord(**row) for row in raw_master]
    payloads = [EntityPayload(**data) for data in raw_payloads]

    # Validate before doing anything else
    master_ids = {str(r.entity_id) for r in master_records}
    validate_referential_integrity(master_ids, payloads)
    validate_date_consistency(master_records, payloads)

    # Only now do we hand off to Spark
    spark_transform(master_records, payloads)
```
The validation layer runs in pure Python, before Spark does anything. It’s fast, it’s readable, and it fails with a message that tells you exactly what’s wrong.
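The entry point relies on `load_csv` and `load_json_files`, whose bodies aren't shown. Here is one possible stdlib sketch. It assumes a CSV with a header row and a directory of `*.json` files that each hold a single JSON object. Both the file layout and these implementations are assumptions, not the original code. Each function returns plain dicts, ready for `MasterRecord(**row)` and `EntityPayload(**data)`.

```python
import csv
import json
from pathlib import Path


def load_csv(csv_path: str) -> list[dict[str, str]]:
    """Read the master CSV into one dict per row, keyed by the header names."""
    with open(csv_path, newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))


def load_json_files(json_dir: str) -> list[dict]:
    """Read every *.json file in json_dir; each file is assumed to hold one object."""
    payloads = []
    for path in sorted(Path(json_dir).glob("*.json")):
        with path.open(encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            # Fail loudly rather than guess how to interpret a non-object file
            raise ValueError(f"{path.name}: expected a JSON object, got {type(data).__name__}")
        payloads.append(data)
    return payloads
```

Every CSV value arrives as a string. The Pydantic models handle the conversion to UUID and date, so the loaders stay deliberately dumb.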
No Partial Loads. No Exceptions.
Some teams prefer a “log and continue” approach — record the bad rows, skip them, keep processing. I understand the appeal. The job finishes. You get something.
Here’s the problem: you lose the ability to reason about your outputs. If a downstream report is wrong, you need to know whether the source data was clean. “We skipped 47 rows due to validation errors” buried in a log somewhere is not a guarantee. It’s a liability.
Reject the batch. Fix the source. Reload clean.
That’s the contract your pipeline should make with its consumers. The alternative is a system where “success” doesn’t mean what it should.
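As written, the entry point stops at the first failing check, so a batch with both bugs would report only the ghost IDs. If you want a single rejection to list every problem, one option is to run all checks and raise one combined error. This is a minimal sketch. `run_all_checks` is a hypothetical helper, and the stand-in checks only simulate the real ones. The batch is still accepted or rejected as a whole. The only change is how much one failed run tells you.

```python
from typing import Callable


def run_all_checks(checks: list[Callable[[], None]]) -> None:
    """Run every check; if any raise ValueError, reject the batch with all messages."""
    failures = []
    for check in checks:
        try:
            check()
        except ValueError as exc:
            failures.append(str(exc))
    if failures:
        raise ValueError(
            f"Batch rejected: {len(failures)} check(s) failed.\n\n" + "\n\n".join(failures)
        )


# Stand-ins that simulate two failing checks and one passing check.
def failing_check_a() -> None:
    raise ValueError("check A failed")


def failing_check_b() -> None:
    raise ValueError("check B failed")


def passing_check() -> None:
    pass


try:
    run_all_checks([failing_check_a, passing_check, failing_check_b])
except ValueError as exc:
    print(exc)
```

In the pipeline itself, the list would wrap the real checks, for example `lambda: validate_referential_integrity(master_ids, payloads)` and `lambda: validate_date_consistency(master_records, payloads)`.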
What This Would Have Caught
With this gate in place:
- The ghost UUIDs raise a ValueError on the first run. The data team investigates the source system. The upstream bug gets fixed.
- The diverging dates raise a ValueError with the exact entity IDs and both conflicting values. One source is authoritative, and you pick it explicitly, not by accident based on join order.
Neither bug reaches the transformation layer. Neither bug reaches a report.
The pipeline runs in four minutes and the data is right. That’s what success should look like.
Cross-reference
If you’re not already using Pydantic for your pipeline configuration — separate concern, same principle — this article covers that pattern.