Batch, Sample & Deployment Tracking
This page explains Contexta's three additional execution context types: Batch, Sample, and Deployment.
These types extend the core Run → Stage hierarchy for workflows that involve repeated data processing, per-item observation, or model deployment tracking.
Batch
A batch execution represents one discrete unit of data processing within a stage.
Typical uses:
- one epoch in a training loop
- one chunk in a streaming pipeline
- one file in a batch-import workflow
Batch executions are owned by a stage:
Run → Stage → Batch
Ref format
batch:{project}.{run}.{stage}.{batch_name}
Example: batch:my-proj.run-01.train.epoch-0
Status values
open | completed | failed | cancelled
The completed and failed statuses both require ended_at to be set.
Logging a batch
from contexta.contract import BatchExecution
# Describe one finished batch ("epoch-0") under the "train" stage of run-01.
batch = BatchExecution(
    batch_execution_ref="batch:my-proj.run-01.train.epoch-0",
    # References to the owning run and stage.
    run_ref="run:my-proj.run-01",
    stage_execution_ref="stage:my-proj.run-01.train",
    batch_name="epoch-0",
    # A "completed" status must be accompanied by ended_at.
    status="completed",
    started_at="2025-01-01T00:01:00Z",
    ended_at="2025-01-01T00:02:00Z",
    order_index=0,
)
# Hand the record to the batch collection of the context's metadata store.
ctx.metadata_store.batches.put_batch_execution(batch)
Querying batches
# Fetch the batches for one run and print a one-line summary of each.
batches = ctx.list_batches("run:my-proj.run-01")
for b in batches:
    print(b.name, b.status, b.started_at)
Sample
A sample observation records one item seen during a stage or batch.
Typical uses:
- one input row in a validation pass
- one image in a dataset scan
- one prediction in an inference batch
Samples are owned by a stage. The ref must encode the parent stage name as its third component and the sample name as its fourth:
Ref format
sample:{project}.{run}.{stage}.{sample_name}
Example: sample:my-proj.run-01.train.s-0001
Note: the 4-component constraint means the sample name must not contain dots.
Logging a sample
from contexta.contract import SampleObservation
# Describe one observed item ("s-0001") under the "train" stage of run-01.
sample = SampleObservation(
    sample_observation_ref="sample:my-proj.run-01.train.s-0001",
    # References to the owning run and stage.
    run_ref="run:my-proj.run-01",
    stage_execution_ref="stage:my-proj.run-01.train",
    # The sample name is the fourth ref component, so it must not contain dots.
    sample_name="s-0001",
    observed_at="2025-01-01T00:01:30Z",
)
# Hand the record to the sample collection of the context's metadata store.
ctx.metadata_store.samples.put_sample_observation(sample)
Querying samples
# Fetch the samples for one run and print when each was observed.
samples = ctx.list_samples("run:my-proj.run-01")
for s in samples:
    print(s.name, s.observed_at)
Deployment
A deployment execution tracks one instance of a model or artifact being deployed to an environment.
Typical uses:
- a model pushed to a serving endpoint
- a checkpoint promoted to staging
- a trained artifact registered in a model registry
Deployments are scoped to a project and can optionally link to the run that produced the deployed artifact:
Project → Deployment (→ Run, optional)
Ref format
deployment:{project}.{deployment_name}
Example: deployment:my-proj.model-v1
Logging a deployment
from contexta.contract import DeploymentExecution
# Describe one finished deployment ("model-v1") in project "my-proj".
deploy = DeploymentExecution(
    deployment_execution_ref="deployment:my-proj.model-v1",
    # Deployments hang off the project rather than a stage.
    project_ref="project:my-proj",
    deployment_name="model-v1",
    status="completed",
    started_at="2025-01-01T00:09:00Z",
    ended_at="2025-01-01T00:10:00Z",
    run_ref="run:my-proj.run-01",  # optional link to the producing run
)
# Hand the record to the deployment collection of the context's metadata store.
ctx.metadata_store.deployments.put_deployment_execution(deploy)
Querying deployments
# Deployments are project-scoped, so the lookup takes the project name.
deployments = ctx.list_deployments("my-proj")
for d in deployments:
    print(d.name, d.status, d.run_id)
In snapshot reports
When you call ctx.build_snapshot_report(run_ref), the report automatically
includes Batches, Deployments, and Samples sections when data is present.
# Build the snapshot report for one run and list its section titles.
report = ctx.build_snapshot_report("run:my-proj.run-01")
for section in report.sections:
    print(section.title)
    # → Run Summary, Stages, Artifacts, Batches, Deployments, Samples, Diagnostics, ...
Diagnostics
The DiagnosticsService checks batch and deployment health automatically:
| Condition | Severity | Issue key |
|---|---|---|
| BatchExecution.status == "failed" | error | failed_batch |
| BatchExecution in non-terminal status | warning | incomplete_batch |
| DeploymentExecution.status == "failed" | error | failed_deployment |
These issues appear in the Diagnostics section of the snapshot report.
Complete Workflow Examples
Choose a tab below and save its displayed code as contexta_tracking.py.
Install the optional dependency for the selected domain, then run that local
file:
uv add contexta
# Add only when required by the selected tab:
uv add "contexta[sklearn]" # Machine Learning
uv add "contexta[torch]" # Deep Learning
uv run contexta_tracking.py
- Machine Learning
- Deep Learning
- LLM
"""Train two real SVM candidates and compare their captured evaluation results."""
import pickle
from pathlib import Path
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from contexta import Contexta
from contexta.capture import LocalJsonlSink
# Split the Iris data once so both candidates are scored on the same hold-out rows.
features, targets = load_iris(return_X_y=True)
train_x, test_x, train_y, test_y = train_test_split(
    features, targets, test_size=0.3, stratify=targets, random_state=7
)

# The two SVM variants to compare, keyed by the name used for each run.
candidates = {
    "linear-svm": SVC(kernel="linear"),
    "rbf-svm": SVC(kernel="rbf", gamma="scale"),
}

workspace = Path(".contexta")
ctx = Contexta(workspace=str(workspace), config={"project_name": "iris-svm"})
# Keep a handle on the local JSONL sink so its output paths can be printed below.
local_sink = next(sink for sink in ctx.sinks if isinstance(sink, LocalJsonlSink))

scores = {}
run_refs = {}
for name, estimator in candidates.items():
    # One run per candidate, with a "train" stage followed by an "evaluate" stage.
    with ctx.run(name, dataset_ref="dataset:sklearn.iris") as run:
        with run.stage("train"):
            model = make_pipeline(StandardScaler(), estimator)
            model.fit(train_x, train_y)

        with run.stage("evaluate") as stage:
            predictions = model.predict(test_x)
            accuracy = accuracy_score(test_y, predictions)
            macro_f1 = f1_score(test_y, predictions, average="macro")
            # The whole hold-out split is recorded as a single batch.
            with stage.batch("holdout-split") as batch:
                batch.metric("accuracy", accuracy, unit="ratio")
                batch.metric("macro.f1", macro_f1, unit="ratio")
                # Record one sample: whether the first prediction was right.
                with batch.sample("first-prediction") as sample:
                    sample.metric("correct", float(predictions[0] == test_y[0]), unit="ratio")

        # Pickle the fitted pipeline into the workspace and register it on the run.
        model_path = workspace / "models" / f"{name}.pkl"
        model_path.parent.mkdir(parents=True, exist_ok=True)
        model_path.write_bytes(pickle.dumps(model))
        run.register_artifact("model", str(model_path), attributes={"candidate": name})

        scores[name] = accuracy
        run_refs[name] = run.ref

# Select the candidate with the highest accuracy and report the comparison.
best_name = max(scores, key=scores.get)
delta = scores["rbf-svm"] - scores["linear-svm"]
records_path = local_sink.file_path_for("RECORD").relative_to(Path.cwd())
artifacts_path = local_sink.file_path_for("ARTIFACT").relative_to(Path.cwd())

print(f"Compared runs: {run_refs['linear-svm']} vs {run_refs['rbf-svm']}")
print(f"Accuracy: {scores['linear-svm']:.3f} -> {scores['rbf-svm']:.3f}")
print(f"Delta: {delta:+.3f}")
print(f"Selected run: {run_refs[best_name]}")
print(f"Records: {records_path.as_posix()}")
print(f"Artifacts: {artifacts_path.as_posix()}")
"""Train a tiny CNN and capture epoch, evaluation, and checkpoint evidence."""
from pathlib import Path
import torch
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from contexta import Contexta
from contexta.capture import LocalJsonlSink
class TinyCNN(nn.Module):
    """Small convolutional classifier for single-channel 8x8 images, 10 classes."""

    def __init__(self) -> None:
        super().__init__()
        # Conv with padding=1 keeps the 8x8 size; the 2x2 max-pool halves it to
        # 4x4, so the flattened feature vector has 8 channels * 4 * 4 entries.
        self.layers = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(8 * 4 * 4, 10),
        )

    def forward(self, features: torch.Tensor) -> torch.Tensor:
        """Return the 10 class logits for each image in ``features``."""
        return self.layers(features)
# Fix the seed so weight initialisation and batch shuffling are repeatable.
torch.manual_seed(7)

digits = load_digits()
train_x, test_x, train_y, test_y = train_test_split(
    digits.images, digits.target, test_size=0.2, stratify=digits.target, random_state=7
)
# Insert a channel axis (N, 1, H, W) and divide by 16.0 to rescale the pixel values.
train_data = TensorDataset(
    torch.tensor(train_x[:, None] / 16.0, dtype=torch.float32),
    torch.tensor(train_y, dtype=torch.long),
)
loader = DataLoader(train_data, batch_size=64, shuffle=True)
test_features = torch.tensor(test_x[:, None] / 16.0, dtype=torch.float32)
test_targets = torch.tensor(test_y, dtype=torch.long)

workspace = Path(".contexta")
ctx = Contexta(workspace=str(workspace), config={"project_name": "digits-cnn"})
# Keep a handle on the local JSONL sink so its output path can be printed below.
local_sink = next(sink for sink in ctx.sinks if isinstance(sink, LocalJsonlSink))

model = TinyCNN()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss()

with ctx.run("tiny-cnn", dataset_ref="dataset:sklearn.digits") as run:
    with run.stage("train") as stage:
        for epoch in range(1, 3):
            total_loss = 0.0
            for features, targets in loader:
                optimizer.zero_grad()
                loss = loss_fn(model(features), targets)
                loss.backward()
                optimizer.step()
                # Weight each mini-batch loss by its size to get a per-example mean.
                total_loss += loss.item() * len(targets)
            # Each epoch is recorded as one batch carrying its mean training loss.
            with stage.batch(f"epoch-{epoch}") as batch:
                batch.metric("loss", total_loss / len(train_data))

    with run.stage("evaluate") as stage:
        # No gradients are needed for the evaluation forward pass.
        with torch.no_grad():
            logits = model(test_features)
        accuracy = (logits.argmax(dim=1) == test_targets).float().mean().item()
        stage.metric("accuracy", accuracy, unit="ratio")
        # Record one sample: whether the first test image was classified correctly.
        with stage.sample("first-validation-image") as sample:
            sample.metric(
                "prediction.correct",
                float(logits[0].argmax().item() == test_targets[0].item()),
                unit="ratio",
            )

    # Save the trained weights into the workspace and register them on the run.
    checkpoint = workspace / "models" / "tiny-cnn.pt"
    checkpoint.parent.mkdir(parents=True, exist_ok=True)
    torch.save(model.state_dict(), checkpoint)
    run.register_artifact("checkpoint", str(checkpoint), attributes={"epochs": 2})

# Record a deployment linked back to the run that produced the checkpoint.
with ctx.deployment("tiny-cnn-candidate", run_ref=run.ref) as deployment:
    deployment.event("checkpoint.selected", message="Selected trained checkpoint for review")

records_path = local_sink.file_path_for("RECORD").relative_to(Path.cwd())
print(f"Captured run: {run.ref}")
print(f"Measured validation accuracy: {accuracy:.3f}")
print(f"Records: {records_path.as_posix()}")
print(f"Checkpoint artifact: {checkpoint.as_posix()}")
"""Evaluate an OpenAI-shaped local mock API and capture response evidence."""
from pathlib import Path
from time import perf_counter
from types import SimpleNamespace
from contexta import Contexta
from contexta.capture import LocalJsonlSink
class MockCompletions:
    """Offline stand-in for an OpenAI-style ``chat.completions`` endpoint."""

    def create(self, *, model: str, messages: list[dict[str, str]]) -> SimpleNamespace:
        """Return a canned, OpenAI-shaped response for the last message.

        Args:
            model: Model name; it is only echoed into the response ``id``.
            messages: Chat messages; only the ``content`` of the last one is read.

        Returns:
            A namespace exposing ``id``, ``choices[0].message.content`` and
            ``usage.completion_tokens`` (the whitespace-separated word count
            of the answer).
        """
        question = messages[-1]["content"]
        # Only questions mentioning "workspace" (case-insensitive) are answerable.
        if "workspace" in question.lower():
            answer = "Contexta stores local evidence in a .contexta workspace."
        else:
            answer = "I cannot answer from the provided context."
        return SimpleNamespace(
            id=f"chatgpt-mock-{model}",
            choices=[SimpleNamespace(message=SimpleNamespace(content=answer))],
            usage=SimpleNamespace(completion_tokens=len(answer.split())),
        )
class MockOpenAI:
    """Minimal client exposing ``chat.completions.create`` like the OpenAI SDK."""

    def __init__(self) -> None:
        # A plain namespace is enough to provide the ``client.chat.completions``
        # attribute path; no dynamically created class is needed.
        self.chat = SimpleNamespace(completions=MockCompletions())
# Each case is (sample name, question, substring expected in the answer).
cases = [
    ("workspace-question", "Where is the workspace?", ".contexta"),
    ("unsupported-question", "Which GPU was used?", "cannot answer"),
]

workspace = Path(".contexta")
ctx = Contexta(workspace=str(workspace), config={"project_name": "mock-openai-eval"})
# Keep a handle on the local JSONL sink so its output path can be printed below.
local_sink = next(sink for sink in ctx.sinks if isinstance(sink, LocalJsonlSink))
client = MockOpenAI()

passed = 0
with ctx.run("mock-chat-evaluation", dataset_ref="dataset:local.prompt-cases") as run:
    with run.stage("evaluate") as stage:
        for name, question, expected in cases:
            started = perf_counter()
            response = client.chat.completions.create(
                model="gpt-4.1-mini-mock",
                messages=[{"role": "user", "content": question}],
            )
            # Stop the clock right after the call so the latency metric covers
            # only the request, not the answer checking or sample bookkeeping.
            latency_ms = (perf_counter() - started) * 1000
            answer = response.choices[0].message.content
            correct = expected in answer
            passed += int(correct)
            # One sample per prompt case, carrying its metrics and the raw answer.
            with stage.sample(name) as sample:
                sample.metric("correct", float(correct), unit="ratio")
                sample.metric("latency.ms", latency_ms, unit="ms")
                sample.metric("completion.tokens", response.usage.completion_tokens)
                sample.event("response.received", message=answer)
        # Stage-level summary across all prompt cases.
        pass_rate = passed / len(cases)
        stage.metric("pass.rate", pass_rate, unit="ratio")

# Record a deployment linked back to the evaluation run.
with ctx.deployment("mock-chat-prompt", run_ref=run.ref) as deployment:
    deployment.event("prompt.selected", message="Selected observed prompt flow for staging")

records_path = local_sink.file_path_for("RECORD").relative_to(Path.cwd())
print(f"Captured run: {run.ref}")
print(f"Measured prompt-case pass rate: {pass_rate:.2f}")
print(f"Records: {records_path.as_posix()}")