Case 09: Mia's Per-Prompt Evaluation
Persona: Mia, AI Engineer
Situation
Mia evaluates a RAG pipeline for a customer-support chatbot. She runs 20 test prompts across four categories (account questions, billing disputes, product inquiries, escalation scenarios) and scores each on relevance, faithfulness, and answer length. Prompts 7, 12, and 17 are escalation scenarios where the retriever completely fails to find relevant documents — they return near-zero relevance scores.
The problem: the aggregate relevance is 0.79. That passes the quality gate. The 3 broken prompts are invisible in the aggregate. When those prompts reach production, the chatbot returns off-topic answers to customers trying to escalate.
Without Contexta
- Aggregate metrics are the only available signal: mean_relevance = 0.79.
- Detecting which specific prompts are broken requires writing custom analysis code against the raw eval output, which is usually a JSON file or a dataframe.
- As the eval suite grows, the custom analysis code grows with it — and rarely gets maintained.
With Contexta
# Each prompt is a SampleObservation; metrics are per-sample
# (abridged sketch: "..." stands for fields omitted here; the complete scripts appear below)
for i, prompt in enumerate(PROMPTS):
    sample_ref = f"sample:{PROJECT}.run01.evaluate.prompt-{i+1:02d}"
    store.samples.put_sample_observation(SampleObservation(
        sample_observation_ref=sample_ref, ...
    ))
    record_store.append(MetricRecord(..., payload=MetricPayload(
        metric_key="relevance", value=scores[i]["relevance"]
    )))
# Analysis: find failing prompts
snapshot = ctx.get_run_snapshot(run_ref)
for rec in snapshot.records:
    # A relevance score below 0.3 is treated as a failing prompt.
    if rec.record_type == "metric" and rec.key == "relevance" and rec.value < 0.3:
        print(f"FAIL: {rec.stage_id} / {rec.sample_id}")
Because every MetricRecord is stored per sample, the full distribution is queryable, not just the
aggregate. The three failing prompts are found by a simple filter — no custom analysis code.
Key APIs: SampleObservation, MetricRecord (per-sample), get_run_snapshot
Complete Runnable Code
Run the seed script first, then the analysis script:
uv run examples/case_studies/case09_seed_prompt_evaluation_data.py
uv run examples/case_studies/case09_analyze_prompt_evaluation.py
"""Create per-prompt evaluation records used by the prompt case study."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
SampleObservation,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)
# Project name; it is also embedded in every project/run/stage/sample/record ref built below.
PROJECT_NAME = "support-chatbot-rag-eval"
# Name of the single stage that all samples and records are attached to.
STAGE_NAME = "evaluate"
# Number of prompts (one SampleObservation each) seeded for the run.
NUM_PROMPTS = 20
# Prompts that have bad relevance (1-based index matching sample names)
FAILING_PROMPT_INDICES = {7, 12, 17}
# Module-wide sequence number behind the record ids handed out by _next_rid().
_REC_COUNTER = 0


def _next_rid() -> str:
    """Advance the module counter and return it as an id such as ``r00001``.

    The number is zero-padded to five digits; the first call in a process
    returns ``r00001``.
    """
    global _REC_COUNTER
    sequence = _REC_COUNTER + 1
    _REC_COUNTER = sequence
    return f"r{sequence:05d}"
def _prompt_metrics(idx: int) -> tuple[float, float, int]:
    """Derive deterministic synthetic scores for one prompt.

    Args:
        idx: 1-based prompt index.

    Returns:
        ``(relevance, faithfulness, answer_length)``. Indices listed in
        ``FAILING_PROMPT_INDICES`` get a relevance between 0.05 and 0.19;
        every other index gets a relevance between 0.78 and 0.92.

    NOTE(review): over prompts 1..20 with failing indices {7, 12, 17} these
    formulas average to roughly 0.745 relevance, while the case-study text
    quotes an aggregate of 0.79 -- confirm which figure is intended.
    """
    if idx not in FAILING_PROMPT_INDICES:
        # Normal prompts: healthy scores with a small index-driven spread.
        normal_relevance = round(0.78 + (idx % 7) * 0.02 + (idx % 3) * 0.01, 3)
        normal_faithfulness = round(0.81 + (idx % 5) * 0.02, 3)
        normal_length = 45 + (idx % 8) * 5
        return normal_relevance, normal_faithfulness, normal_length

    # Escalation scenarios -- retriever completely misses
    failed_relevance = round(0.05 + (idx % 3) * 0.07, 3)
    failed_faithfulness = round(0.12 + (idx % 5) * 0.04, 3)
    failed_length = 18 + (idx % 4) * 3
    return failed_relevance, failed_faithfulness, failed_length
def _category(idx: int) -> str:
    """Return the category label for a 1-based prompt index.

    Labels cycle in the order account, billing, product, escalation, so
    index 1 is ``account`` and index 4 is ``escalation``.

    NOTE(review): under this cycle prompts 7 and 17 map to ``product`` and
    ``account``, whereas the case-study text describes prompts 7, 12 and 17
    as escalation scenarios -- confirm the intended mapping.
    """
    slot = (idx - 1) % 4
    return ("account", "billing", "product", "escalation")[slot]
def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Seed one completed evaluation run with a SampleObservation per prompt.

    Writes, in order: the project, a single run, a single ``evaluate`` stage,
    one structured event describing the suite, and then for each of the
    ``NUM_PROMPTS`` prompts a SampleObservation followed by three per-sample
    MetricRecords (relevance, faithfulness, answer-length).

    Args:
        workspace: Directory to use as the Contexta workspace root. When
            ``None``, a fresh temporary directory is created and its
            ``.contexta`` subdirectory is used instead.

    Returns:
        A dict with ``run_id`` (the run ref) and ``total_prompts``.
    """
    if workspace is not None:
        workspace_path = Path(workspace)
    else:
        scratch_dir = Path(tempfile.mkdtemp(prefix="contexta-case09-"))
        workspace_path = scratch_dir / ".contexta"

    workspace_config = WorkspaceConfig(root_path=workspace_path)
    ctx = Contexta(
        config=UnifiedConfig(project_name=PROJECT_NAME, workspace=workspace_config)
    )
    store = ctx.metadata_store
    try:
        project_ref = f"project:{PROJECT_NAME}"
        store.projects.put_project(
            Project(
                project_ref=project_ref,
                name=PROJECT_NAME,
                created_at="2025-04-10T00:00:00Z",
                description="RAG customer support chatbot evaluation suite",
            )
        )

        run_name = "eval-run-v1"
        run_ref = f"run:{PROJECT_NAME}.{run_name}"
        started_at = "2025-04-10T10:00:00Z"
        ended_at = "2025-04-10T10:30:00Z"
        store.runs.put_run(
            Run(
                run_ref=run_ref,
                project_ref=project_ref,
                name=run_name,
                status="completed",
                started_at=started_at,
                ended_at=ended_at,
            )
        )

        stage_ref = f"stage:{PROJECT_NAME}.{run_name}.{STAGE_NAME}"
        store.stages.put_stage_execution(
            StageExecution(
                stage_execution_ref=stage_ref,
                run_ref=run_ref,
                stage_name=STAGE_NAME,
                status="completed",
                started_at=started_at,
                ended_at=ended_at,
                order_index=0,
            )
        )

        def new_envelope(record_type: str, timestamp: str, **extra_refs: str) -> RecordEnvelope:
            # Shared envelope fields for every record of this run; each call
            # consumes the next sequential record id.
            return RecordEnvelope(
                record_ref=f"record:{PROJECT_NAME}.{run_name}.{_next_rid()}",
                record_type=record_type,
                recorded_at=timestamp,
                observed_at=timestamp,
                producer_ref="contexta.case09",
                run_ref=run_ref,
                stage_execution_ref=stage_ref,
                completeness_marker="complete",
                degradation_marker="none",
                **extra_refs,
            )

        # Register a structured event describing the evaluation suite.
        record_store = ctx.record_store
        suite_event = StructuredEventRecord(
            envelope=new_envelope("event", started_at),
            payload=StructuredEventPayload(
                event_key="eval.suite-registered",
                level="info",
                message=f"Evaluation suite: {NUM_PROMPTS} prompts across 4 categories.",
                attributes={
                    "prompt_count": NUM_PROMPTS,
                    "categories": "account,billing,product,escalation",
                },
                origin_marker="explicit_capture",
            ),
        )
        record_store.append(suite_event)

        # One SampleObservation per prompt, each followed by its metric records.
        for prompt_no in range(1, NUM_PROMPTS + 1):
            sample_name = f"prompt-{prompt_no:02d}"
            # The sample ref reuses the stage ref's dotted path with the
            # sample name appended (presumably required by the store).
            sample_ref = f"sample:{PROJECT_NAME}.{run_name}.{STAGE_NAME}.{sample_name}"
            # One observation per minute, starting at 10:01.
            obs_ts = f"2025-04-10T10:{prompt_no:02d}:00Z"
            store.samples.put_sample_observation(
                SampleObservation(
                    sample_observation_ref=sample_ref,
                    run_ref=run_ref,
                    stage_execution_ref=stage_ref,
                    sample_name=sample_name,
                    observed_at=obs_ts,
                )
            )

            relevance, faithfulness, answer_length = _prompt_metrics(prompt_no)
            per_sample_metrics = {
                "relevance": relevance,
                "faithfulness": faithfulness,
                "answer-length": float(answer_length),
            }
            for metric_key, metric_val in per_sample_metrics.items():
                record_store.append(
                    MetricRecord(
                        envelope=new_envelope(
                            "metric", obs_ts, sample_observation_ref=sample_ref
                        ),
                        payload=MetricPayload(
                            metric_key=metric_key,
                            value=metric_val,
                            value_type="float64",
                        ),
                    )
                )

        return {"run_id": run_ref, "total_prompts": NUM_PROMPTS}
    finally:
        store.close()
def main() -> None:
    """Seed the ``.contexta`` workspace in the current directory.

    Anything ``run_example`` writes to stdout is captured and discarded, so
    the only visible output is the final confirmation line.
    """
    import io
    from contextlib import redirect_stdout

    discarded_output = io.StringIO()
    with redirect_stdout(discarded_output):
        run_example(Path(".contexta"))
    print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
    main()
"""Find failed prompts from previously recorded sample-level metrics."""
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
PROJECT_NAME = "support-chatbot-rag-eval"
RUN_REF = f"run:{PROJECT_NAME}.eval-run-v1"
# Category labels, cycled in this order over the 1-based prompt numbers.
CATEGORIES = ["account", "billing", "product", "escalation"]


def prompt_category(sample_name: str) -> str:
    """Map a sample name such as ``prompt-07`` to its category label.

    The numeric suffix is read as a 1-based prompt number and the labels in
    ``CATEGORIES`` are assigned round-robin, so ``prompt-01`` is ``account``
    and ``prompt-04`` is ``escalation``.

    Raises:
        ValueError: If the text after the ``prompt-`` prefix is not an integer.

    NOTE(review): under this cycle prompt-07 and prompt-17 resolve to
    ``product`` and ``account``, whereas the case-study text describes
    prompts 7, 12 and 17 as escalation scenarios -- confirm the mapping.
    """
    zero_based = int(sample_name.removeprefix("prompt-")) - 1
    return CATEGORIES[zero_based % len(CATEGORIES)]
# Open the workspace that the seed script populated in the current directory.
ctx = Contexta(
    config=UnifiedConfig(
        project_name=PROJECT_NAME,
        workspace=WorkspaceConfig(root_path=Path(".contexta")),
    )
)
store = ctx.metadata_store
try:
    snapshot = ctx.get_run_snapshot(RUN_REF)

    # Metric records are joined back to their sample through the shared
    # observed_at timestamp; this relies on every sample in the run having a
    # distinct observed_at value.
    samples_by_time = {sample.observed_at: sample for sample in snapshot.samples}

    relevance = [
        record
        for record in snapshot.records
        if record.record_type == "metric" and record.key == "relevance"
    ]
    if not relevance:
        # Without this guard the mean below would raise ZeroDivisionError,
        # which hides the real cause (no relevance metrics in the snapshot).
        raise SystemExit(
            f"No relevance metrics found for {RUN_REF}; run the seed script first."
        )

    mean_relevance = sum(float(record.value) for record in relevance) / len(relevance)
    # A relevance score below 0.3 is treated as a failed prompt.
    failures = [record for record in relevance if float(record.value) < 0.3]

    print(f"Mean relevance: {mean_relevance:.3f}")
    print("Failed prompts:")
    for record in failures:
        sample = samples_by_time[record.observed_at]
        # Convert with float() here too, matching the mean and the threshold
        # check above, so the ".3f" format is applied to a float.
        print(
            f" {sample.name} ({prompt_category(sample.name)}): "
            f"relevance={float(record.value):.3f}, record={record.record_id}"
        )
finally:
    store.close()