본문으로 건너뛰기

외부 벤더와 연결하기

이 문서는 Contexta의 Sink 어댑터를 설명합니다.

Sink 어댑터는 Contexta가 수집한 기록을 외부 Observability 또는 추적 시스템으로 전달할 때 사용합니다.


Contexta는 로컬 워크스페이스를 기본 저장 경로로 사용합니다.

하지만, 필요한 경우 OpenTelemetry나 MLflow 같은 시스템으로 수집 기록을 함께 내보낼 수 있습니다.

이러한 외부 연동은 코어 런타임과 분리되어 제공하는 선택적인 기능입니다.


아키텍처


Contexta는 코어 런타임과 벤더 연동을 명확하게 분리합니다.

코어 패키지인 contract, runtime, capture, store, interpretation은 외부 벤더 라이브러리에 의존하지 않습니다.


벤더 어댑터는 contexta.adapters.* 아래에 위치합니다.

각 어댑터는 Sink 프로토콜을 구현하며, Contexta 생성 시 싱크 목록으로 전달할 수 있습니다.

ctx = Contexta(sinks=[MySink(), AnotherSink()])

레코드가 수집되면 CaptureDispatcher가 등록된 모든 싱크로 해당 레코드를 전달합니다.

따라서 같은 실행에서 생성된 기록을 로컬에 남기면서 동시에 외부 시스템으로 내보내는 구성이 가능합니다.


의존성 정책


벤더 연동을 위한 어댑터가 필요한 패키지가 있을 때, 아직 환경에 준비되어 있지 않다면 어댑터 생성 시점에 DependencyError를 발생시킵니다.

이 방식은 잘못 구성된 Sink가 데이터가 흐르기 전에 명확하게 실패하도록 하기 위한 것입니다.

from contexta.common.errors import DependencyError

try:
sink = OTelSink()
except DependencyError as e:
print(e.code) # "otel_api_not_ready"

StdoutSink


StdoutSink는 Python 표준 라이브러리만 사용하는 Sink입니다.


StdoutSink는 수집된 모든 레코드를 JSON Lines 형식으로 출력합니다.

따라서, 로컬 디버깅이나 CI 로그에서 수집 결과를 빠르게 확인할 때 유용합니다.

from contexta.capture.sinks import StdoutSink

sink = StdoutSink(
name="console", # 디스패처 안에서 사용할 싱크 이름
stream="stdout", # "stdout" 또는 "stderr"
indent=None, # None = 압축 출력, 2 = 들여쓰기 출력
)
ctx = Contexta(sinks=[sink])

StdoutSink는 모든 PayloadFamily 값을 지원합니다. 즉, 수신한 페이로드를 종류와 관계없이 출력합니다.


OTelSink


공식적으로 제공하는 설치법 : uv add 'contexta[otel]'


OTelSink는 Contexta가 수집한 레코드 페이로드를 OpenTelemetry API로 내보냅니다.

from contexta.adapters.otel import OTelSink

sink = OTelSink(
service_name="my-ml-service", # OTel tracer/meter 스코프 이름
tracer_provider=None, # None이면 전역 OTel 프로바이더 사용
meter_provider=None, # None이면 전역 OTel 프로바이더 사용
name="otel",
)
ctx = Contexta(sinks=[sink])

내보내기 대상


Contexta 레코드OTel 개념
TraceSpanRecordSpan (tracer.start_span 사용)
MetricRecordHistogram 관찰값
StructuredEventRecord현재 활성 스팬의 이벤트
DegradedRecord현재 활성 스팬의 이벤트 (contexta.degraded)

span_kind 매핑


Contexta span_kindOTel SpanKind
operation, internalINTERNAL
io, networkCLIENT
processPRODUCER

프로바이더 설정


OTelSink는 익스포터, 샘플러, 리소스를 직접 설정하지 않습니다. 이 설정은 호출자 쪽에서 구성해야 합니다.

만약 tracer_provider=None이면 전역으로 등록된 프로바이더가 사용됩니다.

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))

sink = OTelSink(tracer_provider=provider)

OTelSinkPayloadFamily.RECORD만 지원합니다. 컨텍스트 페이로드는 무시됩니다.


MLflowSink


공식적으로 제공하는 설치법 : uv add 'contexta[mlflow]'

MLflowSink는 Contexta가 수집한 레코드 페이로드를 MLflow Tracking API로 내보냅니다.

from contexta.adapters.mlflow import MLflowSink

sink = MLflowSink(
run_id=None, # None이면 활성 mlflow.start_run() 컨텍스트에 기록
name="mlflow",
)
ctx = Contexta(sinks=[sink])

내보내기 대상


Contexta 레코드MLflow 개념
MetricRecordmlflow.log_metric
StructuredEventRecordmlflow.set_tag (contexta.event.<key>)
DegradedRecordmlflow.set_tag (contexta.degraded.<key>)

(단, TraceSpanRecord는 무시됩니다.)


활성 실행과 명시적 run_id


run_id=None이면 현재 활성 MLflow 런에 기록합니다.

with mlflow.start_run():
sink = MLflowSink()
ctx = Contexta(sinks=[sink])
# 이 컨텍스트 안에서 수집된 기록이 활성 MLflow 런으로 전달됩니다.

run_id를 지정하면 해당 MLflow 런을 명시적으로 대상으로 삼습니다.

sink = MLflowSink(run_id="abc123def456")

MLflowSinkPayloadFamily.RECORD만 지원합니다. 컨텍스트 페이로드는 무시됩니다.


태그 쓰기 동작

짧은 시간 안에 많이 수집되는 고주파 메트릭에서는 변하지 않은 메트릭도 과하게 저장될 수 있어, 이를 방지하는 것도 필요합니다.

MLflowSink는 각 Sink 인스턴스별로 같은 metric_key에 대한 단위 태그와 메트릭 태그를 최대 한 번만 기록합니다.

이때 contexta.run_ref 태그도 한 번만 기록됩니다.


스레드 안전성


StdoutSink, OTelSink, MLflowSink는 기본적으로 스레드 안전함(thread-safe)를 보장하지 못합니다.

여기서 스레드 안전함은 여러 스레드에서 동시에 호출하는 상황에서 내부 상태를 일관되게 유지하는 것을 말합니다.

따라서, 여러 스레드에서 사용할 때는 스레드마다 별도의 싱크 인스턴스를 만들거나 외부에서 접근을 보호하는 작업이 필요합니다.

관련 문서