본문으로 건너뛰기

Case 07 : 조용한 실패를 찾아봅시다


페르소나


David는 모델 학습에 필요한 피처를 만드는 야간 ETL 배치 작업을 운영하는 데이터 엔지니어입니다.

그가 확인해야 하는 것은 배치 작업의 종료 코드만이 아닙니다.

  • 매일 밤 배치가 어떤 실행과 연결됐나요?
  • 출력 데이터 품질이 기준을 만족했나요?
  • 어느 날부터 품질 저하가 시작됐나요?
  • 모델 성능 하락과 배치 이상을 연결할 수 있나요?

상황


5일 전에 David의 회사에서 사용하는 서비스의 벤더가 API 응답 스키마를 바꿨습니다.

조인은 실패하지 않았지만, purchase_amount 컬럼이 NULL로 채워졌고 이후 0으로 대체됐습니다.

배치 작업은 종료 코드 0으로 완료됐지만, 다운스트림 모델 정확도는 0.934에서 0.871로 떨어졌습니다.


이 케이스의 핵심은 배치 실행마다 데이터 품질 저하 기록을 남기고, 어느 밤의 작업이 문제였는지 빠르게 찾는 것입니다.


Contexta 없이 해결하려면


Contexta 없이 이 문제를 찾으려면, David는 보통 아래 작업을 직접 해야 합니다.

  1. 스케줄러에서 7일치 배치 상태를 확인합니다.
  2. 모두 성공으로 보이면 각 날짜별 로그를 다시 엽니다.
  3. 피처 컬럼별 NULL 비율이나 분포 통계를 별도로 계산합니다.
  4. 어느 날짜의 출력이 학습 성능 하락과 연결되는지 수동으로 대조합니다.
  5. 로그가 롤링됐다면 파이프라인을 재실행해 재현을 시도합니다.

종료 코드 0은 작업이 끝났다는 뜻이지, 출력이 학습에 적합하다는 뜻은 아닙니다.


Contexta로 해결하기


David는 각 야간 배치를 BatchExecution으로 기록하고, 품질 기준을 넘는 이상이 있으면 DegradedRecord를 남깁니다.

batch:batch-night-05
└─ run:etl-night-05
└─ stage:feature-engineering
└─ degraded: etl.feature_column_null_overflow

이 해결 흐름은 세 단계입니다.

단계Contexta API얻는 정보
배치 실행 연결BatchExecution날짜별 배치와 run의 관계
품질 저하 기록DegradedRecord컬럼, NULL 비율, 심각도
실행 진단ctx.diagnose_run(run_ref)어느 실행에 저하 이슈가 있었는지

예제 코드


아래 코드는 데이터 준비 단계에서 만든 7일치 배치 실행을 읽고, 각 실행의 row count와 null rate, 품질 저하 여부를 출력합니다.

analyze_batch_monitoring.py
"""Find degraded nightly batch jobs from previously recorded runs."""

from pathlib import Path

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig


PROJECT_NAME = "nightly-etl-feature-engineering"

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=Path(".contexta")),
)
)

store = ctx.metadata_store
try:
print("Nightly batch summary:")
for run in ctx.list_runs(PROJECT_NAME):
snapshot = ctx.get_run_snapshot(run.run_id)
diagnosis = ctx.diagnose_run(run.run_id)
metrics = {record.key: record.value for record in snapshot.records if record.record_type == "metric"}
degraded = [issue for issue in diagnosis.issues if issue.code == "degraded_record"]
flag = "DEGRADED" if degraded else "ok"
print(
f"{snapshot.run.name}: rows={int(metrics.get('rows-processed', 0))}, "
f"null_rate={metrics.get('feature-null-rate', 0):.4f}, {flag}"
)
finally:
store.close()

실행하면 다음과 같은 결과를 얻습니다.

Nightly batch summary:
etl-night-01: rows=118200, null_rate=0.0020, ok
etl-night-02: rows=118400, null_rate=0.0020, ok
etl-night-03: rows=118600, null_rate=0.0020, ok
etl-night-04: rows=118800, null_rate=0.0020, ok
etl-night-05: rows=0, null_rate=1.0000, DEGRADED
etl-night-06: rows=119200, null_rate=0.0020, ok
etl-night-07: rows=119400, null_rate=0.0020, ok

코드 조각별로 이해하기


1. 날짜별 실행 순회하기


for run in ctx.list_runs(PROJECT_NAME):

ctx.list_runs(PROJECT_NAME)는 프로젝트에 기록된 7일치 야간 ETL 실행을 가져옵니다.

분석 코드는 각 실행을 순회하면서 어느 날짜의 배치가 정상이고 어느 날짜가 저하됐는지 한 번에 확인합니다.


2. row count와 null rate 읽기


snapshot = ctx.get_run_snapshot(run.run_id)
metrics = {record.key: record.value for record in snapshot.records if record.record_type == "metric"}

스냅샷에서 rows-processedfeature-null-rate 메트릭을 읽습니다.

etl-night-05rows=0, null_rate=1.0000으로 기록되어 다른 날짜와 즉시 구분됩니다.


3. 품질 저하 여부 표시하기


diagnosis = ctx.diagnose_run(run.run_id)
degraded = [issue for issue in diagnosis.issues if issue.code == "degraded_record"]
flag = "DEGRADED" if degraded else "ok"

각 실행을 진단해 degraded_record 이슈가 있으면 DEGRADED로 표시합니다.

이 예제에서는 5번째 밤인 etl-night-05DEGRADED로 표시됩니다.


최종 답변


Contexta를 통해, David는 앞서 제시된 질문에 이런 식으로 답변할 수 있습니다.

  • Q1. 매일 밤 배치가 어떤 실행과 연결됐나요?

    • A1. etl-night-01부터 etl-night-07까지 7개 실행으로 기록되어 있습니다.
  • Q2. 출력 데이터 품질이 기준을 만족했나요?

    • A2. etl-night-05null_rate=1.0000이고 DEGRADED입니다. 나머지는 null_rate=0.0020으로 정상입니다.
  • Q3. 어느 날부터 품질 저하가 시작됐나요?

    • A3. 5번째 밤인 etl-night-05에서 시작됐습니다. 이 실행은 rows=0으로 기록되었습니다.
  • Q4. 모델 성능 하락과 배치 이상을 연결할 수 있나요?

    • A4. 네. 성능 하락을 유발한 후보 배치는 etl-night-05이며, 해당 날짜의 피처 산출물을 우선 격리해야 합니다.

따라서, David는 배치 운영 리뷰에서 다음과 같이 답할 수 있습니다.

7일 중 문제가 생긴 배치는 etl-night-05입니다.
그날 row count가 0이고 null rate가 100%였으므로, 해당 피처 산출물과 그 이후 다운스트림 학습 결과를 재검토해야 합니다.


선택: 예제 데이터 생성


이 섹션은 직접 코드를 실행해 보고 싶은 경우에만 필요합니다.

아래 데이터 준비 코드는 .contexta 워크스페이스에 7일치 야간 배치 실행과 5번째 밤의 피처 품질 저하 기록을 생성합니다.


seed_batch_monitoring_data.py
"""Create batch-monitoring records used by the batch case study."""

from __future__ import annotations

import tempfile
from pathlib import Path
from typing import Any

from contexta import Contexta
from contexta.config import UnifiedConfig, WorkspaceConfig
from contexta.contract import (
BatchExecution,
DegradedPayload,
DegradedRecord,
MetricPayload,
MetricRecord,
Project,
RecordEnvelope,
Run,
StageExecution,
StructuredEventPayload,
StructuredEventRecord,
)


PROJECT_NAME = "nightly-etl-feature-engineering"
STAGE_NAME = "feature-engineering"

_REC_COUNTER = 0


def _next_rid() -> str:
global _REC_COUNTER
_REC_COUNTER += 1
return f"r{_REC_COUNTER:05d}"


def _build_nightly_run(
store: Any,
record_store: Any,
project_name: str,
night: int,
has_schema_issue: bool,
) -> str:
"""Create one nightly ETL run with a single feature-engineering stage and batch."""
run_name = f"etl-night-{night:02d}"
run_ref = f"run:{project_name}.{run_name}"
date_str = f"2025-04-{night:02d}"
run_started = f"{date_str}T01:00:00Z"
run_ended = f"{date_str}T01:45:00Z"
stage_started = run_started
stage_ended = f"{date_str}T01:30:00Z"
batch_started = f"{date_str}T01:05:00Z"
batch_ended = f"{date_str}T01:25:00Z"
obs_ts = stage_ended

store.runs.put_run(
Run(
run_ref=run_ref,
project_ref=f"project:{project_name}",
name=run_name,
status="completed",
started_at=run_started,
ended_at=run_ended,
)
)

stage_ref = f"stage:{project_name}.{run_name}.{STAGE_NAME}"
store.stages.put_stage_execution(
StageExecution(
stage_execution_ref=stage_ref,
run_ref=run_ref,
stage_name=STAGE_NAME,
status="completed",
started_at=stage_started,
ended_at=stage_ended,
order_index=0,
)
)

batch_name = f"batch-night-{night:02d}"
batch_ref = f"batch:{project_name}.{run_name}.{STAGE_NAME}.{batch_name}"
store.batches.put_batch_execution(
BatchExecution(
batch_execution_ref=batch_ref,
run_ref=run_ref,
stage_execution_ref=stage_ref,
batch_name=batch_name,
status="completed",
started_at=batch_started,
ended_at=batch_ended,
order_index=0,
)
)

# Normal metrics
rows_processed = 0 if has_schema_issue else 118_000 + night * 200
null_rate = 1.0 if has_schema_issue else 0.002
record_store.append(
MetricRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="metric",
recorded_at=obs_ts,
observed_at=obs_ts,
producer_ref="contexta.case07",
run_ref=run_ref,
stage_execution_ref=stage_ref,
batch_execution_ref=batch_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=MetricPayload(
metric_key="rows-processed",
value=float(rows_processed),
value_type="float64",
),
)
)
record_store.append(
MetricRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="metric",
recorded_at=obs_ts,
observed_at=obs_ts,
producer_ref="contexta.case07",
run_ref=run_ref,
stage_execution_ref=stage_ref,
batch_execution_ref=batch_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=MetricPayload(
metric_key="feature-null-rate",
value=null_rate,
value_type="float64",
),
)
)

if has_schema_issue:
# Vendor changed schema: join returned NULLs for the purchase_amount column.
# Exit code still 0. Emit a DegradedRecord to capture the verification gap.
record_store.append(
DegradedRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="degraded",
recorded_at=obs_ts,
observed_at=obs_ts,
producer_ref="contexta.case07",
run_ref=run_ref,
stage_execution_ref=stage_ref,
batch_execution_ref=batch_ref,
completeness_marker="partial",
degradation_marker="capture_gap",
),
payload=DegradedPayload(
issue_key="etl.feature_column_null_overflow",
category="verification",
severity="warning",
summary=(
"Feature column purchase_amount was fully zeroed after "
"vendor schema change dropped the source field. "
"Join succeeded but all values are NULL. Exit code was 0."
),
origin_marker="explicit_capture",
attributes={
"column": "purchase_amount",
"null_rate": 1.0,
"vendor_schema_version": "v2",
"expected_schema_version": "v1",
},
),
)
)
record_store.append(
StructuredEventRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="event",
recorded_at=obs_ts,
observed_at=obs_ts,
producer_ref="contexta.case07",
run_ref=run_ref,
stage_execution_ref=stage_ref,
batch_execution_ref=batch_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=StructuredEventPayload(
event_key="etl.batch-completed",
level="warning",
message=(
f"Batch {batch_name} completed. "
"purchase_amount column zeroed (vendor schema mismatch). "
"Exit code: 0."
),
origin_marker="explicit_capture",
),
)
)
else:
record_store.append(
StructuredEventRecord(
envelope=RecordEnvelope(
record_ref=f"record:{project_name}.{run_name}.{_next_rid()}",
record_type="event",
recorded_at=obs_ts,
observed_at=obs_ts,
producer_ref="contexta.case07",
run_ref=run_ref,
stage_execution_ref=stage_ref,
batch_execution_ref=batch_ref,
completeness_marker="complete",
degradation_marker="none",
),
payload=StructuredEventPayload(
event_key="etl.batch-completed",
level="info",
message=f"Batch {batch_name} completed. {rows_processed} rows processed.",
origin_marker="explicit_capture",
),
)
)

return run_ref


def run_example(workspace: Path | str | None = None) -> dict[str, Any]:
"""Simulate 7 nightly ETL runs."""

if workspace is None:
root = Path(tempfile.mkdtemp(prefix="contexta-case07-"))
workspace_path = root / ".contexta"
else:
workspace_path = Path(workspace)

ctx = Contexta(
config=UnifiedConfig(
project_name=PROJECT_NAME,
workspace=WorkspaceConfig(root_path=workspace_path),
)
)

store = ctx.metadata_store
try:
store.projects.put_project(
Project(
project_ref=f"project:{PROJECT_NAME}",
name=PROJECT_NAME,
created_at="2025-04-01T00:00:00Z",
description="Nightly ETL feature-engineering pipeline",
)
)

# Night 5 (April 5) has the schema issue
PROBLEM_NIGHT = 5
night_run_refs: list[tuple[int, str]] = []
for night in range(1, 8):
has_issue = (night == PROBLEM_NIGHT)
ref = _build_nightly_run(
store, ctx.record_store, PROJECT_NAME, night, has_schema_issue=has_issue
)
night_run_refs.append((night, ref))

return {
"nights_simulated": 7,
"problem_night": PROBLEM_NIGHT,
"run_ids": [run_ref for _, run_ref in night_run_refs],
}
finally:
store.close()


def main() -> None:
from contextlib import redirect_stdout
import io

with redirect_stdout(io.StringIO()):
run_example(Path(".contexta"))

print(f"Seeded {PROJECT_NAME} data in .contexta.")


if __name__ == "__main__":
main()

코드를 seed_batch_monitoring_data.py로 저장한 뒤, Contexta가 설치된 환경에서 실행하세요.

uv run seed_batch_monitoring_data.py