Skip to main content

Case 03: Nina's Silent Pipeline Failure

Persona: Nina, MLE / Data Engineer

Situation

A 5-stage nightly pipeline (ingest → preprocess → validate → train → evaluate) runs for 3 days with status=completed on every stage. On Day 1, a schema migration caused the validate stage to pass an empty DataFrame downstream instead of raising an exception. The train stage trained on zero rows and returned default weights. The evaluate stage reported terrible metrics — but no alert fired. By Day 3, three days of bad checkpoints were queued for production.

Without Contexta

  • Exit code 0 + status=completed is the only signal available to the orchestrator.
  • The orchestrator cannot distinguish "completed correctly" from "completed on empty input".
  • Dashboard alerts require threshold configuration that nobody got around to.
  • Debugging three days later means reconstructing what happened from logs that may have rotated.

With Contexta

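# validate stage emits a DegradedRecord when it detects empty output
# (abbreviated sketch; the runnable seed script below uses degradation_marker="capture_gap" and category="capture")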
DegradedRecord(
envelope=RecordEnvelope(..., degradation_marker="partial_failure"),
payload=DegradedPayload(
category="verification", severity="error",
summary="output-dataframe-empty-after-join",
)
)

# Day 1 — immediately visible
diag = ctx.diagnose_run(run_ref)
errors = [i for i in diag.issues if i.severity == "error"]
# → 1 error: degraded_record in validate stage

DegradedRecord is a first-class record type that survives even when the stage exits 0. diagnose_run surfaces it on Day 1 before any checkpoint is promoted.

Key APIs: DegradedRecord, diagnose_run


Complete Runnable Code

Run the seed script first, then the analysis script:

uv run examples/case_studies/case03_seed_pipeline_failure_data.py
uv run examples/case_studies/case03_analyze_pipeline_failure.py
case03_seed_pipeline_failure_data.py
"""Create silent-pipeline-failure records used by the pipeline case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


# Project name used for the workspace config and as the prefix of the
# project, run, stage, and record refs built by this script.
PROJECT_NAME = "nightly-ml-pipeline"

# Stage names in pipeline order.
# NOTE(review): not referenced elsewhere in the visible script;
# _build_daily_run spells out the same names in its own timing table.
PIPELINE_STAGES = ["ingest", "preprocess", "validate", "train", "evaluate"]

# Number of record ids handed out so far; incremented by _next_rid().
_REC_COUNTER = 0


def _next_rid() -> str:
    """Advance the module-wide record counter and return it as an id suffix.

    The result is ``"r"`` followed by the new counter value, zero-padded to
    at least five digits (``r00001``, ``r00002``, ...).
    """
    global _REC_COUNTER
    _REC_COUNTER = _REC_COUNTER + 1
    padded = format(_REC_COUNTER, "05d")
    return f"r{padded}"


def _create_stage(
    store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_name: str,
    status: str,
    started_at: str,
    ended_at: str,
    order_index: int,
) -> str:
    """Store one stage execution for a run and return the stage's ref.

    The ref is built as ``stage:<project_name>.<run_name>.<stage_name>``;
    the remaining arguments are passed straight through to
    ``StageExecution``.
    """
    ref = f"stage:{project_name}.{run_name}.{stage_name}"
    execution = StageExecution(
        stage_execution_ref=ref,
        run_ref=run_ref,
        stage_name=stage_name,
        status=status,
        started_at=started_at,
        ended_at=ended_at,
        order_index=order_index,
    )
    store.stages.put_stage_execution(execution)
    return ref


def _emit_metric(
    record_store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_ref: str,
    key: str,
    value: float,
    ts: str,
) -> None:
    """Append one stage-scoped float metric record to ``record_store``.

    ``ts`` is used for both the recorded and the observed timestamp, and
    the record ref takes a fresh suffix from ``_next_rid()``.
    """
    header = RecordEnvelope(
        record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
        record_type="metric",
        recorded_at=ts,
        observed_at=ts,
        producer_ref="contexta.case03",
        run_ref=run_ref,
        stage_execution_ref=stage_ref,
        completeness_marker="complete",
        degradation_marker="none",
    )
    body = MetricPayload(
        metric_key=key,
        value=value,
        value_type="float64",
        aggregation_scope="stage",
    )
    record_store.append(MetricRecord(envelope=header, payload=body))


def _emit_degraded(
    record_store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_ref: str,
    ts: str,
    row_count: int,
) -> None:
    """Append an error-severity DegradedRecord for the empty validate output.

    ``row_count`` is written into both the summary text and the record's
    attributes; ``ts`` is used for the recorded and observed timestamps.
    """
    summary_text = (
        f"Validate stage received {row_count} rows after schema migration. "
        "Empty DataFrame passed to train stage without raising an exception."
    )
    details = {
        "row_count": row_count,
        "expected_minimum_rows": 10000,
        "stage": "validate",
    }
    header = RecordEnvelope(
        record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
        record_type="degraded",
        recorded_at=ts,
        observed_at=ts,
        producer_ref="contexta.case03",
        run_ref=run_ref,
        stage_execution_ref=stage_ref,
        completeness_marker="partial",
        degradation_marker="capture_gap",  # data never arrived
    )
    body = DegradedPayload(
        issue_key="validate.empty_dataframe_passthrough",
        category="capture",
        severity="error",
        summary=summary_text,
        origin_marker="explicit_capture",
        attributes=details,
    )
    record_store.append(DegradedRecord(envelope=header, payload=body))


def _emit_event(
    record_store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_ref: str,
    event_key: str,
    message: str,
    level: str,
    ts: str,
) -> None:
    """Append one structured event record to ``record_store``.

    ``ts`` is used for both the recorded and the observed timestamp, and
    the record ref takes a fresh suffix from ``_next_rid()``.
    """
    header = RecordEnvelope(
        record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
        record_type="event",
        recorded_at=ts,
        observed_at=ts,
        producer_ref="contexta.case03",
        run_ref=run_ref,
        stage_execution_ref=stage_ref,
        completeness_marker="complete",
        degradation_marker="none",
    )
    body = StructuredEventPayload(
        event_key=event_key,
        level=level,
        message=message,
        origin_marker="explicit_capture",
    )
    record_store.append(StructuredEventRecord(envelope=header, payload=body))


def _build_daily_run(
    store: Any,
    record_store: Any,
    project_name: str,
    day: int,
    silent_failure: bool,
) -> str:
    """Create one nightly run with its five stages, events, and metrics.

    Every stage (and the run itself) is stored with status ``"completed"``
    regardless of ``silent_failure``; the only difference on a bad day is
    the extra DegradedRecord in the validate stage, the warning-level
    validate/evaluate events, and the poor train/eval metric values.

    Args:
        store: Metadata store that receives the run and stage executions.
        record_store: Record store that receives events, metrics, and the
            degraded record.
        project_name: Project name used as the prefix of every ref.
        day: Day offset; the run is dated 2025-03-17 plus ``day`` days and
            named ``nightly-day<day>``.
        silent_failure: If True, validate passes zero rows downstream.

    Returns:
        The ref of the run that was created.
    """
    # Local import keeps this function self-contained.
    from datetime import date, timedelta

    run_name = f"nightly-day{day}"
    run_ref = f"run:{project_name}.{run_name}"
    # Real calendar arithmetic: formatting ``17 + day`` as the day-of-month
    # yields impossible dates (2025-03-32, ...) once ``day`` exceeds 14.
    date_str = (date(2025, 3, 17) + timedelta(days=day)).isoformat()

    def at(clock: str) -> str:
        """Return the UTC timestamp for ``clock`` (HH:MM:SS) on the run date."""
        return f"{date_str}T{clock}Z"

    store.runs.put_run(
        Run(
            run_ref=run_ref,
            project_ref=f"project:{project_name}",
            name=run_name,
            status="completed",
            started_at=at("02:00:00"),
            ended_at=at("04:30:00"),
        )
    )

    # Build all 5 stages. Validate "succeeds" superficially even on bad days.
    stage_times = [
        ("ingest", at("02:00:00"), at("02:20:00")),
        ("preprocess", at("02:20:00"), at("02:50:00")),
        ("validate", at("02:50:00"), at("03:00:00")),
        ("train", at("03:00:00"), at("04:00:00")),
        ("evaluate", at("04:00:00"), at("04:30:00")),
    ]

    stage_refs: dict[str, str] = {}
    for idx, (sname, sstart, send) in enumerate(stage_times):
        stage_refs[sname] = _create_stage(
            store, project_name, run_name, run_ref, sname, "completed", sstart, send, idx
        )

    def event(stage: str, key: str, message: str, level: str, ts: str) -> None:
        """Emit a structured event attached to ``stage`` of this run."""
        _emit_event(
            record_store, project_name, run_name, run_ref,
            stage_refs[stage], key, message, level, ts,
        )

    def metric(stage: str, key: str, value: float, ts: str) -> None:
        """Emit a metric attached to ``stage`` of this run."""
        _emit_metric(
            record_store, project_name, run_name, run_ref,
            stage_refs[stage], key, value, ts,
        )

    # Ingest: the row count is the same on good and bad days because ingest
    # itself is fine; the failure is introduced later, in validate.
    ingest_rows = 120_000
    event("ingest", "pipeline.ingest-complete",
          f"Loaded {ingest_rows} rows from warehouse.", "info", at("02:19:00"))

    # Preprocess: OK
    event("preprocess", "pipeline.preprocess-complete",
          "Feature engineering done. 118,432 valid rows.", "info", at("02:49:00"))

    # Validate: the silent failure lives here
    validate_ts = at("02:59:00")
    if silent_failure:
        # Schema migration broke the join - validate passes 0 rows downstream
        _emit_degraded(
            record_store, project_name, run_name, run_ref,
            stage_refs["validate"], validate_ts, row_count=0,
        )
        event("validate", "pipeline.validate-complete",
              "Validation check passed (0 rows - schema join returned empty).",
              "warning", validate_ts)
    else:
        event("validate", "pipeline.validate-complete",
              "Validation passed. 117,800 rows passed quality checks.",
              "info", validate_ts)

    # Train: runs but with garbage data on bad days
    train_ts = at("03:59:00")
    train_loss = 2.31 if silent_failure else 0.28  # default weights = random = high loss
    train_acc = 0.11 if silent_failure else 0.934
    metric("train", "train.loss", train_loss, train_ts)
    metric("train", "train.accuracy", train_acc, train_ts)
    event("train", "training.epoch-end",
          f"Training complete. loss={train_loss:.3f}", "info", train_ts)

    # Evaluate
    eval_ts = at("04:29:00")
    eval_acc = 0.12 if silent_failure else 0.928
    metric("evaluate", "eval.accuracy", eval_acc, eval_ts)
    event("evaluate", "evaluation.complete", f"Eval accuracy={eval_acc:.3f}",
          "warning" if silent_failure else "info", eval_ts)

    return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Seed three nightly runs (days 1-3) that all fail silently in validate.

    Args:
        workspace: Workspace root to write into. When omitted, a fresh
            temporary directory is created and ``.contexta`` inside it is
            used; that directory is not removed and its path is not part of
            the return value.

    Returns:
        A dict with ``days_simulated`` (number of runs created) and
        ``run_ids`` (the run refs, in day order).
    """
    workspace_path = (
        Path(tempfile.mkdtemp(prefix="contexta-case03-")) / ".contexta"
        if workspace is None
        else Path(workspace)
    )

    config = UnifiedConfig(
        project_name=PROJECT_NAME,
        workspace=WorkspaceConfig(root_path=workspace_path),
    )
    ctx = Contexta(config=config)

    store = ctx.metadata_store
    try:
        project = Project(
            project_ref=f"project:{PROJECT_NAME}",
            name=PROJECT_NAME,
            created_at="2025-01-01T00:00:00Z",
            description="Nightly ML training pipeline (5 stages)",
        )
        store.projects.put_project(project)

        # Days 1, 2, 3 - all silent failures
        seeded_refs = [
            _build_daily_run(store, ctx.record_store, PROJECT_NAME, day, silent_failure=True)
            for day in (1, 2, 3)
        ]

        return {
            "days_simulated": len(seeded_refs),
            "run_ids": seeded_refs,
        }
    finally:
        # The metadata store is closed whether or not seeding succeeded.
        store.close()


def main() -> None:
    """Seed the case-study data into ``.contexta`` and print a confirmation."""
    import io
    from contextlib import redirect_stdout

    # Anything written to stdout while seeding goes into a throwaway buffer,
    # so the only visible output is the confirmation line below.
    sink = io.StringIO()
    with redirect_stdout(sink):
        run_example(Path(".contexta"))

    print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
    main()
case03_analyze_pipeline_failure.py
"""Diagnose previously recorded silent pipeline failures."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "nightly-ml-pipeline"

# Open the same ".contexta" workspace that the seed script's main() writes to.
workspace = WorkspaceConfig(root_path=Path(".contexta"))
ctx = Contexta(config=UnifiedConfig(project_name=PROJECT_NAME, workspace=workspace))

store = ctx.metadata_store
try:
    runs = ctx.list_runs(PROJECT_NAME)
    for run in runs:
        snapshot = ctx.get_run_snapshot(run.run_id)
        diagnosis = ctx.diagnose_run(run.run_id)

        # One "name=status" pair per stage, e.g. "validate=completed".
        stage_pairs = [f"{stage.name}={stage.status}" for stage in snapshot.stages]
        statuses = ", ".join(stage_pairs)

        # Keep only the issues whose code is "degraded_record".
        degraded = list(
            filter(lambda issue: issue.code == "degraded_record", diagnosis.issues)
        )

        print(f"{snapshot.run.name}: {statuses}")
        for issue in degraded:
            print(f" [{issue.severity.upper()}] {issue.summary}")
finally:
    # Close the metadata store even if listing or diagnosing a run raised.
    store.close()