Skip to main content

Case 08: Upstream Data Contamination Window

Persona: MLE + Data Engineer pair

Situation

A vendor changed their API response three weeks ago. The change silently clamped purchase_intent_score from its expected range [0.0, 1.0] to [0.0, 0.1]. Because the field name stayed the same, schema validation did not catch it. Four training runs happened during the contamination window (April 1–21). Each run "looked fine" in isolation: metrics were a few points lower, but the team attributed that to natural variance. Today a data engineer noticed the clamping while debugging a separate issue.

Without Contexta

  • No record of which runs overlapped with which data quality events.
  • Identifying affected runs requires cross-referencing training dates against the contamination window — manually, across multiple systems.
  • The "best" run during the window was promoted to production. There is no programmatic way to quarantine it.

With Contexta

# Abbreviated excerpt: the full StructuredEventRecord in the seed script below
# also carries an envelope (record_ref, run_ref, timestamps, markers).
# Tag each run whose training overlapped the contamination window.
StructuredEventRecord(payload=StructuredEventPayload(
    event_key="data.contamination-window",
    message="Training during vendor API contamination window Apr 1-21",
))

# Triage: identify, compare, select.
# run_refs, was_in_window, clean_baseline_ref and latest_contaminated_ref are
# placeholders here; the analysis script below derives the contaminated set by
# looking for the "data.contamination-window" event key on each run.
contaminated_refs = [r for r in run_refs if was_in_window(r)]
comparison = ctx.compare_runs(clean_baseline_ref, latest_contaminated_ref)

# Which contaminated run "looked best" but shouldn't be trusted?
# (The runnable analysis script also passes stage_name="evaluate".)
false_best = ctx.select_best_run(contaminated_refs, "auc", higher_is_better=True)

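Event records attach the contamination window to every affected run, so finding them becomes a query instead of a manual cross-reference. compare_runs shows how far the latest contaminated run diverged from the clean baseline. select_best_run picks the contaminated run with the highest AUC. That is the run that would have been promoted to production, and therefore the first one to quarantine.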

Key APIs: StructuredEventRecord, compare_runs, select_best_run


Complete Runnable Code

Run the seed script first, then the analysis script:

uv run examples/case_studies/case08_seed_contamination_data.py
uv run examples/case_studies/case08_analyze_contamination.py
case08_seed_contamination_data.py
"""Create upstream-contamination records used by the contamination case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


PROJECT_NAME = "purchase-intent-model"

# Process-wide sequence number appended to every record_ref this script emits.
_REC_COUNTER = 0


def _next_rid() -> str:
    """Advance the module-level record counter and return it as ``rNNNNN``.

    The counter starts at 0, so the first call in a process yields ``"r00001"``.
    Values wider than five digits are not truncated.
    """
    global _REC_COUNTER
    _REC_COUNTER = _REC_COUNTER + 1
    return f"r{_REC_COUNTER:05d}"


def _build_run(
    store: Any,
    record_store: Any,
    project_name: str,
    run_name: str,
    started_at: str,
    ended_at: str,
    accuracy: float,
    auc: float,
    f1: float,
    contaminated: bool,
) -> str:
    """Persist one training run together with its stages, metrics and tags.

    Writes, in order:
      1. a completed ``Run``;
      2. a ``train`` stage (order 0) from ``started_at`` to 10:00Z of the same
         day, then an ``evaluate`` stage (order 1) from 10:00Z to ``ended_at``;
      3. ``accuracy``, ``auc`` and ``f1`` metric records, attached to the
         evaluate stage and timestamped at ``ended_at``;
      4. when ``contaminated`` is true, a run-level ``warning`` event keyed
         ``data.contamination-window`` timestamped at ``started_at``.

    Returns:
        The run ref, ``run:<project_name>.<run_name>``.
    """
    run_ref = f"run:{project_name}.{run_name}"
    stage_prefix = f"stage:{project_name}.{run_name}"
    eval_ref = f"{stage_prefix}.evaluate"
    # Fixed hand-off point between the train and evaluate stages.
    handoff_ts = f"{started_at[:10]}T10:00:00Z"

    def envelope(record_type: str, ts: str, **extra: Any) -> RecordEnvelope:
        # Each call consumes one id from the module-level record counter.
        return RecordEnvelope(
            record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
            record_type=record_type,
            recorded_at=ts,
            observed_at=ts,
            producer_ref="contexta.case08",
            run_ref=run_ref,
            completeness_marker="complete",
            degradation_marker="none",
            **extra,
        )

    store.runs.put_run(
        Run(
            run_ref=run_ref,
            project_ref=f"project:{project_name}",
            name=run_name,
            status="completed",
            started_at=started_at,
            ended_at=ended_at,
        )
    )

    # (stage_name, order_index, started_at, ended_at)
    stage_plan = (
        ("train", 0, started_at, handoff_ts),
        ("evaluate", 1, handoff_ts, ended_at),
    )
    for stage_name, position, stage_start, stage_end in stage_plan:
        store.stages.put_stage_execution(
            StageExecution(
                stage_execution_ref=f"{stage_prefix}.{stage_name}",
                run_ref=run_ref,
                stage_name=stage_name,
                status="completed",
                started_at=stage_start,
                ended_at=stage_end,
                order_index=position,
            )
        )

    metric_values = {"accuracy": accuracy, "auc": auc, "f1": f1}
    for metric_key, metric_value in metric_values.items():
        record_store.append(
            MetricRecord(
                envelope=envelope("metric", ended_at, stage_execution_ref=eval_ref),
                payload=MetricPayload(
                    metric_key=metric_key,
                    value=metric_value,
                    value_type="float64",
                ),
            )
        )

    if not contaminated:
        return run_ref

    # Run-level tag (no stage ref) marking overlap with the contamination window.
    record_store.append(
        StructuredEventRecord(
            envelope=envelope("event", started_at),
            payload=StructuredEventPayload(
                event_key="data.contamination-window",
                level="warning",
                message=(
                    "Run trained during vendor schema contamination window "
                    "(2025-04-01 to 2025-04-21). Feature purchase_intent_score "
                    "was clamped to [0.0, 0.1] instead of [0.0, 1.0]."
                ),
                attributes={
                    "contamination_start": "2025-04-01",
                    "contamination_end": "2025-04-21",
                    "affected_feature": "purchase_intent_score",
                    "vendor_issue": "schema_range_clamp",
                },
                origin_marker="explicit_capture",
            ),
        )
    )
    return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Seed a workspace with one clean baseline run and four contaminated runs.

    Args:
        workspace: Workspace root to write into. When ``None``, a fresh
            directory is created with ``tempfile.mkdtemp`` and
            ``<tmpdir>/.contexta`` is used; that directory is not removed here.

    Returns:
        A dict holding the baseline run ref (``baseline_run_id``), the number
        of contaminated runs (``contaminated_run_count``) and their refs in
        creation order (``contaminated_run_ids``).
    """
    if workspace is not None:
        workspace_path = Path(workspace)
    else:
        workspace_path = Path(tempfile.mkdtemp(prefix="contexta-case08-")) / ".contexta"

    ctx = Contexta(
        config=UnifiedConfig(
            project_name=PROJECT_NAME,
            workspace=WorkspaceConfig(root_path=workspace_path),
        )
    )

    # Each spec: (run_name, started_at, ended_at, accuracy, auc, f1).
    # The baseline predates the 2025-04-01..2025-04-21 contamination window.
    baseline_spec = (
        "run-2025-03-28", "2025-03-28T09:00:00Z", "2025-03-28T12:00:00Z", 0.934, 0.961, 0.928,
    )
    window_specs = (
        ("run-2025-04-03", "2025-04-03T09:00:00Z", "2025-04-03T12:00:00Z", 0.891, 0.912, 0.887),
        ("run-2025-04-08", "2025-04-08T09:00:00Z", "2025-04-08T12:00:00Z", 0.884, 0.905, 0.879),
        ("run-2025-04-14", "2025-04-14T09:00:00Z", "2025-04-14T12:00:00Z", 0.897, 0.918, 0.893),
        ("run-2025-04-21", "2025-04-21T09:00:00Z", "2025-04-21T12:00:00Z", 0.878, 0.901, 0.872),
    )

    store = ctx.metadata_store

    def seed(spec: tuple[Any, ...], *, contaminated: bool) -> str:
        # Unpack one spec row and register it as a run.
        run_name, started, ended, acc, auc, f1 = spec
        return _build_run(
            store, ctx.record_store, PROJECT_NAME,
            run_name=run_name,
            started_at=started,
            ended_at=ended,
            accuracy=acc, auc=auc, f1=f1,
            contaminated=contaminated,
        )

    try:
        store.projects.put_project(
            Project(
                project_ref=f"project:{PROJECT_NAME}",
                name=PROJECT_NAME,
                created_at="2025-03-01T00:00:00Z",
                description="Purchase intent prediction model",
            )
        )

        baseline_ref = seed(baseline_spec, contaminated=False)
        contaminated_refs: list[str] = [seed(spec, contaminated=True) for spec in window_specs]

        return {
            "baseline_run_id": baseline_ref,
            "contaminated_run_count": len(contaminated_refs),
            "contaminated_run_ids": contaminated_refs,
        }
    finally:
        # Close the metadata store even if seeding fails part-way.
        store.close()


def main() -> None:
    """Seed ``./.contexta`` with the case-study runs and print a one-line summary.

    Anything written to stdout while seeding is captured and discarded, so
    only the final confirmation line is shown.
    """
    import io
    from contextlib import redirect_stdout

    discarded_output = io.StringIO()
    with redirect_stdout(discarded_output):
        run_example(Path(".contexta"))

    print(f"Seeded {PROJECT_NAME} data in .contexta.")


# Seed the ./.contexta workspace only when executed as a script, not on import.
if __name__ == "__main__":
    main()
case08_analyze_contamination.py
"""Identify contaminated runs from previously recorded training history."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "purchase-intent-model"
BASELINE = f"run:{PROJECT_NAME}.run-2025-03-28"
# Event key written by the seed script onto every run in the contamination window.
CONTAMINATION_EVENT_KEY = "data.contamination-window"


def main() -> None:
    """Report contaminated runs and pick the one that should be quarantined.

    Reads the ``./.contexta`` workspace written by the seed script and then:
      1. lists every run carrying a ``data.contamination-window`` event;
      2. prints per-stage metric deltas between the clean baseline and the
         last contaminated run found;
      3. prints the contaminated run with the highest evaluate-stage AUC, the
         "false best" that would have been promoted.

    If no contaminated runs are found (for example, the seed script has not
    been run), a hint is printed instead of failing with an IndexError.
    """
    ctx = Contexta(
        config=UnifiedConfig(
            project_name=PROJECT_NAME,
            workspace=WorkspaceConfig(root_path=Path(".contexta")),
        )
    )

    store = ctx.metadata_store
    try:
        contaminated: list[str] = []
        print("Runs trained during contamination window:")
        for run in ctx.list_runs(PROJECT_NAME):
            snapshot = ctx.get_run_snapshot(run.run_id)
            event_keys = {record.key for record in snapshot.records if record.record_type == "event"}
            if CONTAMINATION_EVENT_KEY in event_keys:
                contaminated.append(run.run_id)
                print(f" {run.name}")

        if not contaminated:
            # Nothing to compare or quarantine; avoid indexing an empty list.
            print("\nNo contaminated runs found. Run case08_seed_contamination_data.py first.")
            return

        # NOTE(review): "latest" is the last tagged run returned by list_runs;
        # this assumes list_runs yields runs oldest-first. Confirm.
        latest = contaminated[-1]
        print("\nClean baseline -> latest contaminated:")
        comparison = ctx.compare_runs(BASELINE, latest)
        for stage in comparison.stage_comparisons:
            for delta in stage.metric_deltas:
                # Skip metrics that are missing on one side of the comparison.
                if delta.delta is not None:
                    print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")

        false_best = ctx.select_best_run(contaminated, "auc", stage_name="evaluate", higher_is_better=True)
        print(f"\nFalse best to quarantine: {false_best}")
    finally:
        store.close()


if __name__ == "__main__":
    main()