Case 01: Sara's Scattered HPO Experiments
Persona: Sara, ML Engineer
Situation
Sara runs 8 hyperparameter search experiments over a weekend. Each writes its results to an
ad hoc file (CSV, JSON, a spreadsheet, or plain text) with a name like
lr0001_bs32_aug_20250318_v3_FINAL.csv. On Monday, her tech lead
asks: "Which experiment was best?" Sara cannot answer without 20 minutes of
spreadsheet archaeology — opening each file, extracting the accuracy number, and manually
ranking them.
Without Contexta
lr0001_bs32_aug_20250318_v3_FINAL.csv
results_bs64_lr001_with_augmentation.xlsx
experiment_march18_attempt2.txt
BEST_run_maybe_lr0005_bs64.csv
...
- Results live in files named by the engineer, not the framework.
- No common schema → comparing across files requires custom parsing.
- No ranking API → the "best" run is wherever the engineer left a sticky note.
- Two weeks later, the CSV files may be gone from the laptop.
With Contexta
# All 8 runs were registered, with their metrics, as each one completed.
# Pick the best run by the "accuracy" metric of the "train" stage.
best_ref = ctx.select_best_run(run_refs, "accuracy", stage_name="train")
# Build a single structured comparison report across the same runs.
report = ctx.build_multi_run_report(run_refs)
Every run is registered with its metrics at the moment it completes. select_best_run
answers the tech lead's "Which experiment was best?" question in one call.
build_multi_run_report produces a structured comparison across all the runs with no manual work.
Key APIs: select_best_run, build_multi_run_report, compare_runs
Complete Runnable Code
Run the seed script first, then the analysis script:
uv run examples/case_studies/case01_seed_hpo_data.py
uv run examples/case_studies/case01_analyze_hpo_runs.py
case01_seed_hpo_data.py
"""Create canonical HPO run records used by the scattered-experiments case study."""
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import MetricPayload, MetricRecord, Project, RecordEnvelope, Run, StageExecution
# Project name passed to the Contexta config and embedded in every
# project/run/stage/record ref this script builds.
PROJECT_NAME = "image-classifier-hpo"
# One row per HPO run: (run name, accuracy, loss, status, minute).
# `minute` is formatted into the run's end timestamp as 2025-03-18T09:<minute>:00Z;
# every run starts at 09:00 on the same day.
# Rows with status "failed" still list accuracy/loss values here, but the
# seeding loop checks status == "completed" before appending metric records.
EXPERIMENTS = (
("exp-lr1e-3-bs32-aug", 0.874, 0.341, "completed", 45),
("exp-lr1e-3-bs64-aug", 0.891, 0.298, "completed", 42),
("exp-lr1e-3-bs128-noaug", 0.863, 0.372, "completed", 38),
("exp-lr5e-4-bs32-aug", 0.901, 0.267, "completed", 51),
("exp-lr5e-4-bs64-aug", 0.918, 0.231, "completed", 49),
("exp-lr5e-4-bs128-aug", 0.897, 0.281, "completed", 44),
("exp-lr1e-4-bs32-aug", 0.812, 0.489, "failed", 12),
("exp-lr2e-3-bs64-noaug", 0.841, 0.421, "failed", 8),
)
def metric_record(run_name: str, key: str, value: float, observed_at: str) -> MetricRecord:
    """Build a stage-scoped metric record attached to a run's ``train`` stage.

    Args:
        run_name: Run name; combined with ``PROJECT_NAME`` to form the run,
            stage, and record references.
        key: Metric key; also the final segment of the record reference.
        value: Metric value, stored with value type ``"float"``.
        observed_at: Timestamp string written to both ``recorded_at`` and
            ``observed_at`` on the envelope.

    Returns:
        A ``MetricRecord`` whose subject is the run's ``train`` stage.
    """
    owning_run = f"run:{PROJECT_NAME}.{run_name}"
    train_stage = f"stage:{PROJECT_NAME}.{run_name}.train"

    # Only the accuracy metric carries a unit; any other key is unitless.
    if key == "accuracy":
        unit = "ratio"
    else:
        unit = None

    envelope = RecordEnvelope(
        record_ref=f"record:{PROJECT_NAME}.{run_name}.{key}",
        record_type="metric",
        recorded_at=observed_at,
        observed_at=observed_at,
        producer_ref="docs.case01.seed",
        run_ref=owning_run,
        stage_execution_ref=train_stage,
    )
    payload = MetricPayload(
        metric_key=key,
        value=value,
        value_type="float",
        unit=unit,
        aggregation_scope="stage",
        subject_ref=train_stage,
        summary_basis="raw_observation",
    )
    return MetricRecord(envelope=envelope, payload=payload)
# Open the local ".contexta" workspace for this project.
workspace = WorkspaceConfig(root_path=Path(".contexta"))
ctx = Contexta(config=UnifiedConfig(project_name=PROJECT_NAME, workspace=workspace))
metadata = ctx.metadata_store
try:
    project_ref = f"project:{PROJECT_NAME}"
    # Every run and its train stage share the same start timestamp.
    started_at = "2025-03-18T09:00:00Z"

    # Register the project first; each run below points at its ref.
    project = Project(
        project_ref=project_ref,
        name=PROJECT_NAME,
        created_at="2025-03-18T08:55:00Z",
        description="Image classifier hyperparameter search",
    )
    metadata.projects.put_project(project)

    for name, accuracy, loss, status, minute in EXPERIMENTS:
        run_ref = f"run:{PROJECT_NAME}.{name}"
        stage_ref = f"stage:{PROJECT_NAME}.{name}.train"
        ended_at = f"2025-03-18T09:{minute:02d}:00Z"

        run = Run(
            run_ref=run_ref,
            project_ref=project_ref,
            name=name,
            status=status,
            started_at=started_at,
            ended_at=ended_at,
        )
        metadata.runs.put_run(run)

        # Each run has exactly one stage, "train", mirroring the run's status
        # and timestamps.
        train_stage = StageExecution(
            stage_execution_ref=stage_ref,
            run_ref=run_ref,
            stage_name="train",
            status=status,
            started_at=started_at,
            ended_at=ended_at,
            order_index=0,
        )
        metadata.stages.put_stage_execution(train_stage)

        # Metric records are appended only for runs that completed.
        if status != "completed":
            continue
        for metric_key, metric_value in (("accuracy", accuracy), ("loss", loss)):
            ctx.record_store.append(metric_record(name, metric_key, metric_value, ended_at))

    print(f"Seeded {len(EXPERIMENTS)} runs in .contexta for project {PROJECT_NAME}.")
finally:
    # Close the metadata store even if seeding fails part-way through.
    metadata.close()
case01_analyze_hpo_runs.py
"""Investigate previously recorded HPO runs and select the best candidate."""
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
# Must match the project name used by the seed script.
PROJECT_NAME = "image-classifier-hpo"

# Open the same local ".contexta" workspace the seed script wrote to.
ctx = Contexta(
    config=UnifiedConfig(
        project_name=PROJECT_NAME,
        workspace=WorkspaceConfig(root_path=Path(".contexta")),
    )
)
store = ctx.metadata_store
try:
    runs = ctx.list_runs(PROJECT_NAME)
    completed_run_ids = [run.run_id for run in runs if run.status == "completed"]
    failed_run_ids = [run.run_id for run in runs if run.status == "failed"]

    # Rows are (run_id, run name, accuracy, loss), one per completed run.
    ranked_runs: list[tuple[str, str, float, float]] = []
    for run_id in completed_run_ids:
        snapshot = ctx.get_run_snapshot(run_id)
        # Collapse the snapshot's metric records into {key: value}.
        metrics = {
            record.key: float(record.value)
            for record in snapshot.records
            if record.record_type == "metric" and record.value is not None
        }
        ranked_runs.append(
            (
                run_id,
                snapshot.run.name,
                metrics["accuracy"],
                metrics["loss"],
            )
        )
    # Highest accuracy first.
    ranked_runs.sort(key=lambda row: row[2], reverse=True)

    print(
        f"Runs: {len(runs)} total, "
        f"{len(completed_run_ids)} completed, {len(failed_run_ids)} failed"
    )
    print("Rank Run name Accuracy Loss")
    for rank, (_, name, accuracy, loss) in enumerate(ranked_runs, start=1):
        print(f"#{rank:<4} {name:<30} {accuracy:.4f} {loss:.4f}")

    if not ranked_runs:
        # Nothing to select, compare, or report on without a completed run.
        print("\nNo completed runs found; skipping selection, comparison, and report.")
    else:
        best_run_id = ctx.select_best_run(
            completed_run_ids,
            "accuracy",
            stage_name="train",
            higher_is_better=True,
        )
        best_row = next(row for row in ranked_runs if row[0] == best_run_id)
        print(f"\nSelected run: {best_row[1]}")
        print(f"Selected accuracy: {best_row[2]:.4f}; loss: {best_row[3]:.4f}")

        # The runner-up is the highest-ranked run other than the selected one.
        # Indexing ranked_runs[1] directly would raise IndexError with a single
        # completed run, and would compare the best run against itself if the
        # selected run is not the first row of the local ranking (e.g. a tie).
        runner_up = next((row for row in ranked_runs if row[0] != best_run_id), None)
        if runner_up is None:
            print("\nComparison skipped: only one completed run.")
        else:
            comparison = ctx.compare_runs(runner_up[0], best_run_id)
            print(f"\nComparison: {runner_up[1]} -> {best_row[1]}")
            for stage in comparison.stage_comparisons:
                for delta in stage.metric_deltas:
                    # Skip metrics for which no delta was computed.
                    if delta.delta is not None:
                        print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")

        report = ctx.build_multi_run_report(completed_run_ids)
        print(f"\nReport: {report.title}")
        print("Sections: " + ", ".join(section.title for section in report.sections))
finally:
    # Close the metadata store even if the analysis fails part-way through.
    store.close()