Skip to main content

Case 04: Carlos's Deployment Traceability Problem

Persona: Carlos, ML Engineer

Situation

Carlos deploys a model on Friday afternoon. Monday morning, the product manager reports an 18% CTR drop overnight. Carlos's deployment notes say model_20250401.pkl. He cannot answer four questions:

  1. Which training run produced that checkpoint?
  2. What were the training metrics?
  3. Which dataset version was used?
  4. What would rolling back revert to?

Without Contexta

  • A filename is not a run reference. It does not link to metrics, dataset, or environment.
  • "Roll back" means swapping in a different .pkl file and hoping it was the previous one.
  • Answering the product manager takes 30 minutes of git log archaeology, Slack search, and notebook hunting.

With Contexta

# NOTE: `ctx`, `PROJECT_NAME`, `run_c_ref`, `run_b_ref` and `friday_deploy_ref`
# are not defined in this excerpt. In the seed script below, run-c is the run
# linked to the Friday deployment and run-b is linked to the previous one.
# Step 1 — find all deployments and their linked runs
deployments = ctx.list_deployments(PROJECT_NAME)

# Step 2 — inspect the deployed run
snap = ctx.get_run_snapshot(run_c_ref)
# snap.run.name, snap.stages, snap.records (metrics + dataset event)

# Step 3 — traverse lineage from the deployment
lineage = ctx.traverse_lineage(friday_deploy_ref)

# Step 4 — compare deployed run vs safe baseline
comparison = ctx.compare_runs(run_c_ref, run_b_ref)

DeploymentExecution permanently links a deployment to the exact run that was deployed. Four API calls replace 30 minutes of archaeology.

Key APIs: DeploymentExecution, list_deployments, get_run_snapshot, traverse_lineage, compare_runs


Complete Runnable Code

Run the seed script first, then the analysis script:

uv run examples/case_studies/case04_seed_deployment_traceability_data.py
uv run examples/case_studies/case04_analyze_deployment_traceability.py
case04_seed_deployment_traceability_data.py
"""Create deployment-traceability records used by the traceability case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DeploymentExecution,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


# Project name used to build every project/run/stage/record/deployment ref below.
PROJECT_NAME = "ctr-ranking-model"

# Module-level counter; _next_rid() increments it to number record refs.
_REC_COUNTER = 0


def _next_rid() -> str:
    """Advance the module-level record counter and return the new record id.

    The id is the letter ``r`` followed by the counter value zero-padded to
    five digits (``r00001``, ``r00002``, ...).
    """
    global _REC_COUNTER
    _REC_COUNTER = _REC_COUNTER + 1
    return "r" + format(_REC_COUNTER, "05d")


def _create_training_run(
    store: Any,
    record_store: Any,
    project_name: str,
    run_name: str,
    accuracy: float,
    auc: float,
    loss: float,
    dataset_version: str,
    started_at: str,
    ended_at: str,
) -> str:
    """Persist one completed training run with its stages, metrics and dataset event.

    Writes, in order:

    1. the ``Run`` itself;
    2. a ``feature-engineering`` stage spanning the first half of the run
       window and a ``train`` stage spanning the second half;
    3. one ``MetricRecord`` each for ``accuracy``, ``auc`` and ``loss``,
       attached to the train stage and stamped with ``ended_at``;
    4. one ``StructuredEventRecord`` carrying the dataset version, attached
       to the run only (no stage) and stamped with ``started_at``.

    Args:
        store: Metadata store; must expose ``runs.put_run`` and
            ``stages.put_stage_execution``.
        record_store: Record store; must expose ``append``.
        project_name: Project name used as the prefix of every ref.
        run_name: Run name; combined with ``project_name`` into the run ref.
        accuracy: Value stored under the ``accuracy`` metric key.
        auc: Value stored under the ``auc`` metric key.
        loss: Value stored under the ``loss`` metric key.
        dataset_version: Dataset version recorded in the event message and
            in its ``dataset_version`` attribute.
        started_at: ISO-8601 run start timestamp; a trailing ``Z`` is accepted.
        ended_at: ISO-8601 run end timestamp; a trailing ``Z`` is accepted.

    Returns:
        The run ref, ``run:{project_name}.{run_name}``.

    Raises:
        ValueError: If ``started_at`` or ``ended_at`` cannot be parsed by
            ``datetime.fromisoformat``.
    """
    from datetime import datetime, timezone

    run_ref = f"run:{project_name}.{run_name}"

    store.runs.put_run(
        Run(
            run_ref=run_ref,
            project_ref=f"project:{project_name}",
            name=run_name,
            status="completed",
            started_at=started_at,
            ended_at=ended_at,
        )
    )

    feat_stage_ref = f"stage:{project_name}.{run_name}.feature-engineering"
    train_stage_ref = f"stage:{project_name}.{run_name}.train"

    # The feature stage ends at the midpoint of the run window. Timestamps are
    # parsed into datetimes because arithmetic on the raw strings is not safe.
    window_start = datetime.fromisoformat(started_at.replace("Z", "+00:00"))
    window_end = datetime.fromisoformat(ended_at.replace("Z", "+00:00"))
    midpoint = window_start + (window_end - window_start) / 2
    if midpoint.tzinfo is not None:
        # The output format hard-codes a "Z" suffix, so normalise aware values
        # to UTC first; otherwise a "+02:00" input would be mislabelled as UTC.
        midpoint = midpoint.astimezone(timezone.utc)
    feat_ended = midpoint.strftime("%Y-%m-%dT%H:%M:%SZ")

    # (stage ref, stage name, start, end) in execution order; the position in
    # this tuple becomes the stage's order_index.
    stage_windows = (
        (feat_stage_ref, "feature-engineering", started_at, feat_ended),
        (train_stage_ref, "train", feat_ended, ended_at),
    )
    for order_index, (stage_ref, stage_name, stage_start, stage_end) in enumerate(
        stage_windows
    ):
        store.stages.put_stage_execution(
            StageExecution(
                stage_execution_ref=stage_ref,
                run_ref=run_ref,
                stage_name=stage_name,
                status="completed",
                started_at=stage_start,
                ended_at=stage_end,
                order_index=order_index,
            )
        )

    # Metrics are observed at the end of the run and attached to the train stage.
    obs_ts = ended_at
    for key, val in (("accuracy", accuracy), ("auc", auc), ("loss", loss)):
        record_store.append(
            MetricRecord(
                envelope=RecordEnvelope(
                    record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
                    record_type="metric",
                    recorded_at=obs_ts,
                    observed_at=obs_ts,
                    producer_ref="contexta.case04",
                    run_ref=run_ref,
                    stage_execution_ref=train_stage_ref,
                    completeness_marker="complete",
                    degradation_marker="none",
                ),
                payload=MetricPayload(
                    metric_key=key,
                    value=val,
                    value_type="float64",
                    aggregation_scope="run",
                ),
            )
        )

    # Log dataset version as a structured event on the run (no stage context)
    record_store.append(
        StructuredEventRecord(
            envelope=RecordEnvelope(
                record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
                record_type="event",
                recorded_at=started_at,
                observed_at=started_at,
                producer_ref="contexta.case04",
                run_ref=run_ref,
                completeness_marker="complete",
                degradation_marker="none",
            ),
            payload=StructuredEventPayload(
                event_key="training.dataset-registered",
                level="info",
                message=f"Training dataset version: {dataset_version}",
                attributes={"dataset_version": dataset_version},
                origin_marker="explicit_capture",
            ),
        )
    )

    return run_ref


def _create_deployment(
    store: Any,
    project_name: str,
    deploy_name: str,
    run_ref: str,
    started_at: str,
    ended_at: str,
    order_index: int,
) -> str:
    """Store a completed deployment linked to ``run_ref`` and return its ref.

    Args:
        store: Metadata store; must expose
            ``deployments.put_deployment_execution``.
        project_name: Project name used to build the project and deployment refs.
        deploy_name: Deployment name; also the suffix of the deployment ref.
        run_ref: Ref of the run this deployment is linked to.
        started_at: Deployment start timestamp, passed through unchanged.
        ended_at: Deployment end timestamp, passed through unchanged.
        order_index: Position of this deployment, passed through unchanged.

    Returns:
        The deployment ref, ``deployment:{project_name}.{deploy_name}``.
    """
    deployment_ref = f"deployment:{project_name}.{deploy_name}"
    owning_project = f"project:{project_name}"

    execution = DeploymentExecution(
        deployment_execution_ref=deployment_ref,
        project_ref=owning_project,
        deployment_name=deploy_name,
        status="completed",
        started_at=started_at,
        ended_at=ended_at,
        order_index=order_index,
        run_ref=run_ref,
    )
    store.deployments.put_deployment_execution(execution)

    return deployment_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Create three training runs and two deployment records.

    The metadata store is closed before returning, including when an error
    is raised while seeding.

    Args:
        workspace: Workspace root to write into. When ``None``, a fresh
            temporary directory is created and ``.contexta`` inside it is used.

    Returns:
        A dict with two keys: ``"run_ids"`` holds the three run refs in
        a/b/c order, and ``"deployment_ids"`` holds the march deployment ref
        followed by the april deployment ref.
    """

    if workspace is None:
        root = Path(tempfile.mkdtemp(prefix="contexta-case04-"))
        workspace_path = root / ".contexta"
    else:
        workspace_path = Path(workspace)

    ctx = Contexta(
        config=UnifiedConfig(
            project_name=PROJECT_NAME,
            workspace=WorkspaceConfig(root_path=workspace_path),
        )
    )

    store = ctx.metadata_store
    try:
        store.projects.put_project(
            Project(
                project_ref=f"project:{PROJECT_NAME}",
                name=PROJECT_NAME,
                created_at="2025-03-01T00:00:00Z",
                description="Click-through rate ranking model",
            )
        )

        # Three training runs from Friday (a/b/c experiments before final deploy)
        run_a_ref = _create_training_run(
            store, ctx.record_store, PROJECT_NAME, "friday-run-a",
            accuracy=0.881, auc=0.912, loss=0.308,
            dataset_version="v2025-03-28",
            started_at="2025-04-01T08:00:00Z",
            ended_at="2025-04-01T09:30:00Z",
        )
        run_b_ref = _create_training_run(
            store, ctx.record_store, PROJECT_NAME, "friday-run-b",
            accuracy=0.893, auc=0.927, loss=0.281,
            dataset_version="v2025-03-28",
            started_at="2025-04-01T09:45:00Z",
            ended_at="2025-04-01T11:15:00Z",
        )
        # run-c is what actually got deployed - best offline AUC
        run_c_ref = _create_training_run(
            store, ctx.record_store, PROJECT_NAME, "friday-run-c",
            accuracy=0.901, auc=0.938, loss=0.261,
            dataset_version="v2025-03-31",  # newer dataset - might explain the CTR drop
            started_at="2025-04-01T12:00:00Z",
            ended_at="2025-04-01T13:45:00Z",
        )

        # Previous (safe) deployment linked to run-b. Keep the ref the helper
        # returns instead of rebuilding the same string by hand for the result.
        # NOTE(review): this deployment is stamped 2025-03-28, which is before
        # run-b's training window (2025-04-01) — confirm the seed timeline is
        # intentional.
        march_deploy_ref = _create_deployment(
            store, PROJECT_NAME, "prod-deploy-march",
            run_b_ref,
            started_at="2025-03-28T17:00:00Z",
            ended_at="2025-03-28T17:10:00Z",
            order_index=0,
        )

        # Friday deployment linked to run-c (the one with CTR drop)
        friday_deploy_ref = _create_deployment(
            store, PROJECT_NAME, "prod-deploy-april",
            run_c_ref,
            started_at="2025-04-01T16:00:00Z",
            ended_at="2025-04-01T16:08:00Z",
            order_index=1,
        )

        return {
            "run_ids": [run_a_ref, run_b_ref, run_c_ref],
            "deployment_ids": [march_deploy_ref, friday_deploy_ref],
        }
    finally:
        store.close()


def main() -> None:
    """Seed the ``.contexta`` workspace and print a one-line confirmation.

    Anything ``run_example`` writes to stdout while seeding is captured in an
    in-memory buffer and discarded.
    """
    import io
    from contextlib import redirect_stdout

    target = Path(".contexta")
    discarded_output = io.StringIO()
    with redirect_stdout(discarded_output):
        run_example(target)

    print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
    main()
case04_analyze_deployment_traceability.py
"""Trace a previously recorded deployment back to its training run."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "ctr-ranking-model"
DEPLOYED_RUN = f"run:{PROJECT_NAME}.friday-run-c"
SAFE_RUN = f"run:{PROJECT_NAME}.friday-run-b"

# Open the local ``.contexta`` workspace (the same path the seed script writes to).
workspace = WorkspaceConfig(root_path=Path(".contexta"))
ctx = Contexta(config=UnifiedConfig(project_name=PROJECT_NAME, workspace=workspace))

# Held only so the metadata store can be closed once the report is printed.
store = ctx.metadata_store
try:
    # 1) Every deployment in the project and the run it points at.
    print("Deployments:")
    for dep in ctx.list_deployments(PROJECT_NAME):
        print(f"{dep.deployment_id} -> {dep.run_id}")

    # 2) Metrics and the dataset event recorded for the deployed run.
    deployed = ctx.get_run_snapshot(DEPLOYED_RUN)
    print(f"\nCurrently deployed run: {deployed.run.name}")
    for rec in deployed.records:
        kind = rec.record_type
        if kind == "metric":
            print(f"{rec.key}: {rec.value:.4f}")
        elif kind == "event" and rec.key == "training.dataset-registered":
            print(rec.message)

    # 3) Per-stage metric deltas between the safe baseline and the deployed run.
    print("\nComparison: safe baseline -> deployed")
    diff = ctx.compare_runs(SAFE_RUN, DEPLOYED_RUN)
    for stage_cmp in diff.stage_comparisons:
        for metric in stage_cmp.metric_deltas:
            if metric.delta is None:
                # No delta available for this metric; nothing to print.
                continue
            print(f"{stage_cmp.stage_name}/{metric.metric_key}: {metric.delta:+.4f}")
finally:
    store.close()