-
[Airflow] Airflow dag 추가 에러 현상Airflow 2024. 2. 24. 14:04728x90
Airflow에 새로운 DAG를 추가할 때 db 관련 에러가 발생하여 db 업그레이드 진행 시 에러발생
> airflow db upgrade
raise sa_exc.PendingRollbackError( sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.NotNullViolation) null value in column "max_consecutive_failed_dag_runs" of relation "dag" violates not-null constraint DETAIL: Failing row contains (boaz_analysis_dag, null, t, f, t, 2024-01-11 02:31:28.735741+00, null, null, null, null, /airflow/dags/analysis_dag.py, null, airflow, null, grid, null, Never, external triggers only, 16, 16, f, f, null, null, null, null, null, null, null). [SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_parsed_time, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, processor_subdir, owners, description, default_view, schedule_interval, timetable_description, max_active_tasks, max_active_runs, has_task_concurrency_limits, has_import_errors, next_dagrun, next_dagrun_data_interval_start, next_dagrun_data_interval_end, next_dagrun_create_after) VALUES (%(dag_id)s, %(root_dag_id)s, %(is_paused)s, %(is_subdag)s, %(is_active)s, %(last_parsed_time)s, %(last_pickled)s, %(last_expired)s, %(scheduler_lock)s, %(pickle_id)s, %(fileloc)s, %(processor_subdir)s, %(owners)s, %(description)s, %(default_view)s, %(schedule_interval)s, %(timetable_description)s, %(max_active_tasks)s, %(max_active_runs)s, %(has_task_concurrency_limits)s, %(has_import_errors)s, %(next_dagrun)s, %(next_dagrun_data_interval_start)s, %(next_dagrun_data_interval_end)s, %(next_dagrun_create_after)s)] [parameters: ({'dag_id': 'analysis_dag', 'root_dag_id': None, 'is_paused': True, 'is_subdag': False, 'is_active': True, 'last_parsed_time': datetime.datetime(2024, 1, 11, 2, 31, 28, 735741, tzinfo=Timezone('UTC')), 'last_pickled': None, 'last_expired': None, 'scheduler_lock': None, 'pickle_id': None, 'fileloc': '/airflow/dags/analysis_dag.py', 'processor_subdir': None, 'owners': 'airflow', 'description': None, 'default_view': 'grid', 'schedule_interval': 'null', 'timetable_description': 'Never, external triggers only', 'max_active_tasks': 16, 'max_active_runs': 16, 'has_task_concurrency_limits': False, 'has_import_errors': False, 'next_dagrun': None, 'next_dagrun_data_interval_start': None, 'next_dagrun_data_interval_end': None, 'next_dagrun_create_after': None}
🔴 원인
metadat DB의 dag 테이블 내 max_consecutive_failed_dag_runs 컬럼이 not null로 설정되어 있는데NOT NULL CONSTRAINT 조건에 violate하여 에러 발생
해당 DAG에 값이 빠져 있는 원인은 2가지 정도로 볼 수 있는데:
1.Missing Field in DAG Definition: The DAG definition might not include the `max_consecutive_failed_dag_runs` field, or it might not be initialized properly when the DAG is created. This field should have a default value, but sometimes during upgrades or custom DAG creation, this might not be set automatically.
2. Database Inconsistency: During migrations or upgrades, the database might have missed setting the default value for the `max_consecutive_failed_dag_runs` field for existing DAGs, leading to this issue during migrations.
1. DAG를 작성했을 때의 DAG Definition이 제대로 작동하지 않았거나,2. DB 업그레이드 시 디폴트 값 설정하는 부분에서 누락이 발생했을 수 있음
🟢 해결방안pg 접속 => \d dag; => ALTER TABLE dag ALTER COLUMN max_consecutive_failed_dag_runs SET DEFAULT 0;
Casae1. dag file name != dag id
1) Dag id 이름만 바꿨을 때, scheduler log:- INFO - Deleted DAG tableau_EC2_management_dag in serialized_dag table - INFO - Deactivated 1 DAGs which are no longer present in file. - INFO - DAG tableau_EC2_management_dag is missing and will be deactivated.
2) airflow dag table 확인이전 EC2 dag_id 의 fileloc 컬럼 = 현재 dag 위치와 동일
'/airflow/dags/A_dag.py'
=> 현재와 다른 새로운 이름으로 바꿔주기UPDATE dag SET fileloc = '/airflow/dags/A.py' WHERE dag_id = 'A';
processor_subdir = NULL
=> 현재 dag_folder 경로로 바꿔주기UPDATE dag SET processor_subdir = '/airflow/dags/ WHERE dag_id = 'A';
airflow scheduler 재가동 이후, scheduler log에 더이상 에러 메세지 나타나지 않음.
이전 삭제된 DAG, tasks들 스케줄 되어 trigger 되는 현상:- metadata DB에 이전 기록들이 남아있는 상태로, 종료되지 않은 상태로 유지되어 있으면
- scheduler 재가동 시 매번 해당 dag 정보를 가져오는 현상
- db clean으로 오래된 데이터를 삭제하기
- airflow db clean --clean-before-timestamp '2024-01-01'
- 이상 오늘의 삽질일기 끄읏🌞
여기저기 삽질도 해보고
날려도 먹으면서
배우는 게
결국 남는거다
- Z.Sabziller'Airflow' 카테고리의 다른 글
[Airflow] Airflow db 업그레이드 에러 해결(feat. 2.10.0 버전) (4) 2024.10.29 [Airflow] Airflow DAG runs 개수 주기적으로 삭제하기 (0) 2023.08.01 [Airflow] Airflow-s3 remote logging 설정 방법 (0) 2023.03.31 [Airflow] Airflow ExternalTaskSensor 활용법(feat. DAG Dependency) (0) 2023.02.07 [Airflow] DAG skipped 상태에서 멈춰있을 때(feat. queue 지정) (0) 2022.10.21