Skip to main content

Case 07: David's Silent Batch Job Failure

Persona: David, Data Engineer

Situation

A nightly ETL job performs feature engineering for the ML model. On Night 5 of 7, the job completed with exit code 0 but silently corrupted a feature column because an upstream vendor changed their API response schema. The vendor's new schema dropped a field that a join relied on — the join succeeded but returned NULLs, and a numeric feature column was quietly zeroed out. The downstream model retrained, accuracy dropped from 0.934 to 0.871, and no alert fired because the batch job "succeeded."

Without Contexta

  • Exit code 0 is the scheduler's only health signal.
  • A batch job status of "completed" does not distinguish "correct output" from "zero-filled column."
  • Two days of debugging without knowing which night caused the accuracy drop.
  • Logs may have rotated. Reproducing the exact failure requires re-running the pipeline.

With Contexta

# Night 5 feature-engineering stage emits a DegradedRecord.
# Abbreviated illustration: the record envelope is left out here; the seed
# script further down this page builds the complete record.
DegradedRecord(
    payload=DegradedPayload(
        category="verification",
        severity="warning",
        summary="null-rate-exceeded-threshold",
        # Free-form context is passed as `attributes`, the keyword used by the
        # runnable seed script's DegradedPayload call.
        attributes={"column": "purchase_intent_score", "null_rate": 0.98},
    )
)

# Immediately visible: diagnose the Night 5 run and keep only the issues
# whose code marks a degraded record.
diag = ctx.diagnose_run(night5_run_ref)
issues = [issue for issue in diag.issues if issue.code == "degraded_record"]
# → 1 warning: null-rate-exceeded-threshold in feature-engineering stage

BatchExecution records link each night's job to its run, stage, and records. A weekly summary table shows which night had the degradation — without re-running anything.

Key APIs: BatchExecution, DegradedRecord, diagnose_run


Complete Runnable Code

Run the seed script first, then the analysis script:

uv run examples/case_studies/case07_seed_batch_monitoring_data.py
uv run examples/case_studies/case07_analyze_batch_monitoring.py
case07_seed_batch_monitoring_data.py
"""Create batch-monitoring records used by the batch case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
BatchExecution,
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


# Project name; used for the project ref and passed into every nightly run
# so that run/stage/batch/record refs are all namespaced under it.
PROJECT_NAME = "nightly-etl-feature-engineering"
# Name of the single stage created for each nightly run.
STAGE_NAME = "feature-engineering"

# Module-level counter behind _next_rid(); bumped once per generated record id.
_REC_COUNTER = 0


def _next_rid() -> str:
    """Return the next record id: ``r`` followed by a zero-padded counter.

    Each call advances the module-level ``_REC_COUNTER`` by one, so ids are
    sequential within a single process (``r00001``, ``r00002``, ...).
    """
    global _REC_COUNTER
    _REC_COUNTER = _REC_COUNTER + 1
    return "r" + format(_REC_COUNTER, "05d")


def _build_nightly_run(
    store: Any,
    record_store: Any,
    project_name: str,
    night: int,
    has_schema_issue: bool,
) -> str:
    """Seed one nightly ETL run and return its run reference.

    Writes a completed Run, one feature-engineering StageExecution and one
    BatchExecution to ``store``, then appends to ``record_store``: two metric
    records (``rows-processed`` and ``feature-null-rate``), a DegradedRecord
    when ``has_schema_issue`` is true, and one ``etl.batch-completed`` event.

    Args:
        store: Metadata store; its ``runs``, ``stages`` and ``batches``
            members receive the run, stage and batch.
        record_store: Record store; every record goes through ``append``.
        project_name: Project name embedded in every generated ref.
        night: Night number; used as the day of April 2025 and in the run
            and batch names.
        has_schema_issue: Simulate the vendor schema change for this night.

    Returns:
        The ``run:<project>.<run-name>`` reference of the created run.
    """
    day = f"2025-04-{night:02d}"
    run_name = f"etl-night-{night:02d}"
    batch_name = f"batch-night-{night:02d}"
    run_ref = f"run:{project_name}.{run_name}"
    stage_ref = f"stage:{project_name}.{run_name}.{STAGE_NAME}"
    batch_ref = f"batch:{project_name}.{run_name}.{STAGE_NAME}.{batch_name}"

    def at(clock: str) -> str:
        # Timestamp on this night's date for the given HH:MM:SS clock time.
        return f"{day}T{clock}Z"

    # All records are stamped with the stage end time.
    observed = at("01:30:00")

    def envelope(
        record_type: str,
        completeness: str = "complete",
        degradation: str = "none",
    ) -> Any:
        # Shared envelope for this batch; each call consumes one fresh record id.
        return RecordEnvelope(
            record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
            record_type=record_type,
            recorded_at=observed,
            observed_at=observed,
            producer_ref="contexta.case07",
            run_ref=run_ref,
            stage_execution_ref=stage_ref,
            batch_execution_ref=batch_ref,
            completeness_marker=completeness,
            degradation_marker=degradation,
        )

    # Run -> stage -> batch hierarchy; everything reports "completed",
    # even on the problem night.
    store.runs.put_run(
        Run(
            run_ref=run_ref,
            project_ref=f"project:{project_name}",
            name=run_name,
            status="completed",
            started_at=at("01:00:00"),
            ended_at=at("01:45:00"),
        )
    )
    store.stages.put_stage_execution(
        StageExecution(
            stage_execution_ref=stage_ref,
            run_ref=run_ref,
            stage_name=STAGE_NAME,
            status="completed",
            started_at=at("01:00:00"),
            ended_at=observed,
            order_index=0,
        )
    )
    store.batches.put_batch_execution(
        BatchExecution(
            batch_execution_ref=batch_ref,
            run_ref=run_ref,
            stage_execution_ref=stage_ref,
            batch_name=batch_name,
            status="completed",
            started_at=at("01:05:00"),
            ended_at=at("01:25:00"),
            order_index=0,
        )
    )

    # Metrics: the problem night reports zero rows and a null rate of 1.0.
    if has_schema_issue:
        rows_processed, null_rate = 0, 1.0
    else:
        rows_processed, null_rate = 118_000 + night * 200, 0.002

    for metric_key, metric_value in (
        ("rows-processed", float(rows_processed)),
        ("feature-null-rate", null_rate),
    ):
        record_store.append(
            MetricRecord(
                envelope=envelope("metric"),
                payload=MetricPayload(
                    metric_key=metric_key,
                    value=metric_value,
                    value_type="float64",
                ),
            )
        )

    if has_schema_issue:
        # Vendor changed schema: join returned NULLs for the purchase_amount column.
        # Exit code still 0. Emit a DegradedRecord to capture the verification gap.
        record_store.append(
            DegradedRecord(
                envelope=envelope("degraded", "partial", "capture_gap"),
                payload=DegradedPayload(
                    issue_key="etl.feature_column_null_overflow",
                    category="verification",
                    severity="warning",
                    summary=(
                        "Feature column purchase_amount was fully zeroed after "
                        "vendor schema change dropped the source field. "
                        "Join succeeded but all values are NULL. Exit code was 0."
                    ),
                    origin_marker="explicit_capture",
                    attributes={
                        "column": "purchase_amount",
                        "null_rate": 1.0,
                        "vendor_schema_version": "v2",
                        "expected_schema_version": "v1",
                    },
                ),
            )
        )
        event_level = "warning"
        event_message = (
            f"Batch {batch_name} completed. "
            "purchase_amount column zeroed (vendor schema mismatch). "
            "Exit code: 0."
        )
    else:
        event_level = "info"
        event_message = f"Batch {batch_name} completed. {rows_processed} rows processed."

    # Every night ends with a batch-completed event; only level and message differ.
    record_store.append(
        StructuredEventRecord(
            envelope=envelope("event"),
            payload=StructuredEventPayload(
                event_key="etl.batch-completed",
                level=event_level,
                message=event_message,
                origin_marker="explicit_capture",
            ),
        )
    )

    return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Simulate 7 nightly ETL runs and describe what was seeded.

    Args:
        workspace: Workspace root to write into. When omitted, a fresh
            temporary directory is created and its ``.contexta``
            subdirectory is used.

    Returns:
        A dict with ``nights_simulated``, ``problem_night`` and ``run_ids``
        (the run refs, in night order).
    """
    if workspace is not None:
        workspace_path = Path(workspace)
    else:
        scratch = Path(tempfile.mkdtemp(prefix="contexta-case07-"))
        workspace_path = scratch / ".contexta"

    workspace_config = WorkspaceConfig(root_path=workspace_path)
    ctx = Contexta(
        config=UnifiedConfig(project_name=PROJECT_NAME, workspace=workspace_config)
    )

    store = ctx.metadata_store
    try:
        store.projects.put_project(
            Project(
                project_ref=f"project:{PROJECT_NAME}",
                name=PROJECT_NAME,
                created_at="2025-04-01T00:00:00Z",
                description="Nightly ETL feature-engineering pipeline",
            )
        )

        # Night 5 (April 5) has the schema issue
        problem_night = 5
        run_ids = [
            _build_nightly_run(
                store,
                ctx.record_store,
                PROJECT_NAME,
                night,
                has_schema_issue=(night == problem_night),
            )
            for night in range(1, 8)
        ]

        return {
            "nights_simulated": 7,
            "problem_night": problem_night,
            "run_ids": run_ids,
        }
    finally:
        # Close the metadata store whether or not seeding succeeded.
        store.close()


def main() -> None:
    """Seed the ``.contexta`` workspace in the current directory.

    Anything ``run_example`` prints is captured and discarded, so the only
    output is the final confirmation line.
    """
    import io
    from contextlib import redirect_stdout

    discarded = io.StringIO()
    with redirect_stdout(discarded):
        run_example(Path(".contexta"))

    print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
    main()
case07_analyze_batch_monitoring.py
"""Find degraded nightly batch jobs from previously recorded runs."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "nightly-etl-feature-engineering"

# Open the ".contexta" workspace in the current directory (the one the seed
# script writes to).
workspace_config = WorkspaceConfig(root_path=Path(".contexta"))
ctx = Contexta(
    config=UnifiedConfig(project_name=PROJECT_NAME, workspace=workspace_config)
)

store = ctx.metadata_store
try:
    print("Nightly batch summary:")
    for run in ctx.list_runs(PROJECT_NAME):
        snapshot = ctx.get_run_snapshot(run.run_id)
        diagnosis = ctx.diagnose_run(run.run_id)

        # Metric key -> value for this run; non-metric records are ignored.
        metrics = dict(
            (rec.key, rec.value)
            for rec in snapshot.records
            if rec.record_type == "metric"
        )
        # Diagnosis issues that were raised because of a degraded record.
        degraded = list(
            filter(lambda issue: issue.code == "degraded_record", diagnosis.issues)
        )
        flag = "DEGRADED" if degraded else "ok"

        print(
            f"{snapshot.run.name}: rows={int(metrics.get('rows-processed', 0))}, "
            f"null_rate={metrics.get('feature-null-rate', 0):.4f}, {flag}"
        )
finally:
    # Close the metadata store even if a lookup above fails.
    store.close()