본문으로 건너뛰기

Case 10 : RAG의 어느 파트가 문제일까요?


페르소나


Noah는 회사의 지식 베이스를 사용하여 RAG 파이프라인을 운영하는 AI 엔지니어입니다.

파이프라인은 retrieve -> rerank -> generate -> evaluate 네 단계로 구성됩니다.

최신 버전의 종단 간 answer_quality0.87에서 0.64로 떨어졌지만, Noah의 팀은 어느 단계가 원인인지 모릅니다.


상황


팀에는 여러 가설이 있습니다.

  • retrieve 단계가 낮은 정밀도의 문서를 가져왔을 수 있습니다.
  • rerank 모델이 새 쿼리 유형에서 저하됐을 수 있습니다.
  • generation 모델이 바뀌었을 수 있습니다.
  • 단순 측정 노이즈일 수 있습니다.

이 케이스의 핵심은 종단 간 점수 하나가 아니라 단계 별 메트릭을 저장해, 성능 저하의 시작점을 찾는 것입니다.


Contexta 없이 해결하려면


Contexta 없이 원인을 찾으려면 보통 아래 작업을 직접 해야 합니다.

  1. retrieve, rerank, generate 단계에 새 계측 코드를 추가합니다.
  2. 기준선과 최신 버전을 다시 실행합니다.
  3. 단계별 메트릭을 별도 파일로 모읍니다.
  4. 각 가설별로 수치를 비교합니다.
  5. 어떤 저하가 원인이고 어떤 저하가 연쇄 영향인지 해석합니다.

종단 간 answer_quality=0.64만 있으면 네 가지 가설을 각각 조사해야 합니다.


Contexta로 해결하기


각 단계는 자신에게 맞는 메트릭을 남깁니다.

retrieve retrieval-precision
rerank rerank-ndcg
generate generation-fluency
evaluate answer-quality

이 해결 흐름은 세 단계입니다.

단계Contexta API얻는 정보
스테이지별 기록단계 스코프 MetricRecord각 단계의 품질 지표
실행 비교ctx.compare_runs(BASELINE, DEGRADED)기준선 대비 단계별 변화량
실행 진단ctx.diagnose_run(DEGRADED)저하 기록과 진단 이슈

예제 코드


아래 코드는 데이터 준비 단계에서 만든 기준선 RAG 실행과 저하된 RAG 실행을 읽고, 단계 별 메트릭 차이를 비교합니다.

analyze_rag_decomposition.py
"""Compare previously recorded RAG runs by stage."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "knowledge-base-rag"
BASELINE = f"run:{PROJECT_NAME}.rag-v1-baseline"
DEGRADED = f"run:{PROJECT_NAME}.rag-v2-degraded"

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)

store = ctx.metadata_store
try:
comparison = ctx.compare_runs(BASELINE, DEGRADED)
print("Stage-by-stage deltas:")
for stage in comparison.stage_comparisons:
for delta in sorted(stage.metric_deltas, key=lambda item: item.metric_key):
if delta.delta is not None:
print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")

diagnosis = ctx.diagnose_run(DEGRADED)
print("\nDiagnostics:")
for issue in diagnosis.issues:
print(f"[{issue.severity.upper()}] {issue.summary}")
finally:
store.close()

실행하면 다음과 같은 결과를 얻습니다.

Stage-by-stage deltas:
evaluate/answer-quality: -0.2300
evaluate/faithfulness: -0.2500
generate/generation-coherence: -0.1700
generate/generation-fluency: -0.1200
rerank/rerank-mrr: -0.2000
rerank/rerank-ndcg: -0.2000
retrieve/retrieval-precision: -0.2700
retrieve/retrieval-recall: -0.2800

Diagnostics:
[WARNING] degraded record detected for rag.retrieval_precision_drop
[INFO] expected terminal stage package is missing

코드 조각별로 이해하기


1. 기준선과 저하 실행 비교하기


comparison = ctx.compare_runs(BASELINE, DEGRADED)

compare_runs()는 기준선인 rag-v1-baseline과 저하된 rag-v2-degraded의 메트릭 차이를 계산합니다.

메트릭은 단계 별로 저장되어 있으므로, 비교 결과도 retrieve, rerank, generate, evaluate 단계별로 나뉩니다.


2. 단계 별 delta 출력하기


for stage in comparison.stage_comparisons:
for delta in sorted(stage.metric_deltas, key=lambda item: item.metric_key):
if delta.delta is not None:
print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")

분석 코드는 각 단계의 메트릭 delta를 출력합니다.

retrieval-precision -0.2700, retrieval-recall -0.2800처럼 retrieve 단계의 하락이 가장 앞단에서 크게 나타납니다.


3. 저하 진단 확인하기


diagnosis = ctx.diagnose_run(DEGRADED)
for issue in diagnosis.issues:
print(f"[{issue.severity.upper()}] {issue.summary}")

저하된 실행의 진단 결과에는 rag.retrieval_precision_drop 경고가 포함됩니다.

따라서 단계 별 delta와 진단 결과를 함께 보면, 최종 품질 하락의 시작점을 retrieve 단계로 좁힐 수 있습니다.


최종 답변


Contexta를 통해, Noah는 앞서 제시된 가설에 이런 식으로 답변할 수 있습니다.

  • Q1. retrieve 단계에서 정밀도가 낮은 문서를 반환하고 있을 수 있나요?

    • A1. 네. retrieval-precision-0.2700, retrieval-recall-0.2800 하락했습니다.
      또한, 진단에도 rag.retrieval_precision_drop 경고가 나타납니다.
  • Q2. rerank 모델이 새 쿼리 유형에서 저하됐을 수 있나요?

    • A2. rerank 지표도 하락했습니다. rerank-ndcgrerank-mrr 모두 -0.2000입니다.
      다만, 별도의 경고가 없어 retrieve 저하 이후의 연쇄 영향일 가능성이 큽니다.
  • Q3. generation 모델이 드리프트 됐을 수 있나요?

    • A3. generation 지표도 낮아졌습니다. generation-fluency -0.1200, generation-coherence -0.1700입니다.
      하지만 가장 앞단의 큰 하락은 retrieve에서 시작됩니다.
  • Q4. 측정 노이즈일 수 있나요?

    • A4. Retrieve, rerank, generate, evaluate 전반에 일관된 하락이 있으므로 단순 노이즈로 보기는 어렵습니다.

따라서, Noah는 다음과 같이 답할 수 있습니다.

최종 품질 하락의 시작점은 retrieve 단계입니다.
Rerank와 generation 저하는 그 연쇄 영향일 가능성이 크므로 우선 검색 인덱스, retriever 설정, 후보 문서 품질을 조사하여야 합니다.


선택: 예제 데이터 생성


이 섹션은 직접 코드를 실행해 보고 싶은 경우에만 필요합니다.

아래 데이터 준비 코드는 .contexta 워크스페이스에 기준선 RAG 실행과 저하된 RAG 실행을 생성합니다.


seed_rag_decomposition_data.py
"""Create RAG-stage decomposition records used by the RAG case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


PROJECT_NAME = "knowledge-base-rag"

RAG_STAGES = ["retrieve", "rerank", "generate", "evaluate"]

_REC_COUNTER = 0


def _next_rid() -> str:
global _REC_COUNTER
_REC_COUNTER += 1
return f"r{_REC_COUNTER:05d}"


def _emit_metric(
record_store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_ref: str,
key: str,
value: float,
ts: str,
) -> None:
record_store.append(
MetricRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="metric",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case10",
run_ref=run_ref,
stage_execution_ref=stage_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=MetricPayload(
metric_key=key,
value=value,
value_type="float64",
),
)
)


def _build_rag_run(
store: Any,
record_store: Any,
project_name: str,
run_name: str,
started_at: str,
ended_at: str,
stage_metrics: dict[str, dict[str, float]],
emit_retrieval_warning: bool = False,
) -> str:
"""Create a RAG pipeline run with per-stage metrics."""
run_ref = f"run:{project_name}.{run_name}"

store.runs.put_run(
Run(
run_ref=run_ref,
project_ref=f"project:{project_name}",
name=run_name,
status="completed",
started_at=started_at,
ended_at=ended_at,
)
)

# Build 4 stages with staggered timestamps
date_str = started_at[:10]
stage_time_slots = [
(f"{date_str}T09:00:00Z", f"{date_str}T09:10:00Z"),
(f"{date_str}T09:10:00Z", f"{date_str}T09:20:00Z"),
(f"{date_str}T09:20:00Z", f"{date_str}T09:40:00Z"),
(f"{date_str}T09:40:00Z", f"{date_str}T09:50:00Z"),
]

stage_refs: dict[str, str] = {}
for idx, stage_name in enumerate(RAG_STAGES):
stage_ref = f"stage:{project_name}.{run_name}.{stage_name}"
slot_start, slot_end = stage_time_slots[idx]
store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=stage_ref,
run_ref=run_ref,
stage_name=stage_name,
status="completed",
started_at=slot_start,
ended_at=slot_end,
order_index=idx,
)
)
stage_refs[stage_name] = stage_ref

# Emit metrics per stage
for stage_name, metrics in stage_metrics.items():
stage_ref = stage_refs[stage_name]
ts = stage_time_slots[RAG_STAGES.index(stage_name)][1]
for key, val in metrics.items():
_emit_metric(record_store, project_name, run_name, run_ref, stage_ref, key, val, ts)

# Optionally emit a degradation warning on the retrieve stage
if emit_retrieval_warning:
retrieve_ref = stage_refs["retrieve"]
ts = stage_time_slots[0][1]
record_store.append(
DegradedRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="degraded",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case10",
run_ref=run_ref,
stage_execution_ref=retrieve_ref,
completeness_marker="partial",
degradation_marker="capture_gap",
),
payload=DegradedPayload(
issue_key="rag.retrieval_precision_drop",
category="verification",
severity="warning",
summary=(
"Retrieval precision dropped below threshold (0.55 < 0.70). "
"Index may contain stale or out-of-distribution documents. "
"Downstream rerank and generate stages affected."
),
origin_marker="explicit_capture",
attributes={
"retrieval-precision": 0.55,
"threshold": 0.70,
"top-k": 5,
},
),
)
)
record_store.append(
StructuredEventRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="event",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case10",
run_ref=run_ref,
stage_execution_ref=retrieve_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=StructuredEventPayload(
event_key="rag.retrieval-warning",
level="warning",
message="Retrieval precision below threshold. Cascading quality issues expected.",
origin_marker="explicit_capture",
),
)
)

return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
"""Create v1 baseline and v2 degraded RAG runs."""

if workspace is None:
root = Path(tempfile.mkdtemp(prefix="contexta-case10-"))
workspace_path = root / ".contexta"
else:
workspace_path = Path(workspace)

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=workspace_path),
)
)

store = ctx.metadata_store
try:
store.projects.put_project(
Project(
project_ref=f"project:{PROJECT_NAME}",
name=PROJECT_NAME,
created_at="2025-04-01T00:00:00Z",
description="Knowledge-base RAG pipeline (4 stages)",
)
)

# v1: balanced quality across all stages
v1_stage_metrics = {
"retrieve": {"retrieval-precision": 0.82, "retrieval-recall": 0.79},
"rerank": {"rerank-ndcg": 0.81, "rerank-mrr": 0.77},
"generate": {"generation-fluency": 0.88, "generation-coherence": 0.85},
"evaluate": {"answer-quality": 0.87, "faithfulness": 0.84},
}
v1_ref = _build_rag_run(
store, ctx.record_store, PROJECT_NAME,
run_name="rag-v1-baseline",
started_at="2025-04-01T09:00:00Z",
ended_at="2025-04-01T09:50:00Z",
stage_metrics=v1_stage_metrics,
emit_retrieval_warning=False,
)

# v2: retrieval precision drops -> cascading degradation downstream
v2_stage_metrics = {
"retrieve": {"retrieval-precision": 0.55, "retrieval-recall": 0.51},
"rerank": {"rerank-ndcg": 0.61, "rerank-mrr": 0.57},
"generate": {"generation-fluency": 0.76, "generation-coherence": 0.68},
"evaluate": {"answer-quality": 0.64, "faithfulness": 0.59},
}
v2_ref = _build_rag_run(
store, ctx.record_store, PROJECT_NAME,
run_name="rag-v2-degraded",
started_at="2025-04-15T09:00:00Z",
ended_at="2025-04-15T09:50:00Z",
stage_metrics=v2_stage_metrics,
emit_retrieval_warning=True,
)

return {
"v1_run_id": v1_ref,
"v2_run_id": v2_ref,
}
finally:
store.close()


def main() -> None:
from contextlib import redirect_stdout
import io

with redirect_stdout(io.StringIO()):
run_example(Path(".contexta"))

print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
main()

코드를 seed_rag_decomposition_data.py로 저장한 뒤, Contexta가 설치된 환경에서 실행하세요.

uv run seed_rag_decomposition_data.py