Case 08 : 오염된 데이터를 찾아봅시다
페르소나
Sofia는 구매 의도 예측 모델을 운영하는 머신러닝 엔지니어입니다.
Sofia는 데이터 엔지니어와 함께 벤더 API 변경으로 발생한 데이터 오염이 어떤 학습 실행에 영향을 줬는지 찾아야 합니다.
- 오염 윈도우 안에서 학습된 실행은 무엇인가요?
- 오염된 실행 중 가장 좋아 보이는 후보가 있었나요?
- 깨끗한 기준선과 비교하면 성능이 어떻게 달라졌나요?
- 승격된 모델을 격리해야 하나요?
상황
Sofia의 회사가 사용하는 프로그램의 벤더가 3주 전 API 응답을 변경했습니다.
필드명은 그대로였지만 purchase_intent_score 값이 [0.0, 1.0] 범위에서 [0.0, 0.1] 범위로 조용히 클램핑됐습니다.
스키마 검증은 통과했고, 오염 윈도우 동안 4개의 학습 실행이 만들어졌습니다.
이 케이스의 핵심은 학습 시점에 데이터 오염 이벤트를 실행에 태깅하고, 나중에 영향받은 실행을 찾아 격리하는 것입니다.
Contexta 없이 해결하려면
Contexta 없이 오염 범위를 찾으려면 보통 아래 작업을 직접 해야 합니다.
- 벤더 이슈가 시작되고 끝난 날짜를 확인합니다.
- 해당 날짜와 겹치는 모든 학습 실행을 여러 시스템에서 찾습니다.
- 각 실행의 데이터 버전과 학습 시간을 수동으로 대조합니다.
- 오염 실행 중 배포 후보가 있었는지 확인합니다.
- 깨끗한 기준선과 성능 차이를 별도로 계산합니다.
이 과정에서 겉으로 가장 성능이 좋아 보였던 오염 실행이 프로덕션에 올라갔을 가능성을 놓치기 쉽습니다.
Contexta로 해결하기
Sofia의 팀은 오염 윈도우 중 학습된 실행에 구조화된 이벤트를 남깁니다.
run:purchase-intent-model.run-2025-04-08
└─ event:data.contamination-window
message="vendor API contamination window"
이 해결 흐름은 세 단계입니다.
| 단계 | Contexta API | 얻는 정보 |
|---|---|---|
| 오염 이벤트 기록 | StructuredEventRecord | 어떤 실행이 오염 윈도우와 겹쳤는지 |
| 기준선과 비교 | ctx.compare_runs(BASELINE, latest) | 오염 실행의 성능 변화 |
| 위험 후보 식별 | ctx.select_best_run(contaminated, "auc", stage_name="evaluate") | 오염 실행 중 승격됐을 법한 후보 |
예제 코드
아래 코드는 데이터 준비 단계에서 만든 깨끗한 기준선과 오염 윈도우 내 실행들을 읽고, 격리 대상 후보를 찾습니다.
"""Identify contaminated runs from previously recorded training history."""
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
PROJECT_NAME = "purchase-intent-model"
BASELINE = f"run:{PROJECT_NAME}.run-2025-03-28"
ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)
store = ctx.metadata_store
try:
contaminated: list[str] = []
print("Runs trained during contamination window:")
for run in ctx.list_runs(PROJECT_NAME):
snapshot = ctx.get_run_snapshot(run.run_id)
event_keys = {record.key for record in snapshot.records if record.record_type == "event"}
if "data.contamination-window" in event_keys:
contaminated.append(run.run_id)
print(f" {run.name}")
latest = contaminated[-1]
print("\nClean baseline -> latest contaminated:")
comparison = ctx.compare_runs(BASELINE, latest)
for stage in comparison.stage_comparisons:
for delta in stage.metric_deltas:
if delta.delta is not None:
print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")
false_best = ctx.select_best_run(contaminated, "auc", stage_name="evaluate", higher_is_better=True)
print(f"\nFalse best to quarantine: {false_best}")
finally:
store.close()
실행하면 다음과 같은 결과를 얻습니다.
Runs trained during contamination window:
run-2025-04-03
run-2025-04-08
run-2025-04-14
run-2025-04-21
Clean baseline -> latest contaminated:
evaluate/accuracy: -0.0560
evaluate/auc: -0.0600
evaluate/f1: -0.0560
False best to quarantine: run:purchase-intent-model.run-2025-04-14
코드 조각별로 이해하기
1. 오염 윈도우 실행 찾기
contaminated: list[str] = []
for run in ctx.list_runs(PROJECT_NAME):
snapshot = ctx.get_run_snapshot(run.run_id)
event_keys = {record.key for record in snapshot.records if record.record_type == "event"}
if "data.contamination-window" in event_keys:
contaminated.append(run.run_id)
분석 코드는 각 실행의 스냅샷을 읽고 data.contamination-window 이벤트가 있는 실행만 모읍니다.
이 이벤트는 seed 단계에서 실행에 연결되어 저장됩니다.
나중에 벤더 이슈가 발견되어도 학습 실행과 오염 윈도우를 다시 수동으로 맞춰 보지 않아도 됩니다.
2. 깨끗한 기준선과 비교하기
latest = contaminated[-1]
comparison = ctx.compare_runs(BASELINE, latest)
compare_runs()는 오염 실행이 기준선에서 얼마나 벗어났는지 보여 줍니다.
값이 조금 낮아 자연 변동처럼 보였던 실행도, 오염 이벤트와 함께 보면 신뢰할 수 없는 후보로 분류할 수 있습니다.
3. 오염 후보 중 가장 위험한 실행 찾기
false_best = ctx.select_best_run(contaminated, "auc", stage_name="evaluate", higher_is_better=True)
오염된 실행 중 성능이 가장 좋아 보이는 후보는 오히려 더 위험할 수 있습니다.
Sofia는 이 실행을 승격 후보가 아니라 격리 후보로 다룹니다.
최종 답변
Contexta를 통해, Sofia는 앞서 제시된 질문에 이런 식으로 답변할 수 있습니다.
-
Q1. 오염 윈도우 안에서 학습된 실행은 무엇인가요?
- A1.
run-2025-04-03,run-2025-04-08,run-2025-04-14,run-2025-04-21입니다.
- A1.
-
Q2. 오염된 실행 중 가장 좋아 보이는 후보가 있었나요?
- A2. 네. AUC 기준으로는
run:purchase-intent-model.run-2025-04-14가 오염 실행 중 가장 좋아 보이는 후보입니다.
- A2. 네. AUC 기준으로는
-
Q3. 깨끗한 기준선과 비교하면 성능이 어떻게 달라졌나요?
- A3. 깨끗한 기준선과 최신 오염 실행을 비교하면
accuracy -0.0560,auc -0.0600,f1 -0.0560입니다.
- A3. 깨끗한 기준선과 최신 오염 실행을 비교하면
-
Q4. 승격된 모델을 격리해야 하나요?
- A4. 네. 오염 윈도우 안에서 학습된 실행은 모두 신뢰하면 안 되며, 특히 승격 후보처럼 보이는
run-2025-04-14도 격리해야 합니다.
- A4. 네. 오염 윈도우 안에서 학습된 실행은 모두 신뢰하면 안 되며, 특히 승격 후보처럼 보이는
따라서, Sofia는 오염 조사에서 다음과 같이 답할 수 있습니다.
오염 윈도우에 포함된 4개 실행은 모두 격리 대상입니다.
그중run-2025-04-14는 성능상 좋아 보일 수 있지만 오염된 데이터로 학습됐으므로 배포 후보에서 제외해야 합니다.
선택: 예제 데이터 생성
이 섹션은 직접 코드를 실행해 보고 싶은 경우에만 필요합니다.
아래 데이터 준비 코드는 .contexta 워크스페이스에 깨끗한 기준선 실행과 오염 윈도우 내 학습 실행들을 생성합니다.
"""Create upstream-contamination records used by the contamination case study."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)
PROJECT_NAME = "purchase-intent-model"
_REC_COUNTER = 0
def _next_rid() -> str:
global _REC_COUNTER
_REC_COUNTER += 1
return f"r{_REC_COUNTER:05d}"
def _build_run(
store: Any,
record_store: Any,
project_name: str,
run_name: str,
started_at: str,
ended_at: str,
accuracy: float,
auc: float,
f1: float,
contaminated: bool,
) -> str:
"""Register a training run with evaluation metrics and optional contamination event."""
run_ref = f"run:{project_name}.{run_name}"
store.runs.put_run(
Run(
run_ref=run_ref,
project_ref=f"project:{project_name}",
name=run_name,
status="completed",
started_at=started_at,
ended_at=ended_at,
)
)
train_ref = f"stage:{project_name}.{run_name}.train"
eval_ref = f"stage:{project_name}.{run_name}.evaluate"
store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=train_ref,
run_ref=run_ref,
stage_name="train",
status="completed",
started_at=started_at,
ended_at=f"{started_at[:10]}T10:00:00Z",
order_index=0,
)
)
store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=eval_ref,
run_ref=run_ref,
stage_name="evaluate",
status="completed",
started_at=f"{started_at[:10]}T10:00:00Z",
ended_at=ended_at,
order_index=1,
)
)
obs_ts = ended_at
for key, val in [("accuracy", accuracy), ("auc", auc), ("f1", f1)]:
record_store.append(
MetricRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="metric",
recorded_at=obs_ts,
observed_at=obs_ts,
producer_ref="contexta.case08",
run_ref=run_ref,
stage_execution_ref=eval_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=MetricPayload(
metric_key=key,
value=val,
value_type="float64",
),
)
)
if contaminated:
# Tag the run as being within the contamination window
record_store.append(
StructuredEventRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="event",
recorded_at=started_at,
observed_at=started_at,
producer_ref="contexta.case08",
run_ref=run_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=StructuredEventPayload(
event_key="data.contamination-window",
level="warning",
message=(
"Run trained during vendor schema contamination window "
"(2025-04-01 to 2025-04-21). Feature purchase_intent_score "
"was clamped to [0.0, 0.1] instead of [0.0, 1.0]."
),
attributes={
"contamination_start": "2025-04-01",
"contamination_end": "2025-04-21",
"affected_feature": "purchase_intent_score",
"vendor_issue": "schema_range_clamp",
},
origin_marker="explicit_capture",
),
)
)
return run_ref
def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
"""Create 5 runs: 1 clean baseline and 4 contaminated runs."""
if workspace is None:
root = Path(tempfile.mkdtemp(prefix="contexta-case08-"))
workspace_path = root / ".contexta"
else:
workspace_path = Path(workspace)
ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=workspace_path),
)
)
store = ctx.metadata_store
try:
store.projects.put_project(
Project(
project_ref=f"project:{PROJECT_NAME}",
name=PROJECT_NAME,
created_at="2025-03-01T00:00:00Z",
description="Purchase intent prediction model",
)
)
# 1 clean baseline before contamination
baseline_ref = _build_run(
store, ctx.record_store, PROJECT_NAME,
run_name="run-2025-03-28",
started_at="2025-03-28T09:00:00Z",
ended_at="2025-03-28T12:00:00Z",
accuracy=0.934, auc=0.961, f1=0.928,
contaminated=False,
)
# 4 contaminated runs during the window
contaminated_configs = [
("run-2025-04-03", "2025-04-03T09:00:00Z", "2025-04-03T12:00:00Z", 0.891, 0.912, 0.887),
("run-2025-04-08", "2025-04-08T09:00:00Z", "2025-04-08T12:00:00Z", 0.884, 0.905, 0.879),
("run-2025-04-14", "2025-04-14T09:00:00Z", "2025-04-14T12:00:00Z", 0.897, 0.918, 0.893),
("run-2025-04-21", "2025-04-21T09:00:00Z", "2025-04-21T12:00:00Z", 0.878, 0.901, 0.872),
]
contaminated_refs: list[str] = []
for run_name, started, ended, acc, auc, f1 in contaminated_configs:
ref = _build_run(
store, ctx.record_store, PROJECT_NAME,
run_name=run_name,
started_at=started,
ended_at=ended,
accuracy=acc, auc=auc, f1=f1,
contaminated=True,
)
contaminated_refs.append(ref)
return {
"baseline_run_id": baseline_ref,
"contaminated_run_count": len(contaminated_refs),
"contaminated_run_ids": contaminated_refs,
}
finally:
store.close()
def main() -> None:
from contextlib import redirect_stdout
import io
with redirect_stdout(io.StringIO()):
run_example(Path(".contexta"))
print(f"Seeded {PROJECT_NAME} data in .contexta.")
if __name__ == "__main__":
main()
코드를 seed_contamination_data.py로 저장한 뒤, Contexta가 설치된 환경에서 실행하세요.
uv run seed_contamination_data.py