
Batch, Sample & Deployment Tracking

This page explains Contexta's three additional execution context types: Batch, Sample, and Deployment.

These types extend the core Run → Stage hierarchy for workflows that involve repeated data processing, per-item observation, or model deployment tracking.

Batch

A batch execution represents one discrete unit of data processing within a stage.

Typical uses:

  • one epoch in a training loop
  • one chunk in a streaming pipeline
  • one file in a batch-import workflow

Batch executions are owned by a stage:

Run → Stage → Batch

Ref format

batch:{project}.{run}.{stage}.{batch_name}

Example: batch:my-proj.run-01.train.epoch-0
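The ref format above can be taken apart mechanically, which is handy when you need the parent run and stage refs for a batch. The following is a minimal sketch in plain Python, not part of Contexta: parse_batch_ref and parent_refs are hypothetical helpers, and the sketch assumes that no component contains a dot.

```python
from typing import NamedTuple


class BatchRefParts(NamedTuple):
    project: str
    run: str
    stage: str
    batch_name: str


def parse_batch_ref(ref: str) -> BatchRefParts:
    """Split 'batch:{project}.{run}.{stage}.{batch_name}' into its parts.

    Hypothetical helper; assumes components never contain dots.
    """
    prefix, sep, body = ref.partition(":")
    if sep != ":" or prefix != "batch":
        raise ValueError(f"not a batch ref: {ref!r}")
    parts = body.split(".")
    if len(parts) != 4 or not all(parts):
        raise ValueError(f"expected 4 non-empty components: {ref!r}")
    return BatchRefParts(*parts)


def parent_refs(ref: str) -> tuple[str, str]:
    """Derive the run ref and stage ref that own a batch."""
    p = parse_batch_ref(ref)
    return (
        f"run:{p.project}.{p.run}",
        f"stage:{p.project}.{p.run}.{p.stage}",
    )


print(parent_refs("batch:my-proj.run-01.train.epoch-0"))
```

The derived values follow the run: and stage: ref shapes used in the logging example on this page.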

Status values

open | completed | failed | cancelled

completed and failed require ended_at.
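The ended_at rule can be checked before a record is written. The sketch below is an illustration in plain Python, not Contexta's own validation: check_batch_status is a hypothetical helper, and the check that ended_at does not precede started_at is an added assumption rather than a documented rule.

```python
from datetime import datetime
from typing import Optional

BATCH_STATUSES = {"open", "completed", "failed", "cancelled"}
REQUIRES_ENDED_AT = {"completed", "failed"}


def _parse(ts: str) -> datetime:
    # Accept a trailing 'Z' on Python versions older than 3.11.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def check_batch_status(
    status: str, started_at: str, ended_at: Optional[str]
) -> list[str]:
    """Return a list of problems; an empty list means the fields are consistent."""
    problems = []
    if status not in BATCH_STATUSES:
        problems.append(f"unknown status {status!r}")
    if status in REQUIRES_ENDED_AT and ended_at is None:
        problems.append(f"status {status!r} requires ended_at")
    # Assumption (not stated in the docs): a batch cannot end before it starts.
    if ended_at is not None and _parse(ended_at) < _parse(started_at):
        problems.append("ended_at precedes started_at")
    return problems


print(check_batch_status("failed", "2025-01-01T00:01:00Z", None))
```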

Logging a batch

from contexta.contract import BatchExecution

batch = BatchExecution(
    batch_execution_ref="batch:my-proj.run-01.train.epoch-0",
    run_ref="run:my-proj.run-01",
    stage_execution_ref="stage:my-proj.run-01.train",
    batch_name="epoch-0",
    status="completed",
    started_at="2025-01-01T00:01:00Z",
    ended_at="2025-01-01T00:02:00Z",
    order_index=0,
)
ctx.metadata_store.batches.put_batch_execution(batch)

Querying batches

batches = ctx.list_batches("run:my-proj.run-01")
for b in batches:
    print(b.name, b.status, b.started_at)

Sample

A sample observation records one item seen during a stage or batch.

Typical uses:

  • one input row in a validation pass
  • one image in a dataset scan
  • one prediction in an inference batch

Samples are owned by a stage. The ref encodes the parent stage name as the third component and the sample name as the fourth:

Ref format

sample:{project}.{run}.{stage}.{sample_name}

Example: sample:my-proj.run-01.train.s-0001

Note: because the ref has exactly four dot-separated components, the sample name must not contain dots.
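The dot restriction is easy to trip over when sample names come from file names. The sketch below shows one way to guard against it in plain Python. Both build_sample_ref and safe_sample_name are hypothetical helpers, not Contexta functions, and rejecting dots in the project, run, and stage components is an assumption that follows from the four-component shape.

```python
def build_sample_ref(project: str, run: str, stage: str, sample_name: str) -> str:
    """Assemble 'sample:{project}.{run}.{stage}.{sample_name}' or raise ValueError."""
    fields = (
        ("project", project),
        ("run", run),
        ("stage", stage),
        ("sample_name", sample_name),
    )
    for label, value in fields:
        if not value:
            raise ValueError(f"{label} must not be empty")
        if "." in value:
            # A dot would add a fifth component and break the ref shape.
            raise ValueError(f"{label} must not contain dots: {value!r}")
    return f"sample:{project}.{run}.{stage}.{sample_name}"


def safe_sample_name(raw: str) -> str:
    """Replace dots so a file-like name fits into a single ref component."""
    return raw.replace(".", "-")


print(build_sample_ref("my-proj", "run-01", "train", safe_sample_name("img.0001.png")))
```

Replacing dots with hyphens is only one possible convention. Any substitution works as long as it stays unique within the stage.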

Logging a sample

from contexta.contract import SampleObservation

sample = SampleObservation(
    sample_observation_ref="sample:my-proj.run-01.train.s-0001",
    run_ref="run:my-proj.run-01",
    stage_execution_ref="stage:my-proj.run-01.train",
    sample_name="s-0001",
    observed_at="2025-01-01T00:01:30Z",
)
ctx.metadata_store.samples.put_sample_observation(sample)

Querying samples

samples = ctx.list_samples("run:my-proj.run-01")
for s in samples:
    print(s.name, s.observed_at)

Deployment

A deployment execution tracks one instance of a model or artifact being deployed to an environment.

Typical uses:

  • a model pushed to a serving endpoint
  • a checkpoint promoted to staging
  • a trained artifact registered in a model registry

Deployments are scoped to a project and can optionally link to the run that produced the deployed artifact:

Project → Deployment (→ Run, optional)

Ref format

deployment:{project}.{deployment_name}

Example: deployment:my-proj.model-v1

Logging a deployment

from contexta.contract import DeploymentExecution

deploy = DeploymentExecution(
    deployment_execution_ref="deployment:my-proj.model-v1",
    project_ref="project:my-proj",
    deployment_name="model-v1",
    status="completed",
    started_at="2025-01-01T00:09:00Z",
    ended_at="2025-01-01T00:10:00Z",
    run_ref="run:my-proj.run-01",  # optional link to the producing run
)
ctx.metadata_store.deployments.put_deployment_execution(deploy)

Querying deployments

deployments = ctx.list_deployments("my-proj")
for d in deployments:
    print(d.name, d.status, d.run_id)

In snapshot reports

When you call ctx.build_snapshot_report(run_ref), the report automatically includes Batches, Deployments, and Samples sections when data is present.

report = ctx.build_snapshot_report("run:my-proj.run-01")
for section in report.sections:
    print(section.title)
# → Run Summary, Stages, Artifacts, Batches, Deployments, Samples, Diagnostics, ...

Diagnostics

The DiagnosticsService checks batch and deployment health automatically:

Condition                                 Severity   Issue key
BatchExecution.status == "failed"         error      failed_batch
BatchExecution in non-terminal status     warning    incomplete_batch
DeploymentExecution.status == "failed"    error      failed_deployment

These issues appear in the Diagnostics section of the snapshot report.
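To make the three rules concrete, here is a minimal sketch that applies them to plain dictionaries. It does not call DiagnosticsService and is not its implementation. The diagnose function and the dictionary keys are hypothetical, and treating completed, failed, and cancelled as the terminal statuses (so that open is the only non-terminal one) is an assumption.

```python
# Assumption: every status except "open" counts as terminal.
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def diagnose(batches: list[dict], deployments: list[dict]) -> list[tuple[str, str, str]]:
    """Return (severity, issue_key, ref) tuples following the three rules above."""
    issues = []
    for b in batches:
        if b["status"] == "failed":
            issues.append(("error", "failed_batch", b["ref"]))
        elif b["status"] not in TERMINAL_STATUSES:
            issues.append(("warning", "incomplete_batch", b["ref"]))
    for d in deployments:
        if d["status"] == "failed":
            issues.append(("error", "failed_deployment", d["ref"]))
    return issues


batches = [
    {"ref": "batch:my-proj.run-01.train.epoch-0", "status": "failed"},
    {"ref": "batch:my-proj.run-01.train.epoch-1", "status": "open"},
]
deployments = [{"ref": "deployment:my-proj.model-v1", "status": "completed"}]

for severity, key, ref in diagnose(batches, deployments):
    print(severity, key, ref)
```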


Complete Workflow Examples

The example shown here is the Machine Learning workflow. Save its code as contexta_tracking.py, install the optional dependency for the selected domain, then run that local file:

uv add contexta
# Add only when required by the selected tab:
uv add "contexta[sklearn]" # Machine Learning
uv add "contexta[torch]" # Deep Learning
uv run contexta_tracking.py

"""Train two real SVM candidates and compare their captured evaluation results."""

import pickle
from pathlib import Path

from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

from contexta import Contexta
from contexta.capture import LocalJsonlSink


features, targets = load_iris(return_X_y=True)
train_x, test_x, train_y, test_y = train_test_split(
    features, targets, test_size=0.3, stratify=targets, random_state=7
)
candidates = {
    "linear-svm": SVC(kernel="linear"),
    "rbf-svm": SVC(kernel="rbf", gamma="scale"),
}
workspace = Path(".contexta")
ctx = Contexta(workspace=str(workspace), config={"project_name": "iris-svm"})
local_sink = next(sink for sink in ctx.sinks if isinstance(sink, LocalJsonlSink))
scores = {}
run_refs = {}

for name, estimator in candidates.items():
    with ctx.run(name, dataset_ref="dataset:sklearn.iris") as run:
        with run.stage("train"):
            model = make_pipeline(StandardScaler(), estimator)
            model.fit(train_x, train_y)

        with run.stage("evaluate") as stage:
            predictions = model.predict(test_x)
            accuracy = accuracy_score(test_y, predictions)
            macro_f1 = f1_score(test_y, predictions, average="macro")
            with stage.batch("holdout-split") as batch:
                batch.metric("accuracy", accuracy, unit="ratio")
                batch.metric("macro.f1", macro_f1, unit="ratio")
                with batch.sample("first-prediction") as sample:
                    sample.metric("correct", float(predictions[0] == test_y[0]), unit="ratio")

        model_path = workspace / "models" / f"{name}.pkl"
        model_path.parent.mkdir(parents=True, exist_ok=True)
        model_path.write_bytes(pickle.dumps(model))
        run.register_artifact("model", str(model_path), attributes={"candidate": name})
        scores[name] = accuracy
        run_refs[name] = run.ref

best_name = max(scores, key=scores.get)
delta = scores["rbf-svm"] - scores["linear-svm"]
records_path = local_sink.file_path_for("RECORD").relative_to(Path.cwd())
artifacts_path = local_sink.file_path_for("ARTIFACT").relative_to(Path.cwd())

print(f"Compared runs: {run_refs['linear-svm']} vs {run_refs['rbf-svm']}")
print(f"Accuracy: {scores['linear-svm']:.3f} -> {scores['rbf-svm']:.3f}")
print(f"Delta: {delta:+.3f}")
print(f"Selected run: {run_refs[best_name]}")
print(f"Records: {records_path.as_posix()}")
print(f"Artifacts: {artifacts_path.as_posix()}")