Building an Airflow Monitoring System, Part 1 (Feat. Installing Exporters)
Monitoring System/Exporter · 2023. 6. 6. 23:31

     

    After building the Airflow on ECS pipeline, I set up a monitoring pipeline for the Airflow server and the RDS server that serves as its database.

    The flow: Airflow and RDS each emit metrics; Prometheus, a time-series database, collects and stores them; and finally Grafana, a time-series data visualization tool, presents them as dashboards.

     

    Between each service and Prometheus sits an intermediary service called an exporter. It is there for two reasons:

    1. StatsD works on a push model, while Prometheus works on a pull model, so something in the middle has to accept what StatsD pushes and hold it for Prometheus to pull.

    2. The metrics Airflow emits are in StatsD format, and the exporter translates them into a format Prometheus can read (see the example below).
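
    For illustration, here is roughly what that translation looks like, assuming the default prefix airflow and the af_agg_* mapping rules defined later in this post:

    # StatsD line pushed by Airflow over UDP (format: <name>:<value>|<type>)
    airflow.ti_successes:1|c

    # The same metric in Prometheus exposition format, served over HTTP
    # by the exporter for Prometheus to pull
    af_agg_ti_successes{airflow_id="airflow"} 1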

     
    So we set up a StatsD-exporter for Airflow and a pg-exporter for RDS.

    StatsD-exporter

    1. First, install the Airflow package with the statsd extra so that Airflow can send StatsD metrics.

    $ pip install 'apache-airflow[statsd]'

    This command installs the latest Airflow release together with the packages required for StatsD integration.

     

    2. Then add the StatsD settings to Airflow's config file, airflow.cfg:

    [metrics]
    statsd_on = True
    # statsd_host and statsd_prefix below are the default values
    statsd_host = localhost
    statsd_port = 9125
    statsd_prefix = airflow
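
    Before wiring in the exporter, you can sanity-check that Airflow is actually emitting StatsD packets. One quick way (assuming netcat is installed; flag syntax differs between netcat variants) is to listen on the UDP port and watch for raw metric lines such as the scheduler heartbeat:

    # Listen for raw StatsD packets on UDP port 9125
    $ nc -u -l 9125
    airflow.scheduler_heartbeat:1|c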
     
     

     

    3. Next, create the files needed to run the StatsD-exporter in Docker.

    1) Dockerfile

    # Official Prometheus statsd_exporter image
    FROM prom/statsd-exporter:latest
    
    # Bake the metric mapping file into the image
    COPY statsd.yaml /opt/statsd.yaml

     

    Build the Docker image under the name statsd-exporter:

    # docker image build
    $ docker build -t statsd-exporter .

     

    2) statsd.yaml

    # the leading * in each pattern captures the statsd_prefix (airflow)
    mappings:
      # === Counters ===
      - match: "*.ti_failures"
        match_metric_type: counter
        name: "af_agg_ti_failures"
        labels:
          airflow_id: "$1"
      - match: "*.ti_successes"
        match_metric_type: counter
        name: "af_agg_ti_successes"
        labels:
          airflow_id: "$1"
      - match: "*.zombies_killed"
        match_metric_type: counter
        name: "af_agg_zombies_killed"
        labels:
          airflow_id: "$1"
      - match: "*.scheduler_heartbeat"
        match_metric_type: counter
        name: "af_agg_scheduler_heartbeat"
        labels:
          airflow_id: "$1"
      - match: "*.dag_processing.processes"
        match_metric_type: counter
        name: "af_agg_dag_processing_processes"
        labels:
          airflow_id: "$1"
      - match: "*.scheduler.tasks.killed_externally"
        match_metric_type: counter
        name: "af_agg_scheduler_tasks_killed_externally"
        labels:
          airflow_id: "$1"
      - match: "*.scheduler.tasks.running"
        match_metric_type: counter
        name: "af_agg_scheduler_tasks_running"
        labels:
          airflow_id: "$1"
      - match: "*.scheduler.tasks.starving"
        match_metric_type: counter
        name: "af_agg_scheduler_tasks_starving"
        labels:
          airflow_id: "$1"
      - match: "*.scheduler.orphaned_tasks.cleared"
        match_metric_type: counter
        name: "af_agg_scheduler_orphaned_tasks_cleared"
        labels:
          airflow_id: "$1"
      - match: "*.scheduler.orphaned_tasks.adopted"
        match_metric_type: counter
        name: "af_agg_scheduler_orphaned_tasks_adopted"
        labels:
          airflow_id: "$1"
      - match: "*.scheduler.critical_section_busy"
        match_metric_type: counter
        name: "af_agg_scheduler_critical_section_busy"
        labels:
          airflow_id: "$1"
      - match: "*.sla_email_notification_failure"
        match_metric_type: counter
        name: "af_agg_sla_email_notification_failure"
        labels:
          airflow_id: "$1"
      - match: "*.ti.start.*.*"
        match_metric_type: counter
        name: "af_agg_ti_start"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
          task_id: "$3"
      - match: "*.ti.finish.*.*.*"
        match_metric_type: counter
        name: "af_agg_ti_finish"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
          task_id: "$3"
          state: "$4"
      - match: "*.dag.callback_exceptions"
        match_metric_type: counter
        name: "af_agg_dag_callback_exceptions"
        labels:
          airflow_id: "$1"
      - match: "*.celery.task_timeout_error"
        match_metric_type: counter
        name: "af_agg_celery_task_timeout_error"
        labels:
          airflow_id: "$1"
    
      # === Gauges ===
      - match: "*.dagbag_size"
        match_metric_type: gauge
        name: "af_agg_dagbag_size"
        labels:
          airflow_id: "$1"
      - match: "*.dag_processing.import_errors"
        match_metric_type: gauge
        name: "af_agg_dag_processing_import_errors"
        labels:
          airflow_id: "$1"
      - match: "*.dag_processing.total_parse_time"
        match_metric_type: gauge
        name: "af_agg_dag_processing_total_parse_time"
        labels:
          airflow_id: "$1"
      - match: "*.dag_processing.last_runtime.*"
        match_metric_type: gauge
        name: "af_agg_dag_processing_last_runtime"
        labels:
          airflow_id: "$1"
          dag_file: "$2"
      - match: "*.dag_processing.last_run.seconds_ago.*"
        match_metric_type: gauge
        name: "af_agg_dag_processing_last_run_seconds"
        labels:
          airflow_id: "$1"
          dag_file: "$2"
      - match: "*.dag_processing.processor_timeouts"
        match_metric_type: gauge
        name: "af_agg_dag_processing_processor_timeouts"
        labels:
          airflow_id: "$1"
      - match: "*.executor.open_slots"
        match_metric_type: gauge
        name: "af_agg_executor_open_slots"
        labels:
          airflow_id: "$1"
      - match: "*.executor.queued_tasks"
        match_metric_type: gauge
        name: "af_agg_executor_queued_tasks"
        labels:
          airflow_id: "$1"
      - match: "*.executor.running_tasks"
        match_metric_type: gauge
        name: "af_agg_executor_running_tasks"
        labels:
          airflow_id: "$1"
      - match: "*.pool.open_slots.*"
        match_metric_type: gauge
        name: "af_agg_pool_open_slots"
        labels:
          airflow_id: "$1"
          pool_name: "$2"
      - match: "*.pool.queued_slots.*"
        match_metric_type: gauge
        name: "af_agg_pool_queued_slots"
        labels:
          airflow_id: "$1"
          pool_name: "$2"
      - match: "*.pool.running_slots.*"
        match_metric_type: gauge
        name: "af_agg_pool_running_slots"
        labels:
          airflow_id: "$1"
          pool_name: "$2"
      - match: "*.pool.starving_tasks.*"
        match_metric_type: gauge
        name: "af_agg_pool_starving_tasks"
        labels:
          airflow_id: "$1"
          pool_name: "$2"
      - match: "*.smart_sensor_operator.poked_tasks"
        match_metric_type: gauge
        name: "af_agg_smart_sensor_operator_poked_tasks"
        labels:
          airflow_id: "$1"
      - match: "*.smart_sensor_operator.poked_success"
        match_metric_type: gauge
        name: "af_agg_smart_sensor_operator_poked_success"
        labels:
          airflow_id: "$1"
      - match: "*.smart_sensor_operator.poked_exception"
        match_metric_type: gauge
        name: "af_agg_smart_sensor_operator_poked_exception"
        labels:
          airflow_id: "$1"
      - match: "*.smart_sensor_operator.exception_failures"
        match_metric_type: gauge
        name: "af_agg_smart_sensor_operator_exception_failures"
        labels:
          airflow_id: "$1"
      - match: "*.smart_sensor_operator.infra_failures"
        match_metric_type: gauge
        name: "af_agg_smart_sensor_operator_infra_failures"
        labels:
          airflow_id: "$1"
    
      # === Timers ===
      - match: "*.dagrun.dependency-check.*"
        match_metric_type: observer
        name: "af_agg_dagrun_dependency_check"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
      - match: "*.dag.*.*.duration"
        match_metric_type: observer
        name: "af_agg_dag_task_duration"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
          task_id: "$3"
      - match: "*.dag_processing.last_duration.*"
        match_metric_type: observer
        name: "af_agg_dag_processing_duration"
        labels:
          airflow_id: "$1"
          dag_file: "$2"
      - match: "*.dagrun.duration.success.*"
        match_metric_type: observer
        name: "af_agg_dagrun_duration_success"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
      - match: "*.dagrun.duration.failed.*"
        match_metric_type: observer
        name: "af_agg_dagrun_duration_failed"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
      - match: "*.dagrun.schedule_delay.*"
        match_metric_type: observer
        name: "af_agg_dagrun_schedule_delay"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
      - match: "*.scheduler.critical_section_duration"
        match_metric_type: observer
        name: "af_agg_scheduler_critical_section_duration"
        labels:
          airflow_id: "$1"
      - match: "*.dagrun.*.first_task_scheduling_delay"
        match_metric_type: observer
        name: "af_agg_dagrun_first_task_scheduling_delay"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
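      # === Custom metrics ===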
      - match: "*.dag.*.*.my_counter"
        match_metric_type: counter
        name: "my_custom_task_counter"
        labels:
          airflow_id: "$1"
          dag_id: "$2"
          task_id: "$3"
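
    To see how these mappings behave, take a hypothetical task instance finishing. Assuming statsd_prefix = airflow and a DAG named my_dag with a task named my_task (names made up for illustration), the *.ti.finish.*.*.* rule rewrites the metric like this:

    # Incoming StatsD metric
    airflow.ti.finish.my_dag.my_task.success:1|c

    # Resulting Prometheus metric after mapping
    af_agg_ti_finish{airflow_id="airflow",dag_id="my_dag",task_id="my_task",state="success"} 1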

     

    4. Run the StatsD-exporter in Docker:

    # 9125 (tcp/udp) receives StatsD metrics; 9102 is the HTTP endpoint Prometheus scrapes
    $ docker run -p 9102:9102 -p 9125:9125 -p 9125:9125/udp {statsd_image} \
    --statsd.listen-udp=:9125 --web.listen-address=:9102 \
    --log.level=debug \
    --statsd.mapping-config=/opt/statsd.yaml

    * Replace {statsd_image} with the name the image was built under: statsd-exporter

     

    5. Open the URL below to check live Airflow metrics:

    => http://localhost:9102/metrics
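
    You can also check from a terminal; grepping for the af_agg prefix used in the mapping file narrows the output to the Airflow metrics:

    $ curl -s http://localhost:9102/metrics | grep af_agg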

     

    If the Airflow metrics do not show up on the exporter's metrics page, see this troubleshooting post: [Airflow] Fixing the Airflow-StatsD integration error (https://ninano1109.tistory.com/269)

     

     

    pg-exporter

    Set up the pg-exporter in the same way. It listens on port 9187.

     

    1. Create the Dockerfile:

    FROM bitnami/postgres-exporter:latest
    
    # Connection string for the RDS instance to monitor: fill in the
    # endpoint, port (e.g. 5432), and database name
    ENV DATA_SOURCE_NAME="postgresql://postgres:postgres@{rds-endpoint}:{rds-port}/{db-name}?sslmode=disable"
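
    Build it the same way as before, tagging the image to match the run command below:

    # Build the pg-exporter image
    $ docker build -t pg_exporter .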

     

    2. Run the pg-exporter in Docker:

    $ docker run --restart=always -p 9187:9187 pg_exporter
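
    As with the StatsD-exporter, you can confirm it is serving metrics; postgres-exporter's metric names start with pg_:

    $ curl -s http://localhost:9187/metrics | grep ^pg_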

     

     

    That completes the exporter setup; next we install Prometheus, which will pull these metrics.

     

    Continued in Part 2 of Building an Airflow Monitoring System...

     
