Skip to main content

Case 10: RAG Pipeline Stage-Level Decomposition

Persona: AI Engineer

Situation

A 4-stage RAG pipeline (retrieve → rerank → generate → evaluate) is in production. End-to-end answer_quality has dropped from 0.87 in the baseline to 0.64 in the latest version. Nobody knows which stage caused it. The team has four hypotheses:

  • The retrieve stage is returning lower-precision documents.
  • The rerank model has degraded on new query types.
  • The generation model has drifted.
  • It is measurement noise.

Without stage-level metrics, testing each hypothesis requires a separate investigation run.

Without Contexta

  • End-to-end metric: answer_quality = 0.64. That is the only number available.
  • Each hypothesis requires instrumenting a stage, re-running the pipeline, and checking whether the hypothesis-specific number changed.
  • Four hypotheses = four investigation cycles = days of work.

With Contexta

# Metrics logged per stage each time the pipeline runs
# retrieve stage: retrieval-precision
# rerank stage: rerank-ndcg
# generate stage: generation-fluency
# evaluate stage: answer-quality

# Compare the baseline run (v1) against the latest run (v2), stage by stage.
comparison = ctx.compare_runs(v1_ref, v2_ref)
for sc in comparison.stage_comparisons:
    for d in sc.metric_deltas:
        # Print "before → after" for each stage-scoped metric.
        # NOTE(review): left_value/right_value presumably follow the argument
        # order of compare_runs (v1 first, v2 second) — confirm against the API.
        print(f"{sc.stage_name}/{d.metric_key}: {d.left_value:.3f} → {d.right_value:.3f}")

# Output:
# retrieve/retrieval-precision: 0.820 → 0.550 ← root cause
# rerank/rerank-ndcg: 0.810 → 0.610 ← cascade
# generate/generation-fluency: 0.880 → 0.760 ← cascade
# evaluate/answer-quality: 0.870 → 0.640 ← effect

Stage-scoped MetricRecord entries make the root-cause stage visible in a single compare_runs call. The cascading degradation pattern (retrieve degrades first, then every downstream stage) is immediately apparent.

Key APIs: Stage-scoped MetricRecord, compare_runs, diagnose_run


Complete Runnable Code

Run the seed script first, then the analysis script:

uv run examples/case_studies/case10_seed_rag_decomposition_data.py
uv run examples/case_studies/case10_analyze_rag_decomposition.py
case10_seed_rag_decomposition_data.py
"""Create RAG-stage decomposition records used by the RAG case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


# Project name; embedded in the project:/run:/stage:/record: reference strings.
PROJECT_NAME = "knowledge-base-rag"

# Stage names of the RAG pipeline, listed in execution order.
RAG_STAGES = ["retrieve", "rerank", "generate", "evaluate"]

_REC_COUNTER = 0


def _next_rid() -> str:
global _REC_COUNTER
_REC_COUNTER += 1
return f"r{_REC_COUNTER:05d}"


def _emit_metric(
    record_store: Any,
    project_name: str,
    run_name: str,
    run_ref: str,
    stage_ref: str,
    key: str,
    value: float,
    ts: str,
) -> None:
    """Append one stage-scoped float metric record to ``record_store``.

    Args:
        record_store: Object exposing ``append(record)``.
        project_name: Project name used in the generated ``record_ref``.
        run_name: Run name used in the generated ``record_ref``.
        run_ref: Reference of the run the metric belongs to.
        stage_ref: Reference of the stage execution the metric is scoped to.
        key: Metric key, e.g. ``"retrieval-precision"``.
        value: Metric value; stored with ``value_type="float64"``.
        ts: Timestamp string used for both ``recorded_at`` and ``observed_at``.
    """
    envelope = RecordEnvelope(
        # A fresh id from _next_rid() keeps every record_ref unique per process.
        record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
        record_type="metric",
        recorded_at=ts,
        observed_at=ts,
        producer_ref="contexta.case10",
        run_ref=run_ref,
        stage_execution_ref=stage_ref,
        completeness_marker="complete",
        degradation_marker="none",
    )
    payload = MetricPayload(
        metric_key=key,
        value=value,
        value_type="float64",
    )
    record_store.append(MetricRecord(envelope=envelope, payload=payload))


def _build_rag_run(
    store: Any,
    record_store: Any,
    project_name: str,
    run_name: str,
    started_at: str,
    ended_at: str,
    stage_metrics: dict[str, dict[str, float]],
    emit_retrieval_warning: bool = False,
) -> str:
    """Create a RAG pipeline run with per-stage metrics.

    Registers the run, one completed stage execution per entry in
    ``RAG_STAGES``, and one metric record per ``stage_metrics`` entry.
    Optionally adds a degraded record and a warning event on the retrieve
    stage.

    Args:
        store: Metadata store exposing ``runs.put_run`` and
            ``stages.put_stage_execution``.
        record_store: Object exposing ``append(record)``.
        project_name: Project name used to build all reference strings.
        run_name: Run name used to build all reference strings.
        started_at: Run start timestamp; its first 10 characters (the date)
            are reused for the stage time slots.
        ended_at: Run end timestamp.
        stage_metrics: Mapping of stage name to ``{metric_key: value}``.
        emit_retrieval_warning: When true, also emit the retrieve-stage
            degraded record and warning event.

    Returns:
        The run reference, ``run:<project_name>.<run_name>``.

    Raises:
        KeyError: If ``stage_metrics`` names a stage not in ``RAG_STAGES``.
    """
    run_ref = f"run:{project_name}.{run_name}"

    store.runs.put_run(
        Run(
            run_ref=run_ref,
            project_ref=f"project:{project_name}",
            name=run_name,
            status="completed",
            started_at=started_at,
            ended_at=ended_at,
        )
    )

    # Build 4 stages with staggered timestamps. Only the date part of
    # started_at is used; the clock times are fixed 09:00-09:50 slots.
    date_str = started_at[:10]
    stage_time_slots = [
        (f"{date_str}T09:00:00Z", f"{date_str}T09:10:00Z"),
        (f"{date_str}T09:10:00Z", f"{date_str}T09:20:00Z"),
        (f"{date_str}T09:20:00Z", f"{date_str}T09:40:00Z"),
        (f"{date_str}T09:40:00Z", f"{date_str}T09:50:00Z"),
    ]

    stage_refs: dict[str, str] = {}
    # End timestamp of each stage, reused as the timestamp of its metrics.
    stage_end_ts: dict[str, str] = {}
    for idx, stage_name in enumerate(RAG_STAGES):
        stage_ref = f"stage:{project_name}.{run_name}.{stage_name}"
        slot_start, slot_end = stage_time_slots[idx]
        store.stages.put_stage_execution(
            StageExecution(
                stage_execution_ref=stage_ref,
                run_ref=run_ref,
                stage_name=stage_name,
                status="completed",
                started_at=slot_start,
                ended_at=slot_end,
                order_index=idx,
            )
        )
        stage_refs[stage_name] = stage_ref
        stage_end_ts[stage_name] = slot_end

    # Emit metrics per stage, stamped with the end of that stage's slot.
    for stage_name, metrics in stage_metrics.items():
        stage_ref = stage_refs[stage_name]
        ts = stage_end_ts[stage_name]
        for key, val in metrics.items():
            _emit_metric(record_store, project_name, run_name, run_ref, stage_ref, key, val, ts)

    # Optionally emit a degradation warning on the retrieve stage.
    # NOTE: the numbers in the warning are fixed literals; they are not
    # derived from stage_metrics.
    if emit_retrieval_warning:
        retrieve_ref = stage_refs["retrieve"]
        ts = stage_time_slots[0][1]
        record_store.append(
            DegradedRecord(
                envelope=RecordEnvelope(
                    record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
                    record_type="degraded",
                    recorded_at=ts,
                    observed_at=ts,
                    producer_ref="contexta.case10",
                    run_ref=run_ref,
                    stage_execution_ref=retrieve_ref,
                    completeness_marker="partial",
                    degradation_marker="capture_gap",
                ),
                payload=DegradedPayload(
                    issue_key="rag.retrieval_precision_drop",
                    category="verification",
                    severity="warning",
                    summary=(
                        "Retrieval precision dropped below threshold (0.55 < 0.70). "
                        "Index may contain stale or out-of-distribution documents. "
                        "Downstream rerank and generate stages affected."
                    ),
                    origin_marker="explicit_capture",
                    attributes={
                        "retrieval-precision": 0.55,
                        "threshold": 0.70,
                        "top-k": 5,
                    },
                ),
            )
        )
        record_store.append(
            StructuredEventRecord(
                envelope=RecordEnvelope(
                    record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
                    record_type="event",
                    recorded_at=ts,
                    observed_at=ts,
                    producer_ref="contexta.case10",
                    run_ref=run_ref,
                    stage_execution_ref=retrieve_ref,
                    completeness_marker="complete",
                    degradation_marker="none",
                ),
                payload=StructuredEventPayload(
                    event_key="rag.retrieval-warning",
                    level="warning",
                    message="Retrieval precision below threshold. Cascading quality issues expected.",
                    origin_marker="explicit_capture",
                ),
            )
        )

    return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
    """Create v1 baseline and v2 degraded RAG runs.

    Args:
        workspace: Workspace root to write into. When ``None``, a fresh
            temporary directory is created and ``.contexta`` inside it is
            used. This function does not remove that temporary directory.

    Returns:
        A dict with keys ``"v1_run_id"`` and ``"v2_run_id"`` holding the
        run references of the two seeded runs.
    """
    if workspace is None:
        root = Path(tempfile.mkdtemp(prefix="contexta-case10-"))
        workspace_path = root / ".contexta"
    else:
        workspace_path = Path(workspace)

    ctx = Contexta(
        config=UnifiedConfig(
            project_name=PROJECT_NAME,
            workspace=WorkspaceConfig(root_path=workspace_path),
        )
    )

    store = ctx.metadata_store
    try:
        store.projects.put_project(
            Project(
                project_ref=f"project:{PROJECT_NAME}",
                name=PROJECT_NAME,
                created_at="2025-04-01T00:00:00Z",
                description="Knowledge-base RAG pipeline (4 stages)",
            )
        )

        # v1: balanced quality across all stages
        v1_stage_metrics = {
            "retrieve": {"retrieval-precision": 0.82, "retrieval-recall": 0.79},
            "rerank": {"rerank-ndcg": 0.81, "rerank-mrr": 0.77},
            "generate": {"generation-fluency": 0.88, "generation-coherence": 0.85},
            "evaluate": {"answer-quality": 0.87, "faithfulness": 0.84},
        }
        v1_ref = _build_rag_run(
            store,
            ctx.record_store,
            PROJECT_NAME,
            run_name="rag-v1-baseline",
            started_at="2025-04-01T09:00:00Z",
            ended_at="2025-04-01T09:50:00Z",
            stage_metrics=v1_stage_metrics,
            emit_retrieval_warning=False,
        )

        # v2: retrieval precision drops -> cascading degradation downstream
        v2_stage_metrics = {
            "retrieve": {"retrieval-precision": 0.55, "retrieval-recall": 0.51},
            "rerank": {"rerank-ndcg": 0.61, "rerank-mrr": 0.57},
            "generate": {"generation-fluency": 0.76, "generation-coherence": 0.68},
            "evaluate": {"answer-quality": 0.64, "faithfulness": 0.59},
        }
        v2_ref = _build_rag_run(
            store,
            ctx.record_store,
            PROJECT_NAME,
            run_name="rag-v2-degraded",
            started_at="2025-04-15T09:00:00Z",
            ended_at="2025-04-15T09:50:00Z",
            stage_metrics=v2_stage_metrics,
            emit_retrieval_warning=True,
        )

        return {
            "v1_run_id": v1_ref,
            "v2_run_id": v2_ref,
        }
    finally:
        # Close the metadata store whether seeding succeeded or raised.
        store.close()


def main() -> None:
    """Seed the example data into ``.contexta`` under the current directory."""
    from contextlib import redirect_stdout
    import io

    # Discard anything written to stdout during seeding so that only the
    # summary line below is shown.
    with redirect_stdout(io.StringIO()):
        run_example(Path(".contexta"))

    print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
    main()
case10_analyze_rag_decomposition.py
"""Compare previously recorded RAG runs by stage."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "knowledge-base-rag"
# Run references of the two runs to compare, in "run:<project>.<run name>" form.
BASELINE = f"run:{PROJECT_NAME}.rag-v1-baseline"
DEGRADED = f"run:{PROJECT_NAME}.rag-v2-degraded"

# Open the workspace at .contexta, relative to the current directory.
ctx = Contexta(
    config=UnifiedConfig(
        project_name=PROJECT_NAME,
        workspace=WorkspaceConfig(root_path=Path(".contexta")),
    )
)

store = ctx.metadata_store
try:
    comparison = ctx.compare_runs(BASELINE, DEGRADED)
    print("Stage-by-stage deltas:")
    for stage in comparison.stage_comparisons:
        # Sort by metric key so the output order is stable within a stage.
        for delta in sorted(stage.metric_deltas, key=lambda item: item.metric_key):
            # Skip metrics for which no delta is available.
            if delta.delta is not None:
                print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")

    diagnosis = ctx.diagnose_run(DEGRADED)
    print("\nDiagnostics:")
    for issue in diagnosis.issues:
        print(f"[{issue.severity.upper()}] {issue.summary}")
finally:
    # Close the metadata store whether the analysis succeeded or raised.
    store.close()