Common Contexta Workflows
This guide focuses on the day-to-day tasks most users care about once a workspace already contains canonical data.
The safest default remains:
- start with Contexta
- bind it to one workspace
- use facade methods first
- move to direct store or recovery APIs only when you need more control
If you have not created a working workspace yet, finish Getting Started first.
Open One Workspace
Most workflows begin by opening one workspace through one facade:
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
ctx = Contexta(
    config=UnifiedConfig(
        project_name="guide-proj",
        workspace=WorkspaceConfig(root_path=Path(".contexta")),
    )
)
Use one workspace per logical project or experiment family. That keeps run refs, reports, and recovery actions easier to reason about.
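The refs used throughout this guide appear to follow a `kind:project.name` shape (for example `run:guide-proj.demo-run` and `artifact:guide-proj.demo-run.model`). The sketch below assumes that shape, which is inferred from the examples rather than from a documented spec, to show why one workspace per project keeps refs easy to group. `parse_ref` and `ParsedRef` are hypothetical helpers, not part of Contexta.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ParsedRef:
    kind: str     # e.g. "run" or "artifact"
    project: str  # e.g. "guide-proj"
    name: str     # remainder, e.g. "demo-run" or "demo-run.model"


def parse_ref(ref: str) -> ParsedRef:
    """Split a ref shaped like 'kind:project.name' (assumed format)."""
    kind, sep, rest = ref.partition(":")
    project, dot, name = rest.partition(".")
    if not (sep and dot and kind and project and name):
        raise ValueError(f"not a kind:project.name ref: {ref!r}")
    return ParsedRef(kind=kind, project=project, name=name)


refs = [
    "run:guide-proj.demo-run",
    "run:guide-proj.demo-run-v2",
    "artifact:guide-proj.demo-run.model",
]
# Every ref resolves to the same project, so they belong in one workspace.
print({parse_ref(ref).project for ref in refs})  # {'guide-proj'}
```

Splitting only on the first `:` and the first `.` keeps dotted artifact names such as `demo-run.model` intact.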
Inspect One Run
If you already know a canonical run ref, the fastest read path is a run snapshot:
snapshot = ctx.get_run_snapshot("run:guide-proj.demo-run")
print(snapshot.run.run_id)
print(snapshot.run.status)
print(len(snapshot.stages))
print(len(snapshot.records))
print(len(snapshot.artifacts))
Use this workflow when you want to answer:
- what happened in this run?
- which stages were present?
- how much evidence exists in the workspace already?
Compare Two Runs
Run comparison is the next most common workflow once you have more than one execution to inspect:
comparison = ctx.compare_runs(
    "run:guide-proj.demo-run",
    "run:guide-proj.demo-run-v2",
)
print(comparison.summary)
print(len(comparison.stage_comparisons))
If you are comparing several candidate runs and want to pick the best one by a single metric:
best = ctx.select_best_run(
    [
        "run:guide-proj.demo-run",
        "run:guide-proj.demo-run-v2",
    ],
    metric_key="accuracy",
    higher_is_better=True,
)
print(best)
Use comparison for:
- metric changes
- stage-level differences
- report-level differences
- best-run selection for one metric
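Conceptually, best-run selection reduces to taking the maximum (or minimum) of one metric across the candidates. The plain-Python sketch below illustrates that idea with made-up metric values; it is not how `select_best_run` is implemented, and skipping runs that never recorded the metric is an assumption about sensible behavior. `pick_best_run` is a hypothetical helper.

```python
from typing import Mapping, Optional


def pick_best_run(
    metrics_by_run: Mapping[str, Mapping[str, float]],
    metric_key: str,
    higher_is_better: bool = True,
) -> Optional[str]:
    """Return the run ref with the best value for metric_key, or None."""
    # Only runs that actually recorded the metric are eligible.
    candidates = {
        ref: values[metric_key]
        for ref, values in metrics_by_run.items()
        if metric_key in values
    }
    if not candidates:
        return None
    chooser = max if higher_is_better else min
    return chooser(candidates, key=candidates.__getitem__)


# Illustrative numbers only, not output from a real workspace.
example = {
    "run:guide-proj.demo-run": {"accuracy": 0.91, "loss": 0.30},
    "run:guide-proj.demo-run-v2": {"accuracy": 0.94, "loss": 0.25},
    "run:guide-proj.demo-run-v3": {"loss": 0.20},
}
print(pick_best_run(example, "accuracy"))  # run:guide-proj.demo-run-v2
print(pick_best_run(example, "loss", higher_is_better=False))  # run:guide-proj.demo-run-v3
```

The `higher_is_better` flag mirrors the facade call above: accuracy-like metrics want the maximum, loss-like metrics the minimum.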
Build Reports
Once the data is in canonical form, report generation stays under the same facade:
snapshot_report = ctx.build_snapshot_report("run:guide-proj.demo-run")
compare_report = ctx.build_run_report(
    "run:guide-proj.demo-run",
    "run:guide-proj.demo-run-v2",
)
project_report = ctx.build_project_summary_report("guide-proj")
Reports can then be materialized into formats that fit the downstream task:
markdown_text = snapshot_report.to_markdown()
html_text = snapshot_report.to_html()
json_payload = snapshot_report.to_json()
Use report generation when you want output that is easier to:
- review
- share
- archive
- render into HTML or export workflows later
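A common follow-up is writing all three renderings to disk in one place. The helper below is a hypothetical sketch: it relies only on the `to_markdown()`, `to_html()`, and `to_json()` methods shown above, and assumes the first two return strings while `to_json()` may return either a string or a JSON-serializable object. `StubReport` stands in for a real report so the example runs without a workspace.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def export_report(report: Any, out_dir: Path, stem: str) -> Dict[str, Path]:
    """Write Markdown, HTML, and JSON renderings of one report."""
    out_dir.mkdir(parents=True, exist_ok=True)
    payload = report.to_json()
    if not isinstance(payload, str):
        # Assumption: a non-string payload is JSON-serializable.
        payload = json.dumps(payload, indent=2)
    outputs = {
        "md": report.to_markdown(),
        "html": report.to_html(),
        "json": payload,
    }
    written = {}
    for suffix, text in outputs.items():
        path = out_dir / f"{stem}.{suffix}"
        path.write_text(text, encoding="utf-8")
        written[suffix] = path
    return written


class StubReport:
    """Hypothetical stand-in exposing the same three methods."""

    def to_markdown(self) -> str:
        return "# demo-run\n"

    def to_html(self) -> str:
        return "<h1>demo-run</h1>"

    def to_json(self) -> dict:
        return {"run": "run:guide-proj.demo-run"}


with tempfile.TemporaryDirectory() as tmp:
    files = export_report(StubReport(), Path(tmp) / "reports", "demo-run")
    print(sorted(path.name for path in files.values()))
    # ['demo-run.html', 'demo-run.json', 'demo-run.md']
```

With a real report you would pass `snapshot_report` (or `compare_report`) instead of the stub and a persistent directory instead of the temporary one.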
Inspect Diagnostics
Diagnostics are useful when you want the system to point at incomplete or suspicious states:
diagnostics = ctx.diagnose_run("run:guide-proj.demo-run")
for issue in diagnostics.issues:
    print(issue.severity, issue.code, issue.summary)
Use diagnostics when you want a quick answer to:
- what looks incomplete?
- what looks inconsistent?
- which issues should I inspect first?
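To answer "which issues should I inspect first?", you can order issues by severity before printing them. The sketch assumes severities are strings such as `error`, `warning`, and `info`; that vocabulary is a guess, not something this guide documents, so check the values your workspace actually reports. `Issue` is a stand-in carrying the `severity`, `code`, and `summary` attributes used above, and the issue codes are invented.

```python
from dataclasses import dataclass
from typing import Iterable, List

# Assumed severity vocabulary; any other value sorts after these.
SEVERITY_RANK = {"error": 0, "warning": 1, "info": 2}


@dataclass
class Issue:
    """Stand-in with the attributes the loop above reads."""

    severity: str
    code: str
    summary: str


def triage(issues: Iterable[Issue]) -> List[Issue]:
    """Order issues most severe first; sorted() keeps ties in input order."""
    unknown_rank = len(SEVERITY_RANK)
    return sorted(
        issues,
        key=lambda issue: SEVERITY_RANK.get(issue.severity, unknown_rank),
    )


# Invented issues for illustration only.
issues = [
    Issue("info", "example.no-tags", "Run has no tags"),
    Issue("error", "example.open-stage", "Stage 'evaluate' never closed"),
    Issue("warning", "example.no-metrics", "Stage 'train' emitted no metrics"),
]
for issue in triage(issues):
    print(issue.severity, issue.code, issue.summary)
```

Because `triage` only reads attributes, the same function should accept `diagnostics.issues` directly if those objects expose the same three fields.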
Trace Lineage
Lineage helps when the question is about relationships rather than one run in isolation:
traversal = ctx.traverse_lineage(
    "artifact:guide-proj.demo-run.model",
    direction="outbound",
    max_depth=3,
)
print(len(traversal.edges))
print(len(traversal.visited_refs))
Use lineage when you want to ask:
- where did this artifact come from?
- what depends on this result?
- what sits upstream or downstream of this subject?
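`max_depth` bounds how many hops the traversal follows from the starting subject. The breadth-first sketch below illustrates that idea on a hand-written edge list. The edges and every ref other than the model artifact are hypothetical, and reading `outbound` as "follow edges away from the subject (downstream)" and `inbound` as "follow edges toward it (upstream)" is an assumption about the parameter's meaning. Counting the start ref as visited is also an assumption.

```python
from collections import deque
from typing import List, Set, Tuple

# (source_ref, target_ref): the source feeds into the target.
Edge = Tuple[str, str]


def traverse(
    edges: List[Edge], start: str, direction: str, max_depth: int
) -> Tuple[List[Edge], Set[str]]:
    """Breadth-first walk of at most max_depth hops from start."""
    if direction not in ("outbound", "inbound"):
        raise ValueError(f"unknown direction: {direction!r}")
    followed: List[Edge] = []
    visited: Set[str] = {start}
    frontier = deque([(start, 0)])
    while frontier:
        ref, depth = frontier.popleft()
        if depth == max_depth:
            continue  # depth budget used up; do not expand further
        for source, target in edges:
            if direction == "outbound" and source == ref:
                neighbor = target
            elif direction == "inbound" and target == ref:
                neighbor = source
            else:
                continue
            followed.append((source, target))
            if neighbor not in visited:
                visited.add(neighbor)
                frontier.append((neighbor, depth + 1))
    return followed, visited


lineage = [
    ("dataset:sklearn.diabetes", "run:guide-proj.demo-run"),
    ("run:guide-proj.demo-run", "artifact:guide-proj.demo-run.model"),
    ("artifact:guide-proj.demo-run.model", "run:guide-proj.eval-run"),
    ("run:guide-proj.eval-run", "artifact:guide-proj.eval-run.report"),
]
model_ref = "artifact:guide-proj.demo-run.model"
downstream_edges, downstream_refs = traverse(lineage, model_ref, "outbound", max_depth=3)
upstream_edges, upstream_refs = traverse(lineage, model_ref, "inbound", max_depth=3)
print(len(downstream_edges), len(downstream_refs))  # 2 3
print(len(upstream_edges), len(upstream_refs))  # 2 3
```

Each ref is expanded at most once, so cycles in the lineage graph cannot make the walk loop forever.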
Analyze Metric Trends
If the question is about run-to-run movement instead of a single comparison, use a trend query:
trend = ctx.get_metric_trend(
    "accuracy",
    project_name="guide-proj",
)
print(trend.metric_key)
print(len(trend.points))
Trend workflows are useful for:
- metric drift across runs
- project-level progress over time
- identifying values worth deeper comparison
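Finding values worth deeper comparison often comes down to run-to-run deltas. The sketch below assumes each trend point can be reduced to a `(run_ref, value)` pair in chronological order; this guide does not show the attributes of `trend.points`, so that extraction step is left to you. The values and `largest_moves` helper are hypothetical.

```python
from typing import List, Sequence, Tuple

Point = Tuple[str, float]      # (run_ref, metric value), oldest first
Move = Tuple[str, str, float]  # (previous_run, run, change)


def largest_moves(points: Sequence[Point], top: int = 1) -> List[Move]:
    """Return the `top` largest absolute changes between consecutive runs."""
    moves = [
        (prev_ref, ref, value - prev_value)
        for (prev_ref, prev_value), (ref, value) in zip(points, points[1:])
    ]
    return sorted(moves, key=lambda move: abs(move[2]), reverse=True)[:top]


# Hypothetical accuracy values, not taken from a real workspace.
points = [
    ("run:guide-proj.demo-run", 0.91),
    ("run:guide-proj.demo-run-v2", 0.94),
    ("run:guide-proj.demo-run-v3", 0.88),
]
for previous, current, change in largest_moves(points):
    print(f"{previous} -> {current}: {change:+.3f}")
```

The pair of refs in the largest move is a natural input for `ctx.compare_runs`.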
Runtime Capture From Actual Work
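The runtime capture surface is most meaningful when the signal is produced by the operation under observation. The three programs below cover machine learning (a scikit-learn regression), deep learning (a small PyTorch CNN), and LLM evaluation (a local mock of an OpenAI-style chat client):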
"""Train a real regression model and capture its measured evidence."""
import pickle
from pathlib import Path
from sklearn.datasets import load_diabetes
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from contexta import Contexta
from contexta.capture import LocalJsonlSink
features, targets = load_diabetes(return_X_y=True)
train_x, test_x, train_y, test_y = train_test_split(
features, targets, test_size=0.2, random_state=42
)
workspace = Path(".contexta")
ctx = Contexta(workspace=str(workspace), config={"project_name": "diabetes-regression"})
local_sink = next(sink for sink in ctx.sinks if isinstance(sink, LocalJsonlSink))
model = LinearRegression()
with ctx.run("linear-regression", dataset_ref="dataset:sklearn.diabetes") as run:
run.event(
"dataset.loaded",
message="Loaded the scikit-learn diabetes dataset",
attributes={"rows": len(features), "features": features.shape[1]},
)
with run.stage("train"):
model.fit(train_x, train_y)
with run.stage("evaluate") as stage:
predictions = model.predict(test_x)
r2 = r2_score(test_y, predictions)
mae = mean_absolute_error(test_y, predictions)
stage.metric("r2", r2, unit="ratio")
stage.metric("mae", mae)
model_path = workspace / "models" / "linear-regression.pkl"
model_path.parent.mkdir(parents=True, exist_ok=True)
model_path.write_bytes(pickle.dumps(model))
run.register_artifact("model", str(model_path), attributes={"format": "pickle"})
records_path = local_sink.file_path_for("RECORD").relative_to(Path.cwd())
print(f"Captured run: {run.ref}")
print(f"Measured r2: {r2:.3f}; mae: {mae:.3f}")
print(f"Records: {records_path.as_posix()}")
print(f"Model artifact: {model_path.as_posix()}")
"""Train a tiny CNN and capture epoch, evaluation, and checkpoint evidence."""
from pathlib import Path
import torch
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from contexta import Contexta
from contexta.capture import LocalJsonlSink
class TinyCNN(nn.Module):
def __init__(self) -> None:
super().__init__()
self.layers = nn.Sequential(
nn.Conv2d(1, 8, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Flatten(),
nn.Linear(8 * 4 * 4, 10),
)
def forward(self, features: torch.Tensor) -> torch.Tensor:
return self.layers(features)
torch.manual_seed(7)
digits = load_digits()
train_x, test_x, train_y, test_y = train_test_split(
digits.images, digits.target, test_size=0.2, stratify=digits.target, random_state=7
)
train_data = TensorDataset(
torch.tensor(train_x[:, None] / 16.0, dtype=torch.float32),
torch.tensor(train_y, dtype=torch.long),
)
loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_features = torch.tensor(test_x[:, None] / 16.0, dtype=torch.float32)
test_targets = torch.tensor(test_y, dtype=torch.long)
workspace = Path(".contexta")
ctx = Contexta(workspace=str(workspace), config={"project_name": "digits-cnn"})
local_sink = next(sink for sink in ctx.sinks if isinstance(sink, LocalJsonlSink))
model = TinyCNN()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss()
with ctx.run("tiny-cnn", dataset_ref="dataset:sklearn.digits") as run:
with run.stage("train") as stage:
for epoch in range(1, 3):
total_loss = 0.0
for features, targets in loader:
optimizer.zero_grad()
loss = loss_fn(model(features), targets)
loss.backward()
optimizer.step()
total_loss += loss.item() * len(targets)
with stage.batch(f"epoch-{epoch}") as batch:
batch.metric("loss", total_loss / len(train_data))
with run.stage("evaluate") as stage:
with torch.no_grad():
logits = model(test_features)
accuracy = (logits.argmax(dim=1) == test_targets).float().mean().item()
stage.metric("accuracy", accuracy, unit="ratio")
with stage.sample("first-validation-image") as sample:
sample.metric(
"prediction.correct",
float(logits[0].argmax().item() == test_targets[0].item()),
unit="ratio",
)
checkpoint = workspace / "models" / "tiny-cnn.pt"
checkpoint.parent.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), checkpoint)
run.register_artifact("checkpoint", str(checkpoint), attributes={"epochs": 2})
with ctx.deployment("tiny-cnn-candidate", run_ref=run.ref) as deployment:
deployment.event("checkpoint.selected", message="Selected trained checkpoint for review")
records_path = local_sink.file_path_for("RECORD").relative_to(Path.cwd())
print(f"Captured run: {run.ref}")
print(f"Measured validation accuracy: {accuracy:.3f}")
print(f"Records: {records_path.as_posix()}")
print(f"Checkpoint artifact: {checkpoint.as_posix()}")
"""Evaluate an OpenAI-shaped local mock API and capture response evidence."""
from pathlib import Path
from time import perf_counter
from types import SimpleNamespace
from contexta import Contexta
from contexta.capture import LocalJsonlSink
class MockCompletions:
def create(self, *, model: str, messages: list[dict[str, str]]) -> SimpleNamespace:
question = messages[-1]["content"]
if "workspace" in question.lower():
answer = "Contexta stores local evidence in a .contexta workspace."
else:
answer = "I cannot answer from the provided context."
return SimpleNamespace(
id=f"chatgpt-mock-{model}",
choices=[SimpleNamespace(message=SimpleNamespace(content=answer))],
usage=SimpleNamespace(completion_tokens=len(answer.split())),
)
class MockOpenAI:
def __init__(self) -> None:
self.chat = type("Chat", (), {"completions": MockCompletions()})()
cases = [
("workspace-question", "Where is the workspace?", ".contexta"),
("unsupported-question", "Which GPU was used?", "cannot answer"),
]
workspace = Path(".contexta")
ctx = Contexta(workspace=str(workspace), config={"project_name": "mock-openai-eval"})
local_sink = next(sink for sink in ctx.sinks if isinstance(sink, LocalJsonlSink))
client = MockOpenAI()
passed = 0
with ctx.run("mock-chat-evaluation", dataset_ref="dataset:local.prompt-cases") as run:
with run.stage("evaluate") as stage:
for name, question, expected in cases:
started = perf_counter()
response = client.chat.completions.create(
model="gpt-4.1-mini-mock",
messages=[{"role": "user", "content": question}],
)
answer = response.choices[0].message.content
correct = expected in answer
passed += int(correct)
with stage.sample(name) as sample:
sample.metric("correct", float(correct), unit="ratio")
sample.metric("latency.ms", (perf_counter() - started) * 1000, unit="ms")
sample.metric("completion.tokens", response.usage.completion_tokens)
sample.event("response.received", message=answer)
pass_rate = passed / len(cases)
stage.metric("pass.rate", pass_rate, unit="ratio")
with ctx.deployment("mock-chat-prompt", run_ref=run.ref) as deployment:
deployment.event("prompt.selected", message="Selected observed prompt flow for staging")
records_path = local_sink.file_path_for("RECORD").relative_to(Path.cwd())
print(f"Captured run: {run.ref}")
print(f"Measured prompt-case pass rate: {pass_rate:.2f}")
print(f"Records: {records_path.as_posix()}")
Use runtime capture when you want:
- live instrumentation in application code
- scope-aware event and metric emission
- one product surface for lifecycle and capture behavior
The displayed programs are covered by automated tests, so their captured metrics remain tied to executable behavior.
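Each program prints the path of the JSONL file that the local sink writes. Because JSONL stores one JSON object per line, the standard library is enough to take a first look. The sketch below deliberately makes no assumption about the record schema: it counts records and tallies the top-level keys it sees, which is a reasonable step before relying on specific fields. `summarize_jsonl` is a hypothetical helper, and the sample file is synthetic; point the function at the printed `Records:` path instead.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path
from typing import Tuple


def summarize_jsonl(path: Path) -> Tuple[int, Counter]:
    """Count records and how often each top-level key appears."""
    count = 0
    keys: Counter = Counter()
    with path.open(encoding="utf-8") as handle:
        for line in handle:
            if not line.strip():
                continue  # tolerate blank lines
            record = json.loads(line)
            count += 1
            if isinstance(record, dict):
                keys.update(record.keys())
    return count, keys


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "records.jsonl"
    sample.write_text(
        '{"field_a": 1, "field_b": "x"}\n{"field_a": 2}\n\n',
        encoding="utf-8",
    )
    total, key_counts = summarize_jsonl(sample)
    print(total, dict(key_counts))  # 2 {'field_a': 2, 'field_b': 1}
```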
When To Use Something Else
Stay with the facade when you want to:
- inspect one run
- compare runs
- build reports
- diagnose problems
- trace lineage
Move to the advanced guide when you need:
- explicit config resolution
- direct store access
- backup or restore planning
Where To Go Next
Continue with: