Case 10: RAG Pipeline Stage-Level Decomposition
Persona: AI Engineer
Situation
A 4-stage RAG pipeline (retrieve → rerank → generate → evaluate) is in production.
End-to-end answer_quality has dropped from 0.87 in the baseline to 0.64 in the latest
version. Nobody knows which stage caused it. The team has four hypotheses:
- The retrieve stage is returning lower-precision documents.
- The rerank model has degraded on new query types.
- The generation model has drifted.
- It is measurement noise.
Without stage-level metrics, testing each hypothesis requires a separate investigation run.
Without Contexta
- End-to-end metric: answer_quality = 0.64. That is the only number available.
- Each hypothesis requires instrumenting a stage, re-running the pipeline, and checking whether the hypothesis-specific number changed.
- Four hypotheses = four investigation cycles = days of work.
With Contexta
# Metrics logged per stage each time the pipeline is run
# retrieve stage: retrieval-precision
# rerank stage: rerank-ndcg
# generate stage: generation-fluency
# evaluate stage: answer-quality
# Compare the two runs, then print every stage-scoped metric as "left → right".
comparison = ctx.compare_runs(v1_ref, v2_ref)
for stage_cmp in comparison.stage_comparisons:
    for delta in stage_cmp.metric_deltas:
        label = f"{stage_cmp.stage_name}/{delta.metric_key}"
        before, after = delta.left_value, delta.right_value
        print(f"{label}: {before:.3f} → {after:.3f}")
# Output:
# retrieve/retrieval-precision: 0.820 → 0.550 ← root cause
# rerank/rerank-ndcg: 0.810 → 0.610 ← cascade
# generate/generation-fluency: 0.880 → 0.760 ← cascade
# evaluate/answer-quality: 0.870 → 0.640 ← effect
Stage-scoped MetricRecord entries make the root-cause stage visible in a single compare_runs call.
The cascading degradation pattern (retrieve → everything downstream) is immediately apparent.
Key APIs: Stage-scoped MetricRecord, compare_runs, diagnose_run
Complete Runnable Code
Run the seed script first, then the analysis script:
uv run examples/case_studies/case10_seed_rag_decomposition_data.py
uv run examples/case_studies/case10_analyze_rag_decomposition.py
case10_seed_rag_decomposition_data.py
"""Create RAG-stage decomposition records used by the RAG case study."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)
# Project name; it is embedded in the "project:", "run:", "stage:" and
# "record:" reference strings built by the functions below.
PROJECT_NAME = "knowledge-base-rag"
# Pipeline stages in execution order; a stage's position in this list is
# used as its order_index and selects its time slot in _build_rag_run.
RAG_STAGES = ["retrieve", "rerank", "generate", "evaluate"]
# Module-wide counter behind _next_rid(); incremented once per record id issued.
_REC_COUNTER = 0
def _next_rid() -> str:
    """Return the next sequential record id ("r00001", "r00002", ...).

    Every call advances the module-level ``_REC_COUNTER`` by one, so ids are
    unique within a single process.
    """
    global _REC_COUNTER
    _REC_COUNTER = _REC_COUNTER + 1
    # Zero-pad to five digits, prefixed with "r".
    return "r" + format(_REC_COUNTER, "05d")
def _emit_metric(
    record_store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_ref: str,
    key: str,
    value: float,
    ts: str,
) -> None:
    """Append one stage-scoped float metric record to ``record_store``.

    Args:
        record_store: Object exposing ``append(record)``.
        project_name: Project name used in the generated record ref.
        run_name: Run name used in the generated record ref.
        run_ref: Ref of the run the metric belongs to.
        stage_ref: Ref of the stage execution the metric is scoped to.
        key: Metric key, e.g. ``"retrieval-precision"``.
        value: Metric value; stored with value type ``"float64"``.
        ts: Timestamp string used for both ``recorded_at`` and ``observed_at``.
    """
    record_id = _next_rid()
    envelope = RecordEnvelope(
        record_ref=f"record:{project_name}.{run_name}.{record_id}",
        record_type="metric",
        recorded_at=ts,
        observed_at=ts,
        producer_ref="contexta.case10",
        run_ref=run_ref,
        stage_execution_ref=stage_ref,
        completeness_marker="complete",
        degradation_marker="none",
    )
    payload = MetricPayload(metric_key=key, value=value, value_type="float64")
    record_store.append(MetricRecord(envelope=envelope, payload=payload))
def _build_rag_run(
    store: Any,
    record_store: Any,
    project_name: str,
    run_name: str,
    started_at: str,
    ended_at: str,
    stage_metrics: dict[str, dict[str, float]],
    emit_retrieval_warning: bool = False,
) -> str:
    """Seed one completed RAG run with its stages and stage-scoped records.

    Writes the run, then one stage execution per entry in ``RAG_STAGES``,
    then one metric record per ``stage_metrics`` entry. When
    ``emit_retrieval_warning`` is true, a degraded record and a warning
    event are also attached to the retrieve stage.

    Args:
        store: Metadata store exposing ``runs.put_run`` and
            ``stages.put_stage_execution``.
        record_store: Record store exposing ``append``.
        project_name: Project name used to build refs.
        run_name: Run name used to build refs.
        started_at: Run start timestamp; its first 10 characters are reused
            as the date of every stage time slot.
        ended_at: Run end timestamp.
        stage_metrics: Mapping of stage name to ``{metric_key: value}``.
            Every stage name must be one of ``RAG_STAGES``.
        emit_retrieval_warning: Whether to add the retrieve-stage warning
            records.

    Returns:
        The ref of the created run (``"run:<project>.<run>"``).
    """
    run_ref = f"run:{project_name}.{run_name}"
    run = Run(
        run_ref=run_ref,
        project_ref=f"project:{project_name}",
        name=run_name,
        status="completed",
        started_at=started_at,
        ended_at=ended_at,
    )
    store.runs.put_run(run)

    # Stage windows are fixed clock times (09:00-09:50) on the run's calendar
    # date; only the date portion of started_at is taken into account.
    day = started_at[:10]
    clock_windows = [
        ("09:00:00", "09:10:00"),
        ("09:10:00", "09:20:00"),
        ("09:20:00", "09:40:00"),
        ("09:40:00", "09:50:00"),
    ]
    windows = [
        (f"{day}T{begin}Z", f"{day}T{finish}Z") for begin, finish in clock_windows
    ]

    # Register the stages in pipeline order, remembering each ref and end time.
    stage_refs: dict[str, str] = {}
    stage_end: dict[str, str] = {}
    for position, stage_name in enumerate(RAG_STAGES):
        begin, finish = windows[position]
        ref = f"stage:{project_name}.{run_name}.{stage_name}"
        execution = StageExecution(
            stage_execution_ref=ref,
            run_ref=run_ref,
            stage_name=stage_name,
            status="completed",
            started_at=begin,
            ended_at=finish,
            order_index=position,
        )
        store.stages.put_stage_execution(execution)
        stage_refs[stage_name] = ref
        stage_end[stage_name] = finish

    # Each metric is stamped with the end time of the stage it belongs to.
    for stage_name, metrics in stage_metrics.items():
        owner_ref = stage_refs[stage_name]
        stamped_at = stage_end[stage_name]
        for metric_key, metric_value in metrics.items():
            _emit_metric(
                record_store,
                project_name,
                run_name,
                run_ref,
                owner_ref,
                metric_key,
                metric_value,
                stamped_at,
            )

    if not emit_retrieval_warning:
        return run_ref

    # Warning records are scoped to the retrieve stage and stamped with its
    # end time.
    retrieve_ref = stage_refs["retrieve"]
    warned_at = windows[0][1]

    degraded = DegradedRecord(
        envelope=RecordEnvelope(
            record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
            record_type="degraded",
            recorded_at=warned_at,
            observed_at=warned_at,
            producer_ref="contexta.case10",
            run_ref=run_ref,
            stage_execution_ref=retrieve_ref,
            completeness_marker="partial",
            degradation_marker="capture_gap",
        ),
        payload=DegradedPayload(
            issue_key="rag.retrieval_precision_drop",
            category="verification",
            severity="warning",
            summary=(
                "Retrieval precision dropped below threshold (0.55 < 0.70). "
                "Index may contain stale or out-of-distribution documents. "
                "Downstream rerank and generate stages affected."
            ),
            origin_marker="explicit_capture",
            attributes={
                "retrieval-precision": 0.55,
                "threshold": 0.70,
                "top-k": 5,
            },
        ),
    )
    record_store.append(degraded)

    event = StructuredEventRecord(
        envelope=RecordEnvelope(
            record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
            record_type="event",
            recorded_at=warned_at,
            observed_at=warned_at,
            producer_ref="contexta.case10",
            run_ref=run_ref,
            stage_execution_ref=retrieve_ref,
            completeness_marker="complete",
            degradation_marker="none",
        ),
        payload=StructuredEventPayload(
            event_key="rag.retrieval-warning",
            level="warning",
            message="Retrieval precision below threshold. Cascading quality issues expected.",
            origin_marker="explicit_capture",
        ),
    )
    record_store.append(event)

    return run_ref
def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Seed the v1 baseline and v2 degraded RAG runs into a workspace.

    Args:
        workspace: Workspace root path. When ``None``, a new temporary
            directory is created and its ``.contexta`` subdirectory is used.

    Returns:
        A dict with the refs of the two created runs under the keys
        ``"v1_run_id"`` and ``"v2_run_id"``.
    """
    if workspace is not None:
        workspace_path = Path(workspace)
    else:
        scratch_dir = Path(tempfile.mkdtemp(prefix="contexta-case10-"))
        workspace_path = scratch_dir / ".contexta"

    config = UnifiedConfig(
        project_name=PROJECT_NAME,
        workspace=WorkspaceConfig(root_path=workspace_path),
    )
    ctx = Contexta(config=config)
    store = ctx.metadata_store
    try:
        project = Project(
            project_ref=f"project:{PROJECT_NAME}",
            name=PROJECT_NAME,
            created_at="2025-04-01T00:00:00Z",
            description="Knowledge-base RAG pipeline (4 stages)",
        )
        store.projects.put_project(project)

        # Baseline: balanced quality across all four stages.
        baseline_metrics = {
            "retrieve": {"retrieval-precision": 0.82, "retrieval-recall": 0.79},
            "rerank": {"rerank-ndcg": 0.81, "rerank-mrr": 0.77},
            "generate": {"generation-fluency": 0.88, "generation-coherence": 0.85},
            "evaluate": {"answer-quality": 0.87, "faithfulness": 0.84},
        }
        baseline_ref = _build_rag_run(
            store,
            ctx.record_store,
            PROJECT_NAME,
            run_name="rag-v1-baseline",
            started_at="2025-04-01T09:00:00Z",
            ended_at="2025-04-01T09:50:00Z",
            stage_metrics=baseline_metrics,
            emit_retrieval_warning=False,
        )

        # Degraded: retrieval precision drops and every downstream stage
        # scores lower as well.
        degraded_metrics = {
            "retrieve": {"retrieval-precision": 0.55, "retrieval-recall": 0.51},
            "rerank": {"rerank-ndcg": 0.61, "rerank-mrr": 0.57},
            "generate": {"generation-fluency": 0.76, "generation-coherence": 0.68},
            "evaluate": {"answer-quality": 0.64, "faithfulness": 0.59},
        }
        degraded_ref = _build_rag_run(
            store,
            ctx.record_store,
            PROJECT_NAME,
            run_name="rag-v2-degraded",
            started_at="2025-04-15T09:00:00Z",
            ended_at="2025-04-15T09:50:00Z",
            stage_metrics=degraded_metrics,
            emit_retrieval_warning=True,
        )

        return {"v1_run_id": baseline_ref, "v2_run_id": degraded_ref}
    finally:
        # Close the metadata store whether or not seeding succeeded.
        store.close()
def main() -> None:
    """Seed the local ``.contexta`` workspace and print a confirmation line."""
    import io
    from contextlib import redirect_stdout

    # Anything written to stdout while seeding is captured and discarded, so
    # the confirmation below is the only output.
    sink = io.StringIO()
    with redirect_stdout(sink):
        run_example(Path(".contexta"))
    print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
    main()
case10_analyze_rag_decomposition.py
"""Compare previously recorded RAG runs by stage."""
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
# Refs of the two runs written by the seed script.
PROJECT_NAME = "knowledge-base-rag"
BASELINE = f"run:{PROJECT_NAME}.rag-v1-baseline"
DEGRADED = f"run:{PROJECT_NAME}.rag-v2-degraded"

# Open the workspace in the current directory.
ctx = Contexta(
    config=UnifiedConfig(
        project_name=PROJECT_NAME,
        workspace=WorkspaceConfig(root_path=Path(".contexta")),
    )
)
store = ctx.metadata_store
try:
    comparison = ctx.compare_runs(BASELINE, DEGRADED)
    print("Stage-by-stage deltas:")
    for stage_cmp in comparison.stage_comparisons:
        # Within a stage, list metrics alphabetically by key.
        ordered = sorted(stage_cmp.metric_deltas, key=lambda entry: entry.metric_key)
        for entry in ordered:
            if entry.delta is None:
                # No delta available for this metric; nothing to print.
                continue
            print(f"{stage_cmp.stage_name}/{entry.metric_key}: {entry.delta:+.4f}")

    diagnosis = ctx.diagnose_run(DEGRADED)
    print("\nDiagnostics:")
    for finding in diagnosis.issues:
        print(f"[{finding.severity.upper()}] {finding.summary}")
finally:
    # Close the metadata store whether or not the analysis succeeded.
    store.close()