본문으로 건너뛰기

Case 08 : 오염된 데이터를 찾아봅시다


페르소나


Sofia는 구매 의도 예측 모델을 운영하는 머신러닝 엔지니어입니다.

Sofia는 데이터 엔지니어와 함께 벤더 API 변경으로 발생한 데이터 오염이 어떤 학습 실행에 영향을 줬는지 찾아야 합니다.

  • 오염 윈도우 안에서 학습된 실행은 무엇인가요?
  • 오염된 실행 중 가장 좋아 보이는 후보가 있었나요?
  • 깨끗한 기준선과 비교하면 성능이 어떻게 달라졌나요?
  • 승격된 모델을 격리해야 하나요?

상황


Sofia의 회사가 사용하는 프로그램의 벤더가 3주 전 API 응답을 변경했습니다.

필드명은 그대로였지만 purchase_intent_score 값이 [0.0, 1.0] 범위에서 [0.0, 0.1] 범위로 조용히 클램핑됐습니다.

스키마 검증은 통과했고, 오염 윈도우 동안 4개의 학습 실행이 만들어졌습니다.


이 케이스의 핵심은 학습 시점에 데이터 오염 이벤트를 실행에 태깅하고, 나중에 영향받은 실행을 찾아 격리하는 것입니다.


Contexta 없이 해결하려면


Contexta 없이 오염 범위를 찾으려면 보통 아래 작업을 직접 해야 합니다.

  1. 벤더 이슈가 시작되고 끝난 날짜를 확인합니다.
  2. 해당 날짜와 겹치는 모든 학습 실행을 여러 시스템에서 찾습니다.
  3. 각 실행의 데이터 버전과 학습 시간을 수동으로 대조합니다.
  4. 오염 실행 중 배포 후보가 있었는지 확인합니다.
  5. 깨끗한 기준선과 성능 차이를 별도로 계산합니다.

이 과정에서 겉으로 가장 성능이 좋아 보였던 오염 실행이 프로덕션에 올라갔을 가능성을 놓치기 쉽습니다.


Contexta로 해결하기


Sofia의 팀은 오염 윈도우 중 학습된 실행에 구조화된 이벤트를 남깁니다.

run:purchase-intent-model.run-2025-04-08
└─ event:data.contamination-window
message="vendor API contamination window"

이 해결 흐름은 세 단계입니다.

단계Contexta API얻는 정보
오염 이벤트 기록StructuredEventRecord어떤 실행이 오염 윈도우와 겹쳤는지
기준선과 비교ctx.compare_runs(BASELINE, latest)오염 실행의 성능 변화
위험 후보 식별ctx.select_best_run(contaminated, "auc", stage_name="evaluate")오염 실행 중 승격됐을 법한 후보

예제 코드


아래 코드는 데이터 준비 단계에서 만든 깨끗한 기준선과 오염 윈도우 내 실행들을 읽고, 격리 대상 후보를 찾습니다.

analyze_contamination.py
"""Identify contaminated runs from previously recorded training history."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "purchase-intent-model"
BASELINE = f"run:{PROJECT_NAME}.run-2025-03-28"

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)

store = ctx.metadata_store
try:
contaminated: list[str] = []
print("Runs trained during contamination window:")
for run in ctx.list_runs(PROJECT_NAME):
snapshot = ctx.get_run_snapshot(run.run_id)
event_keys = {record.key for record in snapshot.records if record.record_type == "event"}
if "data.contamination-window" in event_keys:
contaminated.append(run.run_id)
print(f" {run.name}")

latest = contaminated[-1]
print("\nClean baseline -> latest contaminated:")
comparison = ctx.compare_runs(BASELINE, latest)
for stage in comparison.stage_comparisons:
for delta in stage.metric_deltas:
if delta.delta is not None:
print(f"{stage.stage_name}/{delta.metric_key}: {delta.delta:+.4f}")

false_best = ctx.select_best_run(contaminated, "auc", stage_name="evaluate", higher_is_better=True)
print(f"\nFalse best to quarantine: {false_best}")
finally:
store.close()

실행하면 다음과 같은 결과를 얻습니다.

Runs trained during contamination window:
run-2025-04-03
run-2025-04-08
run-2025-04-14
run-2025-04-21

Clean baseline -> latest contaminated:
evaluate/accuracy: -0.0560
evaluate/auc: -0.0600
evaluate/f1: -0.0560

False best to quarantine: run:purchase-intent-model.run-2025-04-14

코드 조각별로 이해하기


1. 오염 윈도우 실행 찾기


contaminated: list[str] = []
for run in ctx.list_runs(PROJECT_NAME):
snapshot = ctx.get_run_snapshot(run.run_id)
event_keys = {record.key for record in snapshot.records if record.record_type == "event"}
if "data.contamination-window" in event_keys:
contaminated.append(run.run_id)

분석 코드는 각 실행의 스냅샷을 읽고 data.contamination-window 이벤트가 있는 실행만 모읍니다.

이 이벤트는 seed 단계에서 실행에 연결되어 저장됩니다.

나중에 벤더 이슈가 발견되어도 학습 실행과 오염 윈도우를 다시 수동으로 맞춰 보지 않아도 됩니다.


2. 깨끗한 기준선과 비교하기


latest = contaminated[-1]
comparison = ctx.compare_runs(BASELINE, latest)

compare_runs()는 오염 실행이 기준선에서 얼마나 벗어났는지 보여 줍니다.

값이 조금 낮아 자연 변동처럼 보였던 실행도, 오염 이벤트와 함께 보면 신뢰할 수 없는 후보로 분류할 수 있습니다.


3. 오염 후보 중 가장 위험한 실행 찾기


false_best = ctx.select_best_run(contaminated, "auc", stage_name="evaluate", higher_is_better=True)

오염된 실행 중 성능이 가장 좋아 보이는 후보는 오히려 더 위험할 수 있습니다.

Sofia는 이 실행을 승격 후보가 아니라 격리 후보로 다룹니다.


최종 답변


Contexta를 통해, Sofia는 앞서 제시된 질문에 이런 식으로 답변할 수 있습니다.

  • Q1. 오염 윈도우 안에서 학습된 실행은 무엇인가요?

    • A1. run-2025-04-03, run-2025-04-08, run-2025-04-14, run-2025-04-21입니다.
  • Q2. 오염된 실행 중 가장 좋아 보이는 후보가 있었나요?

    • A2. 네. AUC 기준으로는 run:purchase-intent-model.run-2025-04-14가 오염 실행 중 가장 좋아 보이는 후보입니다.
  • Q3. 깨끗한 기준선과 비교하면 성능이 어떻게 달라졌나요?

    • A3. 깨끗한 기준선과 최신 오염 실행을 비교하면 accuracy -0.0560, auc -0.0600, f1 -0.0560입니다.
  • Q4. 승격된 모델을 격리해야 하나요?

    • A4. 네. 오염 윈도우 안에서 학습된 실행은 모두 신뢰하면 안 되며, 특히 승격 후보처럼 보이는 run-2025-04-14도 격리해야 합니다.

따라서, Sofia는 오염 조사에서 다음과 같이 답할 수 있습니다.

오염 윈도우에 포함된 4개 실행은 모두 격리 대상입니다.
그중 run-2025-04-14는 성능상 좋아 보일 수 있지만 오염된 데이터로 학습됐으므로 배포 후보에서 제외해야 합니다.


선택: 예제 데이터 생성


이 섹션은 직접 코드를 실행해 보고 싶은 경우에만 필요합니다.

아래 데이터 준비 코드는 .contexta 워크스페이스에 깨끗한 기준선 실행과 오염 윈도우 내 학습 실행들을 생성합니다.


seed_contamination_data.py
"""Create upstream-contamination records used by the contamination case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


PROJECT_NAME = "purchase-intent-model"

_REC_COUNTER = 0


def _next_rid() -> str:
global _REC_COUNTER
_REC_COUNTER += 1
return f"r{_REC_COUNTER:05d}"


def _build_run(
store: Any,
record_store: Any,
project_name: str,
run_name: str,
started_at: str,
ended_at: str,
accuracy: float,
auc: float,
f1: float,
contaminated: bool,
) -> str:
"""Register a training run with evaluation metrics and optional contamination event."""
run_ref = f"run:{project_name}.{run_name}"

store.runs.put_run(
Run(
run_ref=run_ref,
project_ref=f"project:{project_name}",
name=run_name,
status="completed",
started_at=started_at,
ended_at=ended_at,
)
)

train_ref = f"stage:{project_name}.{run_name}.train"
eval_ref = f"stage:{project_name}.{run_name}.evaluate"

store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=train_ref,
run_ref=run_ref,
stage_name="train",
status="completed",
started_at=started_at,
ended_at=f"{started_at[:10]}T10:00:00Z",
order_index=0,
)
)
store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=eval_ref,
run_ref=run_ref,
stage_name="evaluate",
status="completed",
started_at=f"{started_at[:10]}T10:00:00Z",
ended_at=ended_at,
order_index=1,
)
)

obs_ts = ended_at
for key, val in [("accuracy", accuracy), ("auc", auc), ("f1", f1)]:
record_store.append(
MetricRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="metric",
recorded_at=obs_ts,
observed_at=obs_ts,
producer_ref="contexta.case08",
run_ref=run_ref,
stage_execution_ref=eval_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=MetricPayload(
metric_key=key,
value=val,
value_type="float64",
),
)
)

if contaminated:
# Tag the run as being within the contamination window
record_store.append(
StructuredEventRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="event",
recorded_at=started_at,
observed_at=started_at,
producer_ref="contexta.case08",
run_ref=run_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=StructuredEventPayload(
event_key="data.contamination-window",
level="warning",
message=(
"Run trained during vendor schema contamination window "
"(2025-04-01 to 2025-04-21). Feature purchase_intent_score "
"was clamped to [0.0, 0.1] instead of [0.0, 1.0]."
),
attributes={
"contamination_start": "2025-04-01",
"contamination_end": "2025-04-21",
"affected_feature": "purchase_intent_score",
"vendor_issue": "schema_range_clamp",
},
origin_marker="explicit_capture",
),
)
)

return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
"""Create 5 runs: 1 clean baseline and 4 contaminated runs."""

if workspace is None:
root = Path(tempfile.mkdtemp(prefix="contexta-case08-"))
workspace_path = root / ".contexta"
else:
workspace_path = Path(workspace)

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=workspace_path),
)
)

store = ctx.metadata_store
try:
store.projects.put_project(
Project(
project_ref=f"project:{PROJECT_NAME}",
name=PROJECT_NAME,
created_at="2025-03-01T00:00:00Z",
description="Purchase intent prediction model",
)
)

# 1 clean baseline before contamination
baseline_ref = _build_run(
store, ctx.record_store, PROJECT_NAME,
run_name="run-2025-03-28",
started_at="2025-03-28T09:00:00Z",
ended_at="2025-03-28T12:00:00Z",
accuracy=0.934, auc=0.961, f1=0.928,
contaminated=False,
)

# 4 contaminated runs during the window
contaminated_configs = [
("run-2025-04-03", "2025-04-03T09:00:00Z", "2025-04-03T12:00:00Z", 0.891, 0.912, 0.887),
("run-2025-04-08", "2025-04-08T09:00:00Z", "2025-04-08T12:00:00Z", 0.884, 0.905, 0.879),
("run-2025-04-14", "2025-04-14T09:00:00Z", "2025-04-14T12:00:00Z", 0.897, 0.918, 0.893),
("run-2025-04-21", "2025-04-21T09:00:00Z", "2025-04-21T12:00:00Z", 0.878, 0.901, 0.872),
]
contaminated_refs: list[str] = []
for run_name, started, ended, acc, auc, f1 in contaminated_configs:
ref = _build_run(
store, ctx.record_store, PROJECT_NAME,
run_name=run_name,
started_at=started,
ended_at=ended,
accuracy=acc, auc=auc, f1=f1,
contaminated=True,
)
contaminated_refs.append(ref)

return {
"baseline_run_id": baseline_ref,
"contaminated_run_count": len(contaminated_refs),
"contaminated_run_ids": contaminated_refs,
}
finally:
store.close()


def main() -> None:
from contextlib import redirect_stdout
import io

with redirect_stdout(io.StringIO()):
run_example(Path(".contexta"))

print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
main()

코드를 seed_contamination_data.py로 저장한 뒤, Contexta가 설치된 환경에서 실행하세요.

uv run seed_contamination_data.py