Case 03 : 어느 단계에 문제가 있을까요?
페르소나
Nina는 야간 ML 파이프라인을 운영하는 MLE 겸 데이터 엔지니어입니다.
그녀의 파이프라인은 ingest -> preprocess -> validate -> train -> evaluate 순서로 실행됩니다.
Nina가 알고 싶은 것은 단순히 잡이 끝났는지가 아닙니다.
- 각 스테이지가 정말 유효한 입력으로 실행됐나요?
- 종료 코드 0인 실행 안에 품질 저하 신호가 남아 있나요?
- 문제가 발생한 스테이지는 어디인가요?
- 나쁜 체크포인트가 승격되기 전에 막을 수 있나요?
상황
스키마 마이그레이션 이후 validate 스테이지가 예외를 발생시키지 않고 빈 DataFrame을 다음 단계로 넘겼습니다.
train 스테이지는 0개 행으로 학습을 끝냈고, evaluate 스테이지도 낮은 정확도를 기록했지만 전체 실행 상태는 completed였습니다.
이 문제가 3일 동안 반복되면서, 프로덕션 승격 대기열에는 나쁜 체크포인트가 쌓였습니다.
이 케이스의 핵심은 성공 상태로 끝난 파이프라인 안에서도 스테이지 수준의 저하 기록을 남기고 진단하는 것입니다.
Contexta 없이 해결하려면
Contexta 없이 같은 문제를 찾으려면, Nina는 보통 아래 작업을 직접 해야 합니다.
- 스케줄러에서 3일치 실행 상태를 확인합니다.
- 모든 실행이
completed인 것을 보고 로그를 따로 열어봅니다. - 롤링된 로그에서
validate단계의 row count를 찾습니다. - 학습 메트릭이 낮아진 시점과 데이터 입력 이상 시점을 수동으로 연결합니다.
- 나쁜 체크포인트가 어디까지 만들어졌는지 다시 추적합니다.
종료 코드와 실행 상태만으로는 “정상 완료”와 “빈 입력으로 완료”를 구분할 수 없습니다.
Contexta로 해결하기
Nina는 validate 스테이지에서 빈 입력을 감지했을 때 DegradedRecord를 남깁니다.
run:nightly-day1 completed
├─ stage:ingest completed
├─ stage:preprocess completed
├─ stage:validate completed degraded=validate.empty_dataframe_passthrough
├─ stage:train completed train.accuracy=0.110
└─ stage:evaluate completed eval.accuracy=0.120
이 해결 흐름은 세 단계입니다.
| 단계 | Contexta API | 얻는 정보 |
|---|---|---|
| 저하 기록 남기기 | DegradedRecord | 종료 코드와 별개로 데이터 품질 이상 기록 |
| 실행 진단하기 | ctx.diagnose_run(run_ref) | 에러/경고 수준 이슈 목록 |
| 스냅샷 확인하기 | ctx.get_run_snapshot(run_ref) | 스테이지 상태, 메트릭, 이벤트 기록 |
예제 코드
아래 코드는 3일간의 야간 실행 기록을 읽고, 각 실행의 단계 별 상태와 진단 결과를 출력합니다.
"""Diagnose previously recorded silent pipeline failures."""
from pathlib import Path
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
PROJECT_NAME = "nightly-ml-pipeline"
ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)
store = ctx.metadata_store
try:
runs = ctx.list_runs(PROJECT_NAME)
for run in runs:
snapshot = ctx.get_run_snapshot(run.run_id)
diagnosis = ctx.diagnose_run(run.run_id)
statuses = ", ".join(f"{stage.name}={stage.status}" for stage in snapshot.stages)
degraded = [issue for issue in diagnosis.issues if issue.code == "degraded_record"]
print(f"{snapshot.run.name}: {statuses}")
for issue in degraded:
print(f" [{issue.severity.upper()}] {issue.summary}")
finally:
store.close()
기록된 예제 데이터에 대해 실행하면 다음과 같은 결과를 얻습니다.
nightly-day1: ingest=completed, preprocess=completed, validate=completed, train=completed, evaluate=completed
[WARNING] degraded record detected for validate.empty_dataframe_passthrough
nightly-day2: ingest=completed, preprocess=completed, validate=completed, train=completed, evaluate=completed
[WARNING] degraded record detected for validate.empty_dataframe_passthrough
nightly-day3: ingest=completed, preprocess=completed, validate=completed, train=completed, evaluate=completed
[WARNING] degraded record detected for validate.empty_dataframe_passthrough
모든 스테이지 상태가 completed여도, 진단 결과는 validate 스테이지의 저하 기록을 보여 줍니다.
코드 조각별로 이해하기
1. Contexta 워크스페이스 열기
PROJECT_NAME = "nightly-ml-pipeline"
ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)
분석 코드는 현재 디렉터리의 .contexta 워크스페이스를 읽습니다.
2. 저장된 실행 목록 읽기
runs = ctx.list_runs(PROJECT_NAME)
list_runs()는 프로젝트에 저장된 실행 목록을 반환합니다.
이 예제에서는 3일치 야간 실행을 순회하면서 각 실행의 상태와 진단 결과를 확인합니다.
3. 스냅샷과 진단을 함께 보기
for run in runs:
snapshot = ctx.get_run_snapshot(run.run_id)
diagnosis = ctx.diagnose_run(run.run_id)
statuses = ", ".join(f"{stage.name}={stage.status}" for stage in snapshot.stages)
get_run_snapshot()은 실행의 단계 별 상태를 보여 주고, diagnose_run()은 실행에 남은 저하 기록을 진단 이슈로 표면화합니다.
따라서 모든 단계의 결과가 completed였다는 사실과 validate 단계에서 저하 기록이 있었다는 사실을 동시에 설명할 수 있습니다.
4. 저하 기록만 골라 출력하기
degraded = [issue for issue in diagnosis.issues if issue.code == "degraded_record"]
print(f"{snapshot.run.name}: {statuses}")
for issue in degraded:
print(f" [{issue.severity.upper()}] {issue.summary}")
이 코드는 진단 결과 중 degraded_record만 골라 출력합니다.
예제 데이터에서는 각 실행의 validate 단계에서 validate.empty_dataframe_passthrough 저하 기록이 발견됩니다.
최종 답변
Contexta를 통해, Nina는 앞서 제시된 질문에 이런 식으로 답변할 수 있습니다.
-
Q1. 각 단계가 정말 유효한 입력으로 실행됐나요?
- A1. 아닙니다. 3일 모두
validate단계에서validate.empty_dataframe_passthrough저하 기록이 발견됐습니다.
- A1. 아닙니다. 3일 모두
-
Q2. 종료 코드 0인 실행 안에 품질 저하 신호가 남아 있나요?
- A2. 네. 세 실행 모두 진단 결과에
degraded_record가 나타났고, 요약은validate.empty_dataframe_passthrough입니다.
- A2. 네. 세 실행 모두 진단 결과에
-
Q3. 문제가 발생한 단계는 어디인가요?
- A3. 문제가 생긴 것은
validate단계입니다.validate가 빈 DataFrame을 downstream으로 넘긴 것이 핵심입니다.
- A3. 문제가 생긴 것은
-
Q4. 나쁜 체크포인트가 승격되기 전에 막을 수 있나요?
- A4. 네. 1일차 실행부터 저하 기록이 남아 있으므로,
diagnose_run()결과를 배포 게이트에 연결하면 체크포인트 승격 전에 막을 수 있습니다.
- A4. 네. 1일차 실행부터 저하 기록이 남아 있으므로,
따라서, Nina는 운영 리뷰에서 다음과 같이 답할 수 있습니다.
파이프라인은 겉으로는 모두 성공했지만,
validate단계가 빈 데이터를 넘긴 기록이 3일 연속 남아 있습니다.
이 체크포인트들은 승격하면 안 되며,validate단계에 row count assertion을 추가해야 합니다.
선택: 예제 데이터 생성
이 섹션은 직접 코드를 실행해 보고 싶은 경우에만 필요합니다.
아래 데이터 준비 코드는 .contexta 워크스페이스에 3일치 야간 파이프라인 실행과 validate 스테이지의 저하 기록을 생성합니다.
"""Create silent-pipeline-failure records used by the pipeline case study."""
from __future__ import annotations
import tempfile
from pathlib import Path
from typing import Any
from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)
PROJECT_NAME = "nightly-ml-pipeline"
PIPELINE_STAGES = ["ingest", "preprocess", "validate", "train", "evaluate"]
_REC_COUNTER = 0
def _next_rid() -> str:
global _REC_COUNTER
_REC_COUNTER += 1
return f"r{_REC_COUNTER:05d}"
def _create_stage(
store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_name: str,
status: str,
started_at: str,
ended_at: str,
order_index: int,
) -> str:
stage_ref = f"stage:{project_name}.{run_name}.{stage_name}"
store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=stage_ref,
run_ref=run_ref,
stage_name=stage_name,
status=status,
started_at=started_at,
ended_at=ended_at,
order_index=order_index,
)
)
return stage_ref
def _emit_metric(
record_store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_ref: str,
key: str,
value: float,
ts: str,
) -> None:
record_store.append(
MetricRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="metric",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case03",
run_ref=run_ref,
stage_execution_ref=stage_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=MetricPayload(
metric_key=key,
value=value,
value_type="float64",
aggregation_scope="stage",
),
)
)
def _emit_degraded(
record_store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_ref: str,
ts: str,
row_count: int,
) -> None:
"""Emit a DegradedRecord representing empty-dataframe pass-through."""
record_store.append(
DegradedRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="degraded",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case03",
run_ref=run_ref,
stage_execution_ref=stage_ref,
completeness_marker="partial",
degradation_marker="capture_gap", # data never arrived
),
payload=DegradedPayload(
issue_key="validate.empty_dataframe_passthrough",
category="capture",
severity="error",
summary=(
f"Validate stage received {row_count} rows after schema migration. "
"Empty DataFrame passed to train stage without raising an exception."
),
origin_marker="explicit_capture",
attributes={
"row_count": row_count,
"expected_minimum_rows": 10000,
"stage": "validate",
},
),
)
)
def _emit_event(
record_store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_ref: str,
event_key: str,
message: str,
level: str,
ts: str,
) -> None:
record_store.append(
StructuredEventRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="event",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case03",
run_ref=run_ref,
stage_execution_ref=stage_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=StructuredEventPayload(
event_key=event_key,
level=level,
message=message,
origin_marker="explicit_capture",
),
)
)
def _build_daily_run(
store: Any,
record_store: Any,
project_name: str,
day: int,
silent_failure: bool,
) -> str:
"""Create one nightly run. If silent_failure=True, validate passes empty data."""
run_name = f"nightly-day{day}"
run_ref = f"run:{project_name}.{run_name}"
date_str = f"2025-03-{17 + day:02d}"
base = f"{date_str}T02:00:00Z"
store.runs.put_run(
Run(
run_ref=run_ref,
project_ref=f"project:{project_name}",
name=run_name,
status="completed",
started_at=base,
ended_at=f"{date_str}T04:30:00Z",
)
)
# Build all 5 stages. Validate "succeeds" superficially even on bad days.
stage_times = [
("ingest", f"{date_str}T02:00:00Z", f"{date_str}T02:20:00Z"),
("preprocess", f"{date_str}T02:20:00Z", f"{date_str}T02:50:00Z"),
("validate", f"{date_str}T02:50:00Z", f"{date_str}T03:00:00Z"),
("train", f"{date_str}T03:00:00Z", f"{date_str}T04:00:00Z"),
("evaluate", f"{date_str}T04:00:00Z", f"{date_str}T04:30:00Z"),
]
stage_refs: dict[str, str] = {}
for idx, (sname, sstart, send) in enumerate(stage_times):
ref = _create_stage(store, project_name, run_name, run_ref, sname, "completed", sstart, send, idx)
stage_refs[sname] = ref
# Ingest: records loaded
ingest_rows = 120_000 if not silent_failure else 120_000 # ingest itself is fine
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["ingest"],
"pipeline.ingest-complete", f"Loaded {ingest_rows} rows from warehouse.",
"info", f"{date_str}T02:19:00Z")
# Preprocess: OK
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["preprocess"],
"pipeline.preprocess-complete", "Feature engineering done. 118,432 valid rows.",
"info", f"{date_str}T02:49:00Z")
# Validate: the silent failure lives here
validate_ts = f"{date_str}T02:59:00Z"
if silent_failure:
# Schema migration broke the join - validate passes 0 rows downstream
_emit_degraded(
record_store, project_name, run_name, run_ref,
stage_refs["validate"], validate_ts, row_count=0,
)
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["validate"],
"pipeline.validate-complete",
"Validation check passed (0 rows - schema join returned empty).",
"warning", validate_ts)
else:
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["validate"],
"pipeline.validate-complete",
"Validation passed. 117,800 rows passed quality checks.",
"info", validate_ts)
# Train: runs but with garbage data on bad days
train_ts = f"{date_str}T03:59:00Z"
train_loss = 2.31 if silent_failure else 0.28 # default weights = random = high loss
train_acc = 0.11 if silent_failure else 0.934
_emit_metric(record_store, project_name, run_name, run_ref,
stage_refs["train"], "train.loss", train_loss, train_ts)
_emit_metric(record_store, project_name, run_name, run_ref,
stage_refs["train"], "train.accuracy", train_acc, train_ts)
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["train"],
"training.epoch-end", f"Training complete. loss={train_loss:.3f}",
"info", train_ts)
# Evaluate
eval_ts = f"{date_str}T04:29:00Z"
eval_acc = 0.12 if silent_failure else 0.928
_emit_metric(record_store, project_name, run_name, run_ref,
stage_refs["evaluate"], "eval.accuracy", eval_acc, eval_ts)
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["evaluate"],
"evaluation.complete", f"Eval accuracy={eval_acc:.3f}",
"info" if not silent_failure else "warning", eval_ts)
return run_ref
def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
"""Simulate 3 days of silent validate failure."""
if workspace is None:
root = Path(tempfile.mkdtemp(prefix="contexta-case03-"))
workspace_path = root / ".contexta"
else:
workspace_path = Path(workspace)
ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=workspace_path),
)
)
store = ctx.metadata_store
try:
store.projects.put_project(
Project(
project_ref=f"project:{PROJECT_NAME}",
name=PROJECT_NAME,
created_at="2025-01-01T00:00:00Z",
description="Nightly ML training pipeline (5 stages)",
)
)
day_run_refs = []
for day in range(1, 4): # Days 1, 2, 3 - all silent failures
ref = _build_daily_run(store, ctx.record_store, PROJECT_NAME, day, silent_failure=True)
day_run_refs.append((day, ref))
return {
"days_simulated": len(day_run_refs),
"run_ids": [run_ref for _, run_ref in day_run_refs],
}
finally:
store.close()
def main() -> None:
from contextlib import redirect_stdout
import io
with redirect_stdout(io.StringIO()):
run_example(Path(".contexta"))
print(f"Seeded {PROJECT_NAME} data in .contexta.")
if __name__ == "__main__":
main()
코드를 pipeline_failure_seed.py로 저장한 뒤, Contexta가 설치된 환경에서 실행하세요.
uv run pipeline_failure_seed.py