Skip to main content

Case 05: Automated Deployment Gate

Persona: MLOps Engineer / Forward Deployed Engineer

Situation

The team deploys models via a Slack checklist: "Did you check metrics? Did you compare with the previous version? Did you validate the data?" Three failures in the past quarter:

  • March: deployed with evaluation metrics from the wrong stage
  • April: deployed run-c with dataset v2025-03-31 (caused the CTR drop — Case 04)
  • May: evaluate stage was skipped entirely; no metrics in the run

Without Contexta

A Slack checklist relies on humans remembering to check the right things. The March failure passed because nobody verified which stage the metrics came from. The May failure passed because the checkbox said "metrics ✓" — nobody noticed the evaluate stage had been skipped. Manual processes have no memory of past failure modes.

With Contexta

def pre_deployment_gate(ctx, candidate_run_id, previous_deploy_run_id):
    """Outline the three gate checks; the rest of the logic is elided below."""
    # Check 1: the candidate must carry no error-level diagnostics.
    diag = ctx.diagnose_run(candidate_run_id)
    errors = [issue for issue in diag.issues if issue.severity == "error"]

    # Check 2: every required metric must have been recorded for the run.
    snap = ctx.get_run_snapshot(candidate_run_id)
    obs_keys = {
        record.key
        for record in snap.records
        if record.record_type == "metric"
    }
    missing = [name for name in REQUIRED_METRICS if name not in obs_keys]

    # Check 3: compare against the previously deployed run to spot regressions.
    comp = ctx.compare_runs(previous_deploy_run_id, candidate_run_id)
    ...
| Scenario | Manual gate | Programmatic gate |
| --- | --- | --- |
| Wrong-stage metrics | PASS (human missed it) | FAIL (metrics absent from evaluate stage) |
| Dataset version mismatch | PASS (nobody checked) | FAIL (DegradedRecord present) |
| Evaluate stage skipped | PASS (checkbox ticked) | FAIL (required metrics missing) |

Key APIs: diagnose_run, get_run_snapshot, compare_runs


Complete Runnable Code

Run the seed script first, then the analysis script:

uv run examples/case_studies/case05_seed_deployment_gate_data.py
uv run examples/case_studies/case05_analyze_deployment_gate.py
case05_seed_deployment_gate_data.py
"""Create deployment-gate candidate records used by the gate case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DeploymentExecution,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
)

# Project name embedded in every project/run/stage/record ref built by this script.
PROJECT_NAME = "product-ranker"
# Module-level counter that _next_rid() increments to number record refs.
_rid = 0


def _next_rid() -> str:
    """Bump the module-level counter and return it as a record id such as ``r0001``."""
    global _rid
    _rid = _rid + 1
    return f"r{_rid:04d}"


def _make_run(
    store: Any,
    record_store: Any,
    run_name: str,
    accuracy: float,
    auc: float,
    f1: float,
    has_evaluate_stage: bool = True,
    started_at: str = "2025-05-01T09:00:00Z",
    ended_at: str = "2025-05-01T11:00:00Z",
) -> str:
    """Persist one completed run and, optionally, its evaluate stage and metrics.

    Args:
        store: Metadata store exposing ``runs.put_run`` and
            ``stages.put_stage_execution``.
        record_store: Record store exposing ``append``.
        run_name: Short run name; also embedded in the generated refs.
        accuracy: Value recorded under the ``accuracy`` metric key.
        auc: Value recorded under the ``auc`` metric key.
        f1: Value recorded under the ``f1`` metric key.
        has_evaluate_stage: When false, only the run itself is written.
        started_at: Start timestamp applied to the run and the stage.
        ended_at: End timestamp applied to the run and the stage, and used as
            the recorded/observed time of every metric.

    Returns:
        The run ref (``run:<project>.<run_name>``) of the stored run.
    """
    run_ref = f"run:{PROJECT_NAME}.{run_name}"
    run = Run(
        run_ref=run_ref,
        project_ref=f"project:{PROJECT_NAME}",
        name=run_name,
        status="completed",
        started_at=started_at,
        ended_at=ended_at,
    )
    store.runs.put_run(run)

    # NOTE(review): the flattened listing does not show whether the metric loop
    # sits inside the evaluate-stage branch; it is treated as such here because
    # the no-evaluate scenario is labelled "MISSING METRICS" — confirm.
    if not has_evaluate_stage:
        return run_ref

    stage_ref = f"stage:{PROJECT_NAME}.{run_name}.evaluate"
    evaluate_stage = StageExecution(
        stage_execution_ref=stage_ref,
        run_ref=run_ref,
        stage_name="evaluate",
        status="completed",
        started_at=started_at,
        ended_at=ended_at,
        order_index=0,
    )
    store.stages.put_stage_execution(evaluate_stage)

    # One metric record per key, each attached to the evaluate stage.
    metrics = {"accuracy": accuracy, "auc": auc, "f1": f1}
    for metric_key, metric_value in metrics.items():
        envelope = RecordEnvelope(
            record_ref=f"record:{PROJECT_NAME}.{run_name}.{_next_rid()}",
            record_type="metric",
            recorded_at=ended_at,
            observed_at=ended_at,
            producer_ref="contexta.case05",
            run_ref=run_ref,
            stage_execution_ref=stage_ref,
            completeness_marker="complete",
            degradation_marker="none",
        )
        payload = MetricPayload(
            metric_key=metric_key,
            value=metric_value,
            value_type="float64",
        )
        record_store.append(MetricRecord(envelope=envelope, payload=payload))

    return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Seed the project, a deployed baseline run and three gate candidates.

    Args:
        workspace: Workspace root to write into. When omitted, a ``.contexta``
            directory inside a fresh temporary directory is used.

    Returns:
        A dict with ``baseline_run_id`` (the baseline run ref) and
        ``candidate_run_ids`` (the good, no-eval and regressed candidate refs,
        in that order).
    """
    if workspace is None:
        workspace = Path(tempfile.mkdtemp(prefix="contexta-case05-")) / ".contexta"

    project_ref = f"project:{PROJECT_NAME}"
    workspace_config = WorkspaceConfig(root_path=Path(workspace))
    ctx = Contexta(
        config=UnifiedConfig(project_name=PROJECT_NAME, workspace=workspace_config)
    )
    store = ctx.metadata_store

    try:
        store.projects.put_project(
            Project(
                project_ref=project_ref,
                name=PROJECT_NAME,
                created_at="2025-01-01T00:00:00Z",
            )
        )

        # Scenario A: safe baseline (previous production model), recorded
        # together with the deployment that shipped it.
        baseline_ref = _make_run(
            store,
            ctx.record_store,
            "baseline-v1",
            accuracy=0.893,
            auc=0.927,
            f1=0.881,
            started_at="2025-04-28T09:00:00Z",
            ended_at="2025-04-28T11:00:00Z",
        )
        store.deployments.put_deployment_execution(
            DeploymentExecution(
                deployment_execution_ref=f"deployment:{PROJECT_NAME}.prod-v1",
                project_ref=project_ref,
                deployment_name="prod-v1",
                status="completed",
                started_at="2025-04-28T12:00:00Z",
                ended_at="2025-04-28T12:10:00Z",
                run_ref=baseline_ref,
                order_index=0,
            )
        )

        candidate_specs: list[dict[str, Any]] = [
            # Scenario B: GOOD candidate — should pass.
            {
                "run_name": "candidate-v2-good",
                "accuracy": 0.901,
                "auc": 0.933,
                "f1": 0.889,
                "started_at": "2025-05-05T09:00:00Z",
                "ended_at": "2025-05-05T11:00:00Z",
            },
            # Scenario C: MISSING METRICS — should fail.
            {
                "run_name": "candidate-v3-no-eval",
                "accuracy": 0.0,
                "auc": 0.0,
                "f1": 0.0,
                "has_evaluate_stage": False,
                "started_at": "2025-05-06T09:00:00Z",
                "ended_at": "2025-05-06T11:00:00Z",
            },
            # Scenario D: REGRESSION — accuracy dropped 5%.
            {
                "run_name": "candidate-v4-regressed",
                "accuracy": 0.841,
                "auc": 0.891,
                "f1": 0.859,
                "started_at": "2025-05-07T09:00:00Z",
                "ended_at": "2025-05-07T11:00:00Z",
            },
        ]
        candidate_refs = [
            _make_run(store, ctx.record_store, **spec) for spec in candidate_specs
        ]

        return {
            "baseline_run_id": baseline_ref,
            "candidate_run_ids": candidate_refs,
        }
    finally:
        # Close the metadata store whether or not seeding succeeded.
        store.close()


def main() -> None:
    """Seed the local ``.contexta`` workspace and print a one-line confirmation.

    Anything written to stdout while seeding is captured and discarded.
    """
    import contextlib
    import io

    discarded = io.StringIO()
    with contextlib.redirect_stdout(discarded):
        run_example(Path(".contexta"))

    print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
    main()
case05_analyze_deployment_gate.py
"""Run a deployment gate against previously recorded candidates."""

from pathlib import Path
from dataclasses import dataclass

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


# Project whose runs are gated; it is embedded in the run refs below.
PROJECT_NAME = "product-ranker"
# Metric keys every candidate must have recorded to pass the gate.
REQUIRED_METRICS = ["accuracy", "auc", "f1"]
# Run ref used as the comparison baseline for the regression check.
BASELINE = f"run:{PROJECT_NAME}.baseline-v1"
# Candidate run refs evaluated by the gate, in this order.
CANDIDATES = [
    f"run:{PROJECT_NAME}.candidate-v2-good",
    f"run:{PROJECT_NAME}.candidate-v3-no-eval",
    f"run:{PROJECT_NAME}.candidate-v4-regressed",
]
# Relative drop (2%) beyond which a required metric counts as a regression.
REGRESSION_THRESHOLD = 0.02


@dataclass
class GateResult:
    """Outcome of one pre-deployment gate evaluation for a single run."""

    # True only when the gate as a whole passed.
    passed: bool
    # Ref of the run the gate was evaluated for.
    run_id: str
    # One (check name, check passed, human-readable detail) tuple per check.
    checks: list[tuple[str, bool, str]]

    def print_report(self) -> None:
        """Print the overall verdict followed by one line per check."""
        verdict = "FAIL"
        if self.passed:
            verdict = "PASS"
        print(f"\n Pre-deployment gate for '{self.run_id}': [{verdict}]")

        for check_name, succeeded, message in self.checks:
            marker = " [NG] "
            if succeeded:
                marker = " [OK] "
            print(f" {marker} [{check_name}] {message}")


def pre_deployment_gate(
    ctx: Contexta,
    candidate_run_id: str,
    previous_deploy_run_id: str | None,
    regression_threshold: float = REGRESSION_THRESHOLD,
) -> GateResult:
    """Evaluate a candidate run against the three deployment-gate checks.

    The checks are, in order: ``diagnostics`` (no error-severity issues),
    ``required_metrics`` (every key in ``REQUIRED_METRICS`` was recorded) and
    ``regression_check`` (no required metric dropped by more than the
    threshold relative to the previous deployment).

    Args:
        ctx: Contexta handle used to diagnose, snapshot and compare runs.
        candidate_run_id: Ref of the run proposed for deployment.
        previous_deploy_run_id: Ref of the currently deployed run, or ``None``
            when there is no earlier deployment; the regression check is then
            recorded as skipped instead of calling ``compare_runs`` with
            ``None``.
        regression_threshold: Largest tolerated relative drop (``0.02`` means
            2%). Defaults to ``REGRESSION_THRESHOLD``.

    Returns:
        A ``GateResult`` whose ``passed`` flag is true only if every check
        passed, together with the per-check details.
    """
    checks: list[tuple[str, bool, str]] = []

    # Check 1: error-severity diagnostics block the deployment; warnings are
    # only counted for the report.
    diag = ctx.diagnose_run(candidate_run_id)
    errors = [issue for issue in diag.issues if issue.severity == "error"]
    warnings = [issue for issue in diag.issues if issue.severity == "warning"]
    if errors:
        codes = ", ".join(issue.code for issue in errors)
        checks.append(("diagnostics", False, f"{len(errors)} error(s): {codes}"))
    else:
        checks.append(("diagnostics", True, f"clean ({len(warnings)} warning(s))"))

    # Check 2: collect metric records once, then look for missing keys.
    snapshot = ctx.get_run_snapshot(candidate_run_id)
    values = {
        record.key: record.value
        for record in snapshot.records
        if record.record_type == "metric"
    }
    missing = [metric for metric in REQUIRED_METRICS if metric not in values]
    if missing:
        checks.append(("required_metrics", False, f"missing: {missing}"))
    else:
        summary = ", ".join(f"{key}={values[key]:.4f}" for key in REQUIRED_METRICS)
        checks.append(("required_metrics", True, summary))

    # Check 3: relative change of each required metric vs the previous deploy.
    if previous_deploy_run_id is None:
        # A first deployment has nothing to regress against.
        checks.append(
            ("regression_check", True, "skipped: no previous deployment to compare against")
        )
    else:
        comparison = ctx.compare_runs(previous_deploy_run_id, candidate_run_id)
        regressions: list[str] = []
        for stage in comparison.stage_comparisons:
            for delta in stage.metric_deltas:
                if delta.metric_key not in REQUIRED_METRICS:
                    continue
                # A delta with a side missing cannot be compared; an absent
                # candidate metric is reported by the required_metrics check.
                if delta.left_value is None or delta.right_value is None:
                    continue
                # The floor on the denominator avoids dividing by zero when
                # the baseline value is 0.
                change = (delta.right_value - delta.left_value) / max(
                    abs(delta.left_value), 1e-9
                )
                if change < -regression_threshold:
                    regressions.append(
                        f"{delta.metric_key}: {delta.left_value:.4f}->{delta.right_value:.4f} ({change:+.1%})"
                    )

        if regressions:
            checks.append(
                ("regression_check", False, f"regression detected: {'; '.join(regressions)}")
            )
        else:
            checks.append(
                (
                    "regression_check",
                    True,
                    f"no significant regression vs previous deploy (threshold={regression_threshold:.0%})",
                )
            )

    return GateResult(
        passed=all(ok for _, ok, _ in checks),
        run_id=candidate_run_id,
        checks=checks,
    )


# Open the workspace that the seed script populated.
workspace_config = WorkspaceConfig(root_path=Path(".contexta"))
ctx = Contexta(
    config=UnifiedConfig(project_name=PROJECT_NAME, workspace=workspace_config)
)
store = ctx.metadata_store

# Gate every candidate against the baseline and print one report each; the
# metadata store is closed even if a gate evaluation raises.
try:
    for candidate in CANDIDATES:
        result = pre_deployment_gate(
            ctx, candidate, BASELINE, regression_threshold=REGRESSION_THRESHOLD
        )
        result.print_report()
finally:
    store.close()