Case 10 : RAG의 어느 파트가 문제일까요?
페르소나
Noah는 회사의 지식 베이스를 사용하여 RAG 파이프라인을 운영하는 AI 엔지니어입니다.
파이프라인은 retrieve -> rerank -> generate -> evaluate 네 단계로 구성됩니다.
최신 버전의 종단 간 answer_quality가 0.87에서 0.64로 떨어졌지만, Noah의 팀은 어느 단계가 원인인지 모릅니다.
상황
팀에는 여러 가설이 있습니다.
- retrieve 단계가 낮은 정밀도의 문서를 가져왔을 수 있습니다.
- rerank 모델이 새 쿼리 유형에서 저하됐을 수 있습니다.
- generation 모델이 바뀌었을 수 있습니다.
- 단순 측정 노이즈일 수 있습니다.
이 케이스의 핵심은 종단 간 점수 하나가 아니라 단계 별 메트릭을 저장해, 성능 저하의 시작점을 찾는 것입니다.
Contexta 없이 해결하려면
Contexta 없이 원인을 찾으려면 보통 아래 작업을 직접 해야 합니다.
- retrieve, rerank, generate 단계에 새 계측 코드를 추가합니다.
- 기준선과 최신 버전을 다시 실행합니다.
- 단계별 메트릭을 별도 파일로 모읍니다.
- 각 가설별로 수치를 비교합니다.
- 어떤 저하가 원인이고 어떤 저하가 연쇄 영향인지 해석합니다.
종단 간 answer_quality=0.64만 있으면 네 가지 가설을 각각 조사해야 합니다.
Contexta로 해결하기
각 단계는 자신에게 맞는 메트릭을 남깁니다.
retrieve retrieval-precision
rerank rerank-ndcg
generate generation-fluency
evaluate answer-quality
이 해결 흐름은 세 단계입니다.
| 단계 | Contexta API | 얻는 정보 |
|---|---|---|
| 스테이지별 기록 | 단계 스코프 MetricRecord | 각 단계의 품질 지표 |
| 실행 비교 | ctx.compare_runs(BASELINE, DEGRADED) | 기준선 대비 단계별 변화량 |
| 실행 진단 | ctx.diagnose_run(DEGRADED) | 저하 기록과 진단 이슈 |
예제 코드
아래 코드는 데이터 준비 단계에서 만든 기준선 RAG 실행과 저하된 RAG 실행을 읽고, 단계 별 메트릭 차이를 비교합니다.
"""Compare previously recorded RAG runs by stage."""
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
PROJECT_NAME = "knowledge-base-rag"
BASELINE = f"run:{PROJECT_NAME}.rag-v1-baseline"
DEGRADED = f"run:{PROJECT_NAME}.rag-v2-degraded"
ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)
store = ctx.metadata_store
try:
comparison = ctx.compare_runs(BASELINE, DEGRADED)
print("Stage-by-stage deltas:")
for stage in comparison.stage_comparisons:
for delta in sorted(stage.metric_deltas, key=lambda item: item.metric_key):
if delta.delta is not None:
print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")
diagnosis = ctx.diagnose_run(DEGRADED)
print("\nDiagnostics:")
for issue in diagnosis.issues:
print(f"[{issue.severity.upper()}] {issue.summary}")
finally:
store.close()
실행하면 다음과 같은 결과를 얻습니다.
Stage-by-stage deltas:
evaluate/answer-quality: -0.2300
evaluate/faithfulness: -0.2500
generate/generation-coherence: -0.1700
generate/generation-fluency: -0.1200
rerank/rerank-mrr: -0.2000
rerank/rerank-ndcg: -0.2000
retrieve/retrieval-precision: -0.2700
retrieve/retrieval-recall: -0.2800
Diagnostics:
[WARNING] degraded record detected for rag.retrieval_precision_drop
[INFO] expected terminal stage package is missing
코드 조각별로 이해하기
1. 기준선과 저하 실행 비교하기
comparison = ctx.compare_runs(BASELINE, DEGRADED)
compare_runs()는 기준선인 rag-v1-baseline과 저하된 rag-v2-degraded의 메트릭 차이를 계산합니다.
메트릭은 단계 별로 저장되어 있으므로, 비교 결과도 retrieve, rerank, generate, evaluate 단계별로 나뉩니다.
2. 단계 별 delta 출력하기
for stage in comparison.stage_comparisons:
for delta in sorted(stage.metric_deltas, key=lambda item: item.metric_key):
if delta.delta is not None:
print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")
분석 코드는 각 단계의 메트릭 delta를 출력합니다.
retrieval-precision -0.2700, retrieval-recall -0.2800처럼 retrieve 단계의 하락이 가장 앞단에서 크게 나타납니다.
3. 저하 진단 확인하기
diagnosis = ctx.diagnose_run(DEGRADED)
for issue in diagnosis.issues:
print(f"[{issue.severity.upper()}] {issue.summary}")
저하된 실행의 진단 결과에는 rag.retrieval_precision_drop 경고가 포함됩니다.
따라서 단계 별 delta와 진단 결과를 함께 보면, 최종 품질 하락의 시작점을 retrieve 단계로 좁힐 수 있습니다.
최종 답변
Contexta를 통해, Noah는 앞서 제시된 가설에 이런 식으로 답변할 수 있습니다.
-
Q1. retrieve 단계에서 정밀도가 낮은 문서를 반환하고 있을 수 있나요?
- A1. 네.
retrieval-precision은-0.2700,retrieval-recall은-0.2800하락했습니다.
또한, 진단에도rag.retrieval_precision_drop경고가 나타납니다.
- A1. 네.
-
Q2. rerank 모델이 새 쿼리 유형에서 저하됐을 수 있나요?
- A2. rerank 지표도 하락했습니다.
rerank-ndcg와rerank-mrr모두-0.2000입니다.
다만, 별도의 경고가 없어 retrieve 저하 이후의 연쇄 영향일 가능성이 큽니다.
- A2. rerank 지표도 하락했습니다.
-
Q3. generation 모델이 드리프트 됐을 수 있나요?
- A3. generation 지표도 낮아졌습니다.
generation-fluency -0.1200,generation-coherence -0.1700입니다.
하지만 가장 앞단의 큰 하락은 retrieve에서 시작됩니다.
- A3. generation 지표도 낮아졌습니다.
-
Q4. 측정 노이즈일 수 있나요?
- A4. Retrieve, rerank, generate, evaluate 전반에 일관된 하락이 있으므로 단순 노이즈로 보기는 어렵습니다.
따라서, Noah는 다음과 같이 답할 수 있습니다.
최종 품질 하락의 시작점은 retrieve 단계입니다.
Rerank와 generation 저하는 그 연쇄 영향일 가능성이 크므로 우선 검색 인덱스, retriever 설정, 후보 문서 품질을 조사하여야 합니다.
선택: 예제 데이터 생성
이 섹션은 직접 코드를 실행해 보고 싶은 경우에만 필요합니다.
아래 데이터 준비 코드는 .contexta 워크스페이스에 기준선 RAG 실행과 저하된 RAG 실행을 생성합니다.
"""Create RAG-stage decomposition records used by the RAG case study."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)
PROJECT_NAME = "knowledge-base-rag"
RAG_STAGES = ["retrieve", "rerank", "generate", "evaluate"]
_REC_COUNTER = 0
def _next_rid() -> str:
global _REC_COUNTER
_REC_COUNTER += 1
return f"r{_REC_COUNTER:05d}"
def _emit_metric(
record_store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_ref: str,
key: str,
value: float,
ts: str,
) -> None:
record_store.append(
MetricRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="metric",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case10",
run_ref=run_ref,
stage_execution_ref=stage_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=MetricPayload(
metric_key=key,
value=value,
value_type="float64",
),
)
)
def _build_rag_run(
store: Any,
record_store: Any,
project_name: str,
run_name: str,
started_at: str,
ended_at: str,
stage_metrics: dict[str, dict[str, float]],
emit_retrieval_warning: bool = False,
) -> str:
"""Create a RAG pipeline run with per-stage metrics."""
run_ref = f"run:{project_name}.{run_name}"
store.runs.put_run(
Run(
run_ref=run_ref,
project_ref=f"project:{project_name}",
name=run_name,
status="completed",
started_at=started_at,
ended_at=ended_at,
)
)
# Build 4 stages with staggered timestamps
date_str = started_at[:10]
stage_time_slots = [
(f"{date_str}T09:00:00Z", f"{date_str}T09:10:00Z"),
(f"{date_str}T09:10:00Z", f"{date_str}T09:20:00Z"),
(f"{date_str}T09:20:00Z", f"{date_str}T09:40:00Z"),
(f"{date_str}T09:40:00Z", f"{date_str}T09:50:00Z"),
]
stage_refs: dict[str, str] = {}
for idx, stage_name in enumerate(RAG_STAGES):
stage_ref = f"stage:{project_name}.{run_name}.{stage_name}"
slot_start, slot_end = stage_time_slots[idx]
store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=stage_ref,
run_ref=run_ref,
stage_name=stage_name,
status="completed",
started_at=slot_start,
ended_at=slot_end,
order_index=idx,
)
)
stage_refs[stage_name] = stage_ref
# Emit metrics per stage
for stage_name, metrics in stage_metrics.items():
stage_ref = stage_refs[stage_name]
ts = stage_time_slots[RAG_STAGES.index(stage_name)][1]
for key, val in metrics.items():
_emit_metric(record_store, project_name, run_name, run_ref, stage_ref, key, val, ts)
# Optionally emit a degradation warning on the retrieve stage
if emit_retrieval_warning:
retrieve_ref = stage_refs["retrieve"]
ts = stage_time_slots[0][1]
record_store.append(
DegradedRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="degraded",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case10",
run_ref=run_ref,
stage_execution_ref=retrieve_ref,
completeness_marker="partial",
degradation_marker="capture_gap",
),
payload=DegradedPayload(
issue_key="rag.retrieval_precision_drop",
category="verification",
severity="warning",
summary=(
"Retrieval precision dropped below threshold (0.55 < 0.70). "
"Index may contain stale or out-of-distribution documents. "
"Downstream rerank and generate stages affected."
),
origin_marker="explicit_capture",
attributes={
"retrieval-precision": 0.55,
"threshold": 0.70,
"top-k": 5,
},
),
)
)
record_store.append(
StructuredEventRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="event",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case10",
run_ref=run_ref,
stage_execution_ref=retrieve_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=StructuredEventPayload(
event_key="rag.retrieval-warning",
level="warning",
message="Retrieval precision below threshold. Cascading quality issues expected.",
origin_marker="explicit_capture",
),
)
)
return run_ref
def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
"""Create v1 baseline and v2 degraded RAG runs."""
if workspace is None:
root = Path(tempfile.mkdtemp(prefix="contexta-case10-"))
workspace_path = root / ".contexta"
else:
workspace_path = Path(workspace)
ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=workspace_path),
)
)
store = ctx.metadata_store
try:
store.projects.put_project(
Project(
project_ref=f"project:{PROJECT_NAME}",
name=PROJECT_NAME,
created_at="2025-04-01T00:00:00Z",
description="Knowledge-base RAG pipeline (4 stages)",
)
)
# v1: balanced quality across all stages
v1_stage_metrics = {
"retrieve": {"retrieval-precision": 0.82, "retrieval-recall": 0.79},
"rerank": {"rerank-ndcg": 0.81, "rerank-mrr": 0.77},
"generate": {"generation-fluency": 0.88, "generation-coherence": 0.85},
"evaluate": {"answer-quality": 0.87, "faithfulness": 0.84},
}
v1_ref = _build_rag_run(
store, ctx.record_store, PROJECT_NAME,
run_name="rag-v1-baseline",
started_at="2025-04-01T09:00:00Z",
ended_at="2025-04-01T09:50:00Z",
stage_metrics=v1_stage_metrics,
emit_retrieval_warning=False,
)
# v2: retrieval precision drops -> cascading degradation downstream
v2_stage_metrics = {
"retrieve": {"retrieval-precision": 0.55, "retrieval-recall": 0.51},
"rerank": {"rerank-ndcg": 0.61, "rerank-mrr": 0.57},
"generate": {"generation-fluency": 0.76, "generation-coherence": 0.68},
"evaluate": {"answer-quality": 0.64, "faithfulness": 0.59},
}
v2_ref = _build_rag_run(
store, ctx.record_store, PROJECT_NAME,
run_name="rag-v2-degraded",
started_at="2025-04-15T09:00:00Z",
ended_at="2025-04-15T09:50:00Z",
stage_metrics=v2_stage_metrics,
emit_retrieval_warning=True,
)
return {
"v1_run_id": v1_ref,
"v2_run_id": v2_ref,
}
finally:
store.close()
def main() -> None:
from contextlib import redirect_stdout
import io
with redirect_stdout(io.StringIO()):
run_example(Path(".contexta"))
print(f"Seeded {PROJECT_NAME} data in .contexta.")
if __name__ == "__main__":
main()
코드를 seed_rag_decomposition_data.py로 저장한 뒤, Contexta가 설치된 환경에서 실행하세요.
uv run seed_rag_decomposition_data.py