Case 03: Nina's Silent Pipeline Failure
Persona: Nina, MLE / Data Engineer
Situation
A 5-stage nightly pipeline (ingest → preprocess → validate → train → evaluate) runs for 3
days with status=completed on every stage. On Day 1, a schema migration caused the
validate stage to pass an empty DataFrame downstream instead of raising an exception.
The train stage trained on zero rows and returned default weights. The evaluate stage
reported terrible metrics — but no alert fired. By Day 3, three days of bad checkpoints
were queued for production.
Without Contexta
- Exit code 0 + `status=completed` is the only signal available to the orchestrator.
- The orchestrator cannot distinguish "completed correctly" from "completed on empty input".
- Dashboard alerts require threshold configuration that nobody got around to.
- Debugging three days later means reconstructing what happened from logs that may have rotated.
With Contexta
# validate stage emits a DegradedRecord when it detects empty output
# (illustrative excerpt; the runnable seed script below records this case with
# degradation_marker="capture_gap" and category="capture")
DegradedRecord(
envelope=RecordEnvelope(..., degradation_marker="partial_failure"),
payload=DegradedPayload(
category="verification", severity="error",
summary="output-dataframe-empty-after-join",
)
)
# Day 1 — immediately visible
diag = ctx.diagnose_run(run_ref)
# keep only the error-severity issues reported for the run
errors = [i for i in diag.issues if i.severity == "error"]
# → 1 error: degraded_record in validate stage
DegradedRecord is a first-class record type that survives even when the stage exits 0.
diagnose_run surfaces it on Day 1 before any checkpoint is promoted.
Key APIs: DegradedRecord, diagnose_run
Complete Runnable Code
Run the seed script first, then the analysis script:
uv run examples/case_studies/case03_seed_pipeline_failure_data.py
uv run examples/case_studies/case03_analyze_pipeline_failure.py
case03_seed_pipeline_failure_data.py
"""Create silent-pipeline-failure records used by the pipeline case study."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)
PROJECT_NAME = "nightly-ml-pipeline"
# Stage names of the simulated nightly pipeline, in execution order.
PIPELINE_STAGES = [
    "ingest",
    "preprocess",
    "validate",
    "train",
    "evaluate",
]
# Sequence number consumed by _next_rid(); starts at 0 so the first id is "r00001".
_REC_COUNTER = 0
def _next_rid() -> str:
    """Return the next sequential record id (``r00001``, ``r00002``, ...).

    Side effect: increments the module-level ``_REC_COUNTER``.
    """
    global _REC_COUNTER
    _REC_COUNTER = _REC_COUNTER + 1
    return "r" + format(_REC_COUNTER, "05d")
def _create_stage(
    store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_name: str,
    status: str,
    started_at: str,
    ended_at: str,
    order_index: int,
) -> str:
    """Persist a ``StageExecution`` for one pipeline stage and return its ref.

    The ref has the form ``stage:<project_name>.<run_name>.<stage_name>``.
    *store* must expose ``stages.put_stage_execution``.
    """
    ref = f"stage:{project_name}.{run_name}.{stage_name}"
    execution = StageExecution(
        stage_execution_ref=ref,
        run_ref=run_ref,
        stage_name=stage_name,
        status=status,
        started_at=started_at,
        ended_at=ended_at,
        order_index=order_index,
    )
    store.stages.put_stage_execution(execution)
    return ref
def _emit_metric(
    record_store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_ref: str,
    key: str,
    value: float,
    ts: str,
) -> None:
    """Append a stage-scoped float metric record to *record_store*.

    *ts* is used for both ``recorded_at`` and ``observed_at``; the record ref
    gets a fresh sequential id from ``_next_rid()``.
    """
    envelope = RecordEnvelope(
        record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
        record_type="metric",
        recorded_at=ts,
        observed_at=ts,
        producer_ref="contexta.case03",
        run_ref=run_ref,
        stage_execution_ref=stage_ref,
        completeness_marker="complete",
        degradation_marker="none",
    )
    payload = MetricPayload(
        metric_key=key,
        value=value,
        value_type="float64",
        aggregation_scope="stage",
    )
    record_store.append(MetricRecord(envelope=envelope, payload=payload))
def _emit_degraded(
    record_store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_ref: str,
    ts: str,
    row_count: int,
) -> None:
    """Append a ``DegradedRecord`` describing an empty DataFrame pass-through.

    The record is marked ``completeness_marker="partial"`` /
    ``degradation_marker="capture_gap"`` with ``severity="error"``, and carries
    *row_count* both in its summary and in its attributes.
    """
    envelope = RecordEnvelope(
        record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
        record_type="degraded",
        recorded_at=ts,
        observed_at=ts,
        producer_ref="contexta.case03",
        run_ref=run_ref,
        stage_execution_ref=stage_ref,
        completeness_marker="partial",
        degradation_marker="capture_gap",  # the expected rows never arrived
    )
    summary = (
        f"Validate stage received {row_count} rows after schema migration. "
        "Empty DataFrame passed to train stage without raising an exception."
    )
    details = {
        "row_count": row_count,
        "expected_minimum_rows": 10000,
        "stage": "validate",
    }
    payload = DegradedPayload(
        issue_key="validate.empty_dataframe_passthrough",
        category="capture",
        severity="error",
        summary=summary,
        origin_marker="explicit_capture",
        attributes=details,
    )
    record_store.append(DegradedRecord(envelope=envelope, payload=payload))
def _emit_event(
    record_store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_ref: str,
    event_key: str,
    message: str,
    level: str,
    ts: str,
) -> None:
    """Append a structured event record (``event_key``/``level``/``message``).

    *ts* is used for both ``recorded_at`` and ``observed_at``; the record ref
    gets a fresh sequential id from ``_next_rid()``.
    """
    envelope = RecordEnvelope(
        record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
        record_type="event",
        recorded_at=ts,
        observed_at=ts,
        producer_ref="contexta.case03",
        run_ref=run_ref,
        stage_execution_ref=stage_ref,
        completeness_marker="complete",
        degradation_marker="none",
    )
    payload = StructuredEventPayload(
        event_key=event_key,
        level=level,
        message=message,
        origin_marker="explicit_capture",
    )
    record_store.append(StructuredEventRecord(envelope=envelope, payload=payload))
def _build_daily_run(
    store: Any,
    record_store: Any,
    project_name: str,
    day: int,
    silent_failure: bool,
) -> str:
    """Create one nightly run with all of its stages, events and metrics.

    The run and every stage are always stored as ``"completed"``; that is the
    point of the case study. When *silent_failure* is true, the validate stage
    additionally emits a ``DegradedRecord`` (0 rows after the schema-migration
    join), and train/evaluate report the metrics of a model fitted on no data.

    Args:
        store: Metadata store exposing ``runs.put_run`` and
            ``stages.put_stage_execution``.
        record_store: Record store exposing ``append``.
        project_name: Name of the project that owns the run.
        day: Day number; the run is dated 2025-03-17 plus *day* days.
        silent_failure: Simulate validate passing an empty DataFrame downstream.

    Returns:
        The ``run_ref`` of the created run.
    """
    from datetime import date, timedelta

    run_name = f"nightly-day{day}"
    run_ref = f"run:{project_name}.{run_name}"
    # Real date arithmetic: formatting ``17 + day`` into a fixed March string
    # produced invalid dates such as "2025-03-32" for day >= 15.
    date_str = (date(2025, 3, 17) + timedelta(days=day)).isoformat()

    def stamp(hhmm: str) -> str:
        """Return the ISO-8601 UTC timestamp for *hhmm* on this run's date."""
        return f"{date_str}T{hhmm}:00Z"

    store.runs.put_run(
        Run(
            run_ref=run_ref,
            project_ref=f"project:{project_name}",
            name=run_name,
            status="completed",
            started_at=stamp("02:00"),
            ended_at=stamp("04:30"),
        )
    )

    # Stage i of PIPELINE_STAGES runs from boundaries[i] to boundaries[i + 1],
    # so this list must hold exactly one more entry than there are stages.
    # Every stage, including validate on a bad day, is recorded as "completed".
    boundaries = ["02:00", "02:20", "02:50", "03:00", "04:00", "04:30"]
    stage_refs: dict[str, str] = {}
    for idx, (stage_name, start, end) in enumerate(
        zip(PIPELINE_STAGES, boundaries, boundaries[1:])
    ):
        stage_refs[stage_name] = _create_stage(
            store, project_name, run_name, run_ref, stage_name,
            "completed", stamp(start), stamp(end), idx,
        )

    # Ingest is healthy on every day; the data is lost later, in validate.
    ingest_rows = 120_000
    _emit_event(record_store, project_name, run_name, run_ref, stage_refs["ingest"],
                "pipeline.ingest-complete", f"Loaded {ingest_rows} rows from warehouse.",
                "info", stamp("02:19"))

    _emit_event(record_store, project_name, run_name, run_ref, stage_refs["preprocess"],
                "pipeline.preprocess-complete", "Feature engineering done. 118,432 valid rows.",
                "info", stamp("02:49"))

    # Validate: the silent failure lives here.
    validate_ts = stamp("02:59")
    if silent_failure:
        # The schema migration broke the join, so 0 rows flow downstream. The
        # DegradedRecord is the structured evidence of this failure.
        _emit_degraded(
            record_store, project_name, run_name, run_ref,
            stage_refs["validate"], validate_ts, row_count=0,
        )
        _emit_event(record_store, project_name, run_name, run_ref, stage_refs["validate"],
                    "pipeline.validate-complete",
                    "Validation check passed (0 rows - schema join returned empty).",
                    "warning", validate_ts)
    else:
        _emit_event(record_store, project_name, run_name, run_ref, stage_refs["validate"],
                    "pipeline.validate-complete",
                    "Validation passed. 117,800 rows passed quality checks.",
                    "info", validate_ts)

    # Train: on bad days the model keeps its default weights (random-level loss).
    train_ts = stamp("03:59")
    train_loss, train_acc = (2.31, 0.11) if silent_failure else (0.28, 0.934)
    _emit_metric(record_store, project_name, run_name, run_ref,
                 stage_refs["train"], "train.loss", train_loss, train_ts)
    _emit_metric(record_store, project_name, run_name, run_ref,
                 stage_refs["train"], "train.accuracy", train_acc, train_ts)
    _emit_event(record_store, project_name, run_name, run_ref, stage_refs["train"],
                "training.epoch-end", f"Training complete. loss={train_loss:.3f}",
                "info", train_ts)

    # Evaluate: bad-day metrics are only logged at "warning" level.
    eval_ts = stamp("04:29")
    eval_acc = 0.12 if silent_failure else 0.928
    _emit_metric(record_store, project_name, run_name, run_ref,
                 stage_refs["evaluate"], "eval.accuracy", eval_acc, eval_ts)
    _emit_event(record_store, project_name, run_name, run_ref, stage_refs["evaluate"],
                "evaluation.complete", f"Eval accuracy={eval_acc:.3f}",
                "warning" if silent_failure else "info", eval_ts)
    return run_ref
def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Seed a workspace with three nightly runs that all fail silently.

    Args:
        workspace: Workspace root for Contexta. When omitted, a ``.contexta``
            directory inside a freshly created temporary directory is used
            (the temporary directory is not removed).

    Returns:
        ``{"days_simulated": <number of runs>, "run_ids": [<run_ref>, ...]}``.
    """
    workspace_path = (
        Path(tempfile.mkdtemp(prefix="contexta-case03-")) / ".contexta"
        if workspace is None
        else Path(workspace)
    )
    ctx = Contexta(
        config=UnifiedConfig(
            project_name=PROJECT_NAME,
            workspace=WorkspaceConfig(root_path=workspace_path),
        )
    )
    metadata = ctx.metadata_store
    try:
        metadata.projects.put_project(
            Project(
                project_ref=f"project:{PROJECT_NAME}",
                name=PROJECT_NAME,
                created_at="2025-01-01T00:00:00Z",
                description="Nightly ML training pipeline (5 stages)",
            )
        )
        # Days 1, 2 and 3 all hit the silent validate failure.
        run_refs = [
            _build_daily_run(metadata, ctx.record_store, PROJECT_NAME, day, silent_failure=True)
            for day in range(1, 4)
        ]
        return {"days_simulated": len(run_refs), "run_ids": run_refs}
    finally:
        metadata.close()
def main() -> None:
    """Seed ``./.contexta`` quietly, then print a one-line confirmation."""
    import io
    from contextlib import redirect_stdout

    # Anything printed while seeding is captured and discarded.
    sink = io.StringIO()
    with redirect_stdout(sink):
        run_example(Path(".contexta"))
    print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
    main()
case03_analyze_pipeline_failure.py
"""Diagnose previously recorded silent pipeline failures."""
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
PROJECT_NAME = "nightly-ml-pipeline"

ctx = Contexta(
    config=UnifiedConfig(
        project_name=PROJECT_NAME,
        workspace=WorkspaceConfig(root_path=Path(".contexta")),
    )
)
store = ctx.metadata_store
try:
    # Materialize once so an empty workspace (seed script not run yet) is
    # reported explicitly instead of producing no output at all.
    runs = list(ctx.list_runs(PROJECT_NAME))
    if not runs:
        print(
            f"No runs found for {PROJECT_NAME} in .contexta - "
            "run case03_seed_pipeline_failure_data.py first."
        )
    for run in runs:
        snapshot = ctx.get_run_snapshot(run.run_id)
        diagnosis = ctx.diagnose_run(run.run_id)
        # Every stage reports its recorded status (all "completed" here),
        # which is why the failure is invisible without the diagnosis.
        statuses = ", ".join(f"{stage.name}={stage.status}" for stage in snapshot.stages)
        degraded = [issue for issue in diagnosis.issues if issue.code == "degraded_record"]
        print(f"{snapshot.run.name}: {statuses}")
        for issue in degraded:
            print(f"  [{issue.severity.upper()}] {issue.summary}")
finally:
    store.close()