Building an Airflow Monitoring System, Part 1 (Feat. Installing the Exporters)
After building the Airflow on ECS pipeline,
I set up the following pipeline to monitor the Airflow server and the RDS server it uses as its database.
Each service (Airflow and RDS) emits metrics; Prometheus, a time series database, collects and stores them; and finally Grafana, a time series visualization tool, presents them as dashboards.
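Sketched end to end, the flow looks like this:

Airflow --(StatsD metrics, UDP push)--> statsd-exporter --(/metrics, HTTP pull)--> Prometheus --> Grafana
RDS <--(SQL queries)-- pg-exporter --(/metrics, HTTP pull)--> Prometheus --> Grafana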
Between each service and Prometheus sits an intermediary service called an exporter. It exists for two reasons:
1. StatsD works on a push model, while Prometheus works on a pull model.
So something is needed in the middle that StatsD can push to and Prometheus can pull from.
2. The metrics Airflow sends are in StatsD format; for Prometheus to read them, the exporter converts them to Prometheus format.
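A concrete example of that conversion (the mapped name af_agg_ti_successes comes from the mapping rules we define in statsd.yaml further below):

# StatsD metric pushed by Airflow over UDP, in name:value|type form
airflow.ti_successes:1|c

# the same metric as Prometheus scrapes it from the exporter's /metrics endpoint
af_agg_ti_successes{airflow_id="airflow"} 1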
So we set up a StatsD-exporter for Airflow and a pg-exporter for RDS.

StatsD-exporter
1. First, install the Airflow package with the statsd extra so that Airflow can send StatsD metrics.
$ pip install 'apache-airflow[statsd]'
This command installs the latest version of Airflow together with the packages needed for StatsD integration.
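Note that this unpinned install grabs whatever the latest release is. For reproducible setups, the Airflow docs recommend pinning the version together with a constraints file; a sketch, where the Airflow version (2.6.1) and Python version (3.8) are placeholders for your own:

$ pip install 'apache-airflow[statsd]==2.6.1' \
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.1/constraints-3.8.txt"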
2. Then add the StatsD settings to Airflow's config file, airflow.cfg.
[metrics]
statsd_on = True
# statsd_host and statsd_prefix are shown at their default values
statsd_host = localhost
statsd_port = 9125
statsd_prefix = airflow
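Since this Airflow runs on ECS, it is worth knowing that the same settings can also be injected as environment variables using Airflow's AIRFLOW__{SECTION}__{KEY} convention, which maps onto the [metrics] section above:

AIRFLOW__METRICS__STATSD_ON=True
AIRFLOW__METRICS__STATSD_HOST=localhost
AIRFLOW__METRICS__STATSD_PORT=9125
AIRFLOW__METRICS__STATSD_PREFIX=airflow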
3. Next, create the files needed to run StatsD-exporter with docker.
1) Dockerfile
FROM prom/statsd-exporter:latest
COPY statsd.yaml /opt/statsd.yaml
Build the docker image under the name statsd-exporter:

# docker image build
$ docker build -t statsd-exporter .
2) statsd.yaml
# *prefix : airflow
mappings:
  # === Counters ===
  - match: "*.ti_failures"
    match_metric_type: counter
    name: "af_agg_ti_failures"
    labels:
      airflow_id: "$1"
  - match: "*.ti_successes"
    match_metric_type: counter
    name: "af_agg_ti_successes"
    labels:
      airflow_id: "$1"
  - match: "*.zombies_killed"
    match_metric_type: counter
    name: "af_agg_zombies_killed"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler_heartbeat"
    match_metric_type: counter
    name: "af_agg_scheduler_heartbeat"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.processes"
    match_metric_type: counter
    name: "af_agg_dag_processing_processes"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.killed_externally"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_killed_externally"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.running"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_running"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.tasks.starving"
    match_metric_type: counter
    name: "af_agg_scheduler_tasks_starving"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.orphaned_tasks.cleared"
    match_metric_type: counter
    name: "af_agg_scheduler_orphaned_tasks_cleared"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.orphaned_tasks.adopted"
    match_metric_type: counter
    name: "af_agg_scheduler_orphaned_tasks_adopted"
    labels:
      airflow_id: "$1"
  - match: "*.scheduler.critical_section_busy"
    match_metric_type: counter
    name: "af_agg_scheduler_critical_section_busy"
    labels:
      airflow_id: "$1"
  - match: "*.sla_email_notification_failure"
    match_metric_type: counter
    name: "af_agg_sla_email_notification_failure"
    labels:
      airflow_id: "$1"
  - match: "*.ti.start.*.*"
    match_metric_type: counter
    name: "af_agg_ti_start"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
  - match: "*.ti.finish.*.*.*"
    match_metric_type: counter
    name: "af_agg_ti_finish"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
      state: "$4"
  - match: "*.dag.callback_exceptions"
    match_metric_type: counter
    name: "af_agg_dag_callback_exceptions"
    labels:
      airflow_id: "$1"
  - match: "*.celery.task_timeout_error"
    match_metric_type: counter
    name: "af_agg_celery_task_timeout_error"
    labels:
      airflow_id: "$1"
  # === Gauges ===
  - match: "*.dagbag_size"
    match_metric_type: gauge
    name: "af_agg_dagbag_size"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.import_errors"
    match_metric_type: gauge
    name: "af_agg_dag_processing_import_errors"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.total_parse_time"
    match_metric_type: gauge
    name: "af_agg_dag_processing_total_parse_time"
    labels:
      airflow_id: "$1"
  - match: "*.dag_processing.last_runtime.*"
    match_metric_type: gauge
    name: "af_agg_dag_processing_last_runtime"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dag_processing.last_run.seconds_ago.*"
    match_metric_type: gauge
    name: "af_agg_dag_processing_last_run_seconds"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dag_processing.processor_timeouts"
    match_metric_type: gauge
    name: "af_agg_dag_processing_processor_timeouts"
    labels:
      airflow_id: "$1"
  - match: "*.executor.open_slots"
    match_metric_type: gauge
    name: "af_agg_executor_open_slots"
    labels:
      airflow_id: "$1"
  - match: "*.executor.queued_tasks"
    match_metric_type: gauge
    name: "af_agg_executor_queued_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.executor.running_tasks"
    match_metric_type: gauge
    name: "af_agg_executor_running_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.pool.open_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_open_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.queued_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_queued_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.running_slots.*"
    match_metric_type: gauge
    name: "af_agg_pool_running_slots"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.pool.starving_tasks.*"
    match_metric_type: gauge
    name: "af_agg_pool_starving_tasks"
    labels:
      airflow_id: "$1"
      pool_name: "$2"
  - match: "*.smart_sensor_operator.poked_tasks"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_tasks"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.poked_success"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_success"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.poked_exception"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_poked_exception"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.exception_failures"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_exception_failures"
    labels:
      airflow_id: "$1"
  - match: "*.smart_sensor_operator.infra_failures"
    match_metric_type: gauge
    name: "af_agg_smart_sensor_operator_infra_failures"
    labels:
      airflow_id: "$1"
  # === Timers ===
  - match: "*.dagrun.dependency-check.*"
    match_metric_type: observer
    name: "af_agg_dagrun_dependency_check"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dag.*.*.duration"
    match_metric_type: observer
    name: "af_agg_dag_task_duration"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
  - match: "*.dag_processing.last_duration.*"
    match_metric_type: observer
    name: "af_agg_dag_processing_duration"
    labels:
      airflow_id: "$1"
      dag_file: "$2"
  - match: "*.dagrun.duration.success.*"
    match_metric_type: observer
    name: "af_agg_dagrun_duration_success"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dagrun.duration.failed.*"
    match_metric_type: observer
    name: "af_agg_dagrun_duration_failed"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dagrun.schedule_delay.*"
    match_metric_type: observer
    name: "af_agg_dagrun_schedule_delay"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.scheduler.critical_section_duration"
    match_metric_type: observer
    name: "af_agg_scheduler_critical_section_duration"
    labels:
      airflow_id: "$1"
  - match: "*.dagrun.*.first_task_scheduling_delay"
    match_metric_type: observer
    name: "af_agg_dagrun_first_task_scheduling_delay"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
  - match: "*.dag.*.*.my_counter"
    match_metric_type: counter
    name: "my_custom_task_counter"
    labels:
      airflow_id: "$1"
      dag_id: "$2"
      task_id: "$3"
4. Run the StatsD-exporter docker container
docker run -p 9102:9102 -p 9125:9125 -p 9125:9125/udp {statsd_image} \
  --statsd.listen-udp=:9125 --web.listen-address=:9102 \
  --log.level=debug \
  --statsd.mapping-config=/opt/statsd.yaml
* Replace {statsd_image} with the name the image was built under: statsd-exporter
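If you would rather not bake the config into a custom image, an equivalent sketch (assuming statsd.yaml sits in the current directory) mounts it into the official image instead:

docker run -p 9102:9102 -p 9125:9125 -p 9125:9125/udp \
  -v "$(pwd)/statsd.yaml:/opt/statsd.yaml" \
  prom/statsd-exporter \
  --statsd.listen-udp=:9125 --web.listen-address=:9102 \
  --statsd.mapping-config=/opt/statsd.yaml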
5. Open the URL and check the live Airflow metrics
=> http://localhost:9102/metrics
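To verify the pipeline without waiting on Airflow, you can hand-craft a StatsD packet (assuming netcat is installed; the name matches the *.ti_successes mapping above):

$ echo "airflow.ti_successes:1|c" | nc -u -w1 localhost 9125
$ curl -s http://localhost:9102/metrics | grep af_agg_ti_successes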
pg-exporter
Install pg-exporter in the same way. pg-exporter uses port 9187.
1. Create the Dockerfile
FROM bitnami/postgres-exporter:latest
ENV DATA_SOURCE_NAME="postgresql://postgres:postgres@{rds-endpoint}:{rds-port}/{db-name}?sslmode=disable"

Fill in your own RDS endpoint, port (e.g. 5432), and database name in the connection string.
2. Build and run the pg-exporter docker container (the image has to be built first, same as before):

$ docker build -t pg_exporter .
$ docker run --restart=always -p 9187:9187 pg_exporter
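One caveat with the Dockerfile approach: the DB password ends up baked into the image layers. A sketch of an alternative is to skip the custom image and pass DATA_SOURCE_NAME at runtime instead (same placeholders as above); pg_up should then read 1 if the exporter can reach RDS:

$ docker run --restart=always -p 9187:9187 \
    -e DATA_SOURCE_NAME="postgresql://postgres:postgres@{rds-endpoint}:{rds-port}/{db-name}?sslmode=disable" \
    bitnami/postgres-exporter:latest
$ curl -s http://localhost:9187/metrics | grep pg_up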
That completes the exporter setup; next we install Prometheus, which will pull these metrics.
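As a preview, a minimal prometheus.yml scrape config for these two exporters might look like this (the job names and localhost targets are assumptions; the real setup follows in Part 2):

scrape_configs:
  - job_name: "airflow"            # statsd-exporter
    static_configs:
      - targets: ["localhost:9102"]
  - job_name: "rds"                # pg-exporter
    static_configs:
      - targets: ["localhost:9187"]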
To be continued in the next post, Building an Airflow Monitoring System, Part 2..