Airflow
-
[Airflow] Airflow db 업그레이드 에러 해결(feat. 2.10.0 버전)Airflow 2024. 10. 29. 21:35
airflow db upgrade 명령어 실행 시,(airflow 2.7.0 대 버전 부터 airflow db upgrade -> airflow db migrate)[SQL: ALTER TABLE dag_run ADD COLUMN clear_number INTEGER DEFAULT '0' NOT NULL]ERROR: column "clear_number" of relation "dag_run" already exists위와 같은 에러가 발생함db 접속 후 해당 컬럼을 삭제해주고(저는 metadata db로 postgresql을 사용했습니다)> ALTER TABLE dag_run DROP COLUMN IF EXISTS clear_number;다시 airflow db upgrade 명령어를 실행하면,ALT..
-
[Airflow] Airflow dag 추가 에러 현상Airflow 2024. 2. 24. 14:04
Airflow에 새로운 DAG를 추가할 때 db 관련 에러가 발생하여 db 업그레이드 진행 시 에러발생> airflow db upgraderaise sa_exc.PendingRollbackError(sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.NotNullViolation) null value in column "m..
-
[Airflow] Airflow DAG runs 개수 주기적으로 삭제하기Airflow 2023. 8. 1. 00:06
Airflow 각 DAG의 Runs 기록을 보면 매일 DAG run 개수가 누적되어 쌓이고 있었다. Airflow.cfg 내부 설정에서 개수 제한 관련 설정이 있는지 찾아봤지만, 딱히 찾지 못해서 자체 삭제 스크립트를 개발하기로 했다. 주기를 설정하여 특정 날짜만큼의 run 개수만 유지하도록 하는 로직으로 매일 배치 스케줄을 걸어 해당 주기 이전의 데이터는 삭제하도록 했다. 이렇게 하면 log 저장에 따른 DB 용량 과부하를 관리할 수 있을 것으로 예상. 1. 필요한 모듈 import from airflow import DAG, settings from airflow.models import DagRun from airflow.operators.python import PythonOperator fro..
-
[Airflow] Airflow-s3 remote logging 설정 방법Airflow 2023. 3. 31. 22:35
Airlfow DAG 실행 시 발생하는 로그들을 S3에 원격으로 적재하는 방법이 있는데, 이번 글에서는 그 설정 방법에 대해서 얘기해보려고 합니다. 1️⃣ 먼저 아래와 같이 aws-cli 명령어로 s3 connection Id를 생성하여 Airflow에 등록해야 합니다. aws-cli airflow connections add 's3_conn' \ --conn-type 's3' \ --conn-extra '{"region": "ap-northeast-2"}' \ 2️⃣ 이후에 airflow.cfg 파일에도 remote_logging 섹션에 위에서 생성한 connection id와 s3 버킷 정보를 입력합니다. airflow.cfg remote_logging = True remote_log_conn_id ..
-
[Airflow] Airflow ExternalTaskSensor 활용법(feat. DAG Dependency)Airflow 2023. 2. 7. 23:50
Airflow DAG 사이의 dependency를 설정하는 방법은 2가지가 있는데 1) TriggerDagRunOperator 2) ExternalTaskSensor 1번 TriggerDagRunOperator의 경우, DAG A가 먼저 실행 된 후 task 종료 후 TriggerDagRunOperator task가 실행되면서 DAG B를 trigger하는 방식이다. 이 경우 DAG A에 task를 설정하고, DAG B의 스케줄 설정 여부와 관계없이 트리거를 할 수 있다. # DAG A Task 1 >> Task 2 >> TriggerDagRunOperator Task (Trigger DAG B Task 3) # DAG B Task 3 2번 ExternalTaskSensor이 경우, DAG B의 Exte..
-
[Airflow] DAG skipped 상태에서 멈춰있을 때(feat. queue 지정)Airflow 2022. 10. 21. 20:49
Airflow DAG 테스트 실행 중 마지막 task에서 skipped 상태에서 멈춰 중단된 상태. 다시 재시도를 위해 해당 task Clear-> Run 하면 Queue에서 멈춰있음. task의 코드를 살펴보니 queue를 할당하는 것을 빼먹은 것 발견함. cluster_remover = EmrTerminateJobFlowOperator( task_id='cluster_remover', job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}", ) 아래처럼 queue 이름을 지정해서 할당하면 success로 바뀐다! cluster_remover = EmrTerminateJobFlowOperator( task_id..
-
[Airflow] airflow health status check 자동화 설정Airflow 2022. 8. 28. 19:46
가끔 예고없이 airflow scheduler가 중단되는 현상이 발생할 때, airflow health status 체크를 통해 스케줄러 재가동을 자동화하는 방법을 공유하려고 합니다. airflow 공식 문서에 따르면 cli 명령어로 schduler와 DB, 그리고 worker의 상태를 확인할 수 있습니다. https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health Checking Airflow Health Status — Airflow Documentation airflow.apache.org 이 중에서 scheduler와 worker 프로세스를 감지한 후..
-
[Airflow] Airflow DAG skipped state 멈춤 현상 해결Airflow 2022. 8. 21. 23:36
Airflow DAG 실행 시 특정 Task에서 skipped 상태에서 멈춰있음. 해당 테스크 Clear -> Run 하면 Queue에서 멈춰있는 현상. => 해당 테스크 operator 코드 부분에서 worker queue를 빠뜨렸음을 발견함. ⚠️ 기존 코드 cluster_remover = EmrTerminateJobFlowOperator( task_id='cluster_remover', job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}", ) ✅ 변경 코드 cluster_remover = EmrTerminateJobFlowOperator( task_id='cluster_remover', job_flow_..