본문으로 건너뛰기

Case 03 : 어느 단계에 문제가 있을까요?


페르소나


Nina는 야간 ML 파이프라인을 운영하는 MLE 겸 데이터 엔지니어입니다.

그녀의 파이프라인은 ingest -> preprocess -> validate -> train -> evaluate 순서로 실행됩니다.

Nina가 알고 싶은 것은 단순히 잡이 끝났는지가 아닙니다.

  • 각 스테이지가 정말 유효한 입력으로 실행됐나요?
  • 종료 코드 0인 실행 안에 품질 저하 신호가 남아 있나요?
  • 문제가 발생한 스테이지는 어디인가요?
  • 나쁜 체크포인트가 승격되기 전에 막을 수 있나요?

상황


스키마 마이그레이션 이후 validate 스테이지가 예외를 발생시키지 않고 빈 DataFrame을 다음 단계로 넘겼습니다.

train 스테이지는 0개 행으로 학습을 끝냈고, evaluate 스테이지도 낮은 정확도를 기록했지만 전체 실행 상태는 completed였습니다.

이 문제가 3일 동안 반복되면서, 프로덕션 승격 대기열에는 나쁜 체크포인트가 쌓였습니다.


이 케이스의 핵심은 성공 상태로 끝난 파이프라인 안에서도 스테이지 수준의 저하 기록을 남기고 진단하는 것입니다.


Contexta 없이 해결하려면


Contexta 없이 같은 문제를 찾으려면, Nina는 보통 아래 작업을 직접 해야 합니다.

  1. 스케줄러에서 3일치 실행 상태를 확인합니다.
  2. 모든 실행이 completed인 것을 보고 로그를 따로 열어봅니다.
  3. 롤링된 로그에서 validate 단계의 row count를 찾습니다.
  4. 학습 메트릭이 낮아진 시점과 데이터 입력 이상 시점을 수동으로 연결합니다.
  5. 나쁜 체크포인트가 어디까지 만들어졌는지 다시 추적합니다.

종료 코드와 실행 상태만으로는 “정상 완료”와 “빈 입력으로 완료”를 구분할 수 없습니다.


Contexta로 해결하기


Nina는 validate 스테이지에서 빈 입력을 감지했을 때 DegradedRecord를 남깁니다.

run:nightly-day1 completed
├─ stage:ingest completed
├─ stage:preprocess completed
├─ stage:validate completed degraded=validate.empty_dataframe_passthrough
├─ stage:train completed train.accuracy=0.110
└─ stage:evaluate completed eval.accuracy=0.120

이 해결 흐름은 세 단계입니다.

단계Contexta API얻는 정보
저하 기록 남기기DegradedRecord종료 코드와 별개로 데이터 품질 이상 기록
실행 진단하기ctx.diagnose_run(run_ref)에러/경고 수준 이슈 목록
스냅샷 확인하기ctx.get_run_snapshot(run_ref)스테이지 상태, 메트릭, 이벤트 기록

예제 코드


아래 코드는 3일간의 야간 실행 기록을 읽고, 각 실행의 단계 별 상태와 진단 결과를 출력합니다.

pipeline_failure_review.py
"""Diagnose previously recorded silent pipeline failures."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "nightly-ml-pipeline"

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)

store = ctx.metadata_store
try:
runs = ctx.list_runs(PROJECT_NAME)
for run in runs:
snapshot = ctx.get_run_snapshot(run.run_id)
diagnosis = ctx.diagnose_run(run.run_id)
statuses = ", ".join(f"{stage.name}={stage.status}" for stage in snapshot.stages)
degraded = [issue for issue in diagnosis.issues if issue.code == "degraded_record"]
print(f"{snapshot.run.name}: {statuses}")
for issue in degraded:
print(f" [{issue.severity.upper()}] {issue.summary}")
finally:
store.close()

기록된 예제 데이터에 대해 실행하면 다음과 같은 결과를 얻습니다.

nightly-day1: ingest=completed, preprocess=completed, validate=completed, train=completed, evaluate=completed
[WARNING] degraded record detected for validate.empty_dataframe_passthrough
nightly-day2: ingest=completed, preprocess=completed, validate=completed, train=completed, evaluate=completed
[WARNING] degraded record detected for validate.empty_dataframe_passthrough
nightly-day3: ingest=completed, preprocess=completed, validate=completed, train=completed, evaluate=completed
[WARNING] degraded record detected for validate.empty_dataframe_passthrough

모든 스테이지 상태가 completed여도, 진단 결과는 validate 스테이지의 저하 기록을 보여 줍니다.


코드 조각별로 이해하기


1. Contexta 워크스페이스 열기


PROJECT_NAME = "nightly-ml-pipeline"

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)

분석 코드는 현재 디렉터리의 .contexta 워크스페이스를 읽습니다.


2. 저장된 실행 목록 읽기


runs = ctx.list_runs(PROJECT_NAME)

list_runs()는 프로젝트에 저장된 실행 목록을 반환합니다.

이 예제에서는 3일치 야간 실행을 순회하면서 각 실행의 상태와 진단 결과를 확인합니다.


3. 스냅샷과 진단을 함께 보기


for run in runs:
snapshot = ctx.get_run_snapshot(run.run_id)
diagnosis = ctx.diagnose_run(run.run_id)
statuses = ", ".join(f"{stage.name}={stage.status}" for stage in snapshot.stages)

get_run_snapshot()은 실행의 단계 별 상태를 보여 주고, diagnose_run()은 실행에 남은 저하 기록을 진단 이슈로 표면화합니다.

따라서 모든 단계의 결과가 completed였다는 사실과 validate 단계에서 저하 기록이 있었다는 사실을 동시에 설명할 수 있습니다.


4. 저하 기록만 골라 출력하기


degraded = [issue for issue in diagnosis.issues if issue.code == "degraded_record"]
print(f"{snapshot.run.name}: {statuses}")
for issue in degraded:
print(f" [{issue.severity.upper()}] {issue.summary}")

이 코드는 진단 결과 중 degraded_record만 골라 출력합니다.

예제 데이터에서는 각 실행의 validate 단계에서 validate.empty_dataframe_passthrough 저하 기록이 발견됩니다.


최종 답변


Contexta를 통해, Nina는 앞서 제시된 질문에 이런 식으로 답변할 수 있습니다.

  • Q1. 각 단계가 정말 유효한 입력으로 실행됐나요?

    • A1. 아닙니다. 3일 모두 validate 단계에서 validate.empty_dataframe_passthrough 저하 기록이 발견됐습니다.
  • Q2. 종료 코드 0인 실행 안에 품질 저하 신호가 남아 있나요?

    • A2. 네. 세 실행 모두 진단 결과에 degraded_record가 나타났고, 요약은 validate.empty_dataframe_passthrough입니다.
  • Q3. 문제가 발생한 단계는 어디인가요?

    • A3. 문제가 생긴 것은 validate 단계입니다. validate가 빈 DataFrame을 downstream으로 넘긴 것이 핵심입니다.
  • Q4. 나쁜 체크포인트가 승격되기 전에 막을 수 있나요?

    • A4. 네. 1일차 실행부터 저하 기록이 남아 있으므로, diagnose_run() 결과를 배포 게이트에 연결하면 체크포인트 승격 전에 막을 수 있습니다.

따라서, Nina는 운영 리뷰에서 다음과 같이 답할 수 있습니다.

파이프라인은 겉으로는 모두 성공했지만, validate 단계가 빈 데이터를 넘긴 기록이 3일 연속 남아 있습니다.
이 체크포인트들은 승격하면 안 되며, validate 단계에 row count assertion을 추가해야 합니다.


선택: 예제 데이터 생성


이 섹션은 직접 코드를 실행해 보고 싶은 경우에만 필요합니다.

아래 데이터 준비 코드는 .contexta 워크스페이스에 3일치 야간 파이프라인 실행과 validate 스테이지의 저하 기록을 생성합니다.


pipeline_failure_seed.py
"""Create silent-pipeline-failure records used by the pipeline case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


PROJECT_NAME = "nightly-ml-pipeline"

PIPELINE_STAGES = ["ingest", "preprocess", "validate", "train", "evaluate"]

_REC_COUNTER = 0


def _next_rid() -> str:
global _REC_COUNTER
_REC_COUNTER += 1
return f"r{_REC_COUNTER:05d}"


def _create_stage(
store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_name: str,
status: str,
started_at: str,
ended_at: str,
order_index: int,
) -> str:
stage_ref = f"stage:{project_name}.{run_name}.{stage_name}"
store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=stage_ref,
run_ref=run_ref,
stage_name=stage_name,
status=status,
started_at=started_at,
ended_at=ended_at,
order_index=order_index,
)
)
return stage_ref


def _emit_metric(
record_store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_ref: str,
key: str,
value: float,
ts: str,
) -> None:
record_store.append(
MetricRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="metric",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case03",
run_ref=run_ref,
stage_execution_ref=stage_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=MetricPayload(
metric_key=key,
value=value,
value_type="float64",
aggregation_scope="stage",
),
)
)


def _emit_degraded(
record_store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_ref: str,
ts: str,
row_count: int,
) -> None:
"""Emit a DegradedRecord representing empty-dataframe pass-through."""
record_store.append(
DegradedRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="degraded",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case03",
run_ref=run_ref,
stage_execution_ref=stage_ref,
completeness_marker="partial",
degradation_marker="capture_gap", # data never arrived
),
payload=DegradedPayload(
issue_key="validate.empty_dataframe_passthrough",
category="capture",
severity="error",
summary=(
f"Validate stage received {row_count} rows after schema migration. "
"Empty DataFrame passed to train stage without raising an exception."
),
origin_marker="explicit_capture",
attributes={
"row_count": row_count,
"expected_minimum_rows": 10000,
"stage": "validate",
},
),
)
)


def _emit_event(
record_store: Any,
project_name: str,
run_name: str,
run_ref: str,
stage_ref: str,
event_key: str,
message: str,
level: str,
ts: str,
) -> None:
record_store.append(
StructuredEventRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="event",
recorded_at=ts,
observed_at=ts,
producer_ref="contexta.case03",
run_ref=run_ref,
stage_execution_ref=stage_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=StructuredEventPayload(
event_key=event_key,
level=level,
message=message,
origin_marker="explicit_capture",
),
)
)


def _build_daily_run(
store: Any,
record_store: Any,
project_name: str,
day: int,
silent_failure: bool,
) -> str:
"""Create one nightly run. If silent_failure=True, validate passes empty data."""
run_name = f"nightly-day{day}"
run_ref = f"run:{project_name}.{run_name}"
date_str = f"2025-03-{17 + day:02d}"
base = f"{date_str}T02:00:00Z"

store.runs.put_run(
Run(
run_ref=run_ref,
project_ref=f"project:{project_name}",
name=run_name,
status="completed",
started_at=base,
ended_at=f"{date_str}T04:30:00Z",
)
)

# Build all 5 stages. Validate "succeeds" superficially even on bad days.
stage_times = [
("ingest", f"{date_str}T02:00:00Z", f"{date_str}T02:20:00Z"),
("preprocess", f"{date_str}T02:20:00Z", f"{date_str}T02:50:00Z"),
("validate", f"{date_str}T02:50:00Z", f"{date_str}T03:00:00Z"),
("train", f"{date_str}T03:00:00Z", f"{date_str}T04:00:00Z"),
("evaluate", f"{date_str}T04:00:00Z", f"{date_str}T04:30:00Z"),
]

stage_refs: dict[str, str] = {}
for idx, (sname, sstart, send) in enumerate(stage_times):
ref = _create_stage(store, project_name, run_name, run_ref, sname, "completed", sstart, send, idx)
stage_refs[sname] = ref

# Ingest: records loaded
ingest_rows = 120_000 if not silent_failure else 120_000 # ingest itself is fine
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["ingest"],
"pipeline.ingest-complete", f"Loaded {ingest_rows} rows from warehouse.",
"info", f"{date_str}T02:19:00Z")

# Preprocess: OK
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["preprocess"],
"pipeline.preprocess-complete", "Feature engineering done. 118,432 valid rows.",
"info", f"{date_str}T02:49:00Z")

# Validate: the silent failure lives here
validate_ts = f"{date_str}T02:59:00Z"
if silent_failure:
# Schema migration broke the join - validate passes 0 rows downstream
_emit_degraded(
record_store, project_name, run_name, run_ref,
stage_refs["validate"], validate_ts, row_count=0,
)
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["validate"],
"pipeline.validate-complete",
"Validation check passed (0 rows - schema join returned empty).",
"warning", validate_ts)
else:
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["validate"],
"pipeline.validate-complete",
"Validation passed. 117,800 rows passed quality checks.",
"info", validate_ts)

# Train: runs but with garbage data on bad days
train_ts = f"{date_str}T03:59:00Z"
train_loss = 2.31 if silent_failure else 0.28 # default weights = random = high loss
train_acc = 0.11 if silent_failure else 0.934
_emit_metric(record_store, project_name, run_name, run_ref,
stage_refs["train"], "train.loss", train_loss, train_ts)
_emit_metric(record_store, project_name, run_name, run_ref,
stage_refs["train"], "train.accuracy", train_acc, train_ts)
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["train"],
"training.epoch-end", f"Training complete. loss={train_loss:.3f}",
"info", train_ts)

# Evaluate
eval_ts = f"{date_str}T04:29:00Z"
eval_acc = 0.12 if silent_failure else 0.928
_emit_metric(record_store, project_name, run_name, run_ref,
stage_refs["evaluate"], "eval.accuracy", eval_acc, eval_ts)
_emit_event(record_store, project_name, run_name, run_ref, stage_refs["evaluate"],
"evaluation.complete", f"Eval accuracy={eval_acc:.3f}",
"info" if not silent_failure else "warning", eval_ts)

return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
"""Simulate 3 days of silent validate failure."""

if workspace is None:
root = Path(tempfile.mkdtemp(prefix="contexta-case03-"))
workspace_path = root / ".contexta"
else:
workspace_path = Path(workspace)

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=workspace_path),
)
)

store = ctx.metadata_store
try:
store.projects.put_project(
Project(
project_ref=f"project:{PROJECT_NAME}",
name=PROJECT_NAME,
created_at="2025-01-01T00:00:00Z",
description="Nightly ML training pipeline (5 stages)",
)
)

day_run_refs = []
for day in range(1, 4): # Days 1, 2, 3 - all silent failures
ref = _build_daily_run(store, ctx.record_store, PROJECT_NAME, day, silent_failure=True)
day_run_refs.append((day, ref))

return {
"days_simulated": len(day_run_refs),
"run_ids": [run_ref for _, run_ref in day_run_refs],
}
finally:
store.close()


def main() -> None:
from contextlib import redirect_stdout
import io

with redirect_stdout(io.StringIO()):
run_example(Path(".contexta"))

print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
main()

코드를 pipeline_failure_seed.py로 저장한 뒤, Contexta가 설치된 환경에서 실행하세요.

uv run pipeline_failure_seed.py