ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] Airflow dag 추가 에러 현상
    Airflow 2024. 2. 24. 14:04
    728x90

     

    Airflow에 새로운 DAG를 추가할 때 db 관련 에러가 발생하여 db 업그레이드 진행 시 에러발생

    > airflow db upgrade

    raise sa_exc.PendingRollbackError(
    sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.NotNullViolation) null value in column "max_consecutive_failed_dag_runs" of relation "dag" violates not-null constraint
    DETAIL:  Failing row contains (boaz_analysis_dag, null, t, f, t, 2024-01-11 02:31:28.735741+00, null, null, null, null, /airflow/dags/analysis_dag.py, null, airflow, null, grid, null, Never, external triggers only, 16, 16, f, f, null, null, null, null, null, null, null).
    
    [SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_parsed_time, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, processor_subdir, owners, description, default_view, schedule_interval, timetable_description, max_active_tasks, max_active_runs, has_task_concurrency_limits, has_import_errors, next_dagrun, next_dagrun_data_interval_start, next_dagrun_data_interval_end, next_dagrun_create_after) VALUES (%(dag_id)s, %(root_dag_id)s, %(is_paused)s, %(is_subdag)s, %(is_active)s, %(last_parsed_time)s, %(last_pickled)s, %(last_expired)s, %(scheduler_lock)s, %(pickle_id)s, %(fileloc)s, %(processor_subdir)s, %(owners)s, %(description)s, %(default_view)s, %(schedule_interval)s, %(timetable_description)s, %(max_active_tasks)s, %(max_active_runs)s, %(has_task_concurrency_limits)s, %(has_import_errors)s, %(next_dagrun)s, %(next_dagrun_data_interval_start)s, %(next_dagrun_data_interval_end)s, %(next_dagrun_create_after)s)]
    [parameters: ({'dag_id': 'analysis_dag', 'root_dag_id': None, 'is_paused': True, 'is_subdag': False, 'is_active': True, 'last_parsed_time': datetime.datetime(2024, 1, 11, 2, 31, 28, 735741, tzinfo=Timezone('UTC')), 'last_pickled': None, 'last_expired': None, 'scheduler_lock': None, 'pickle_id': None, 'fileloc': '/airflow/dags/analysis_dag.py', 'processor_subdir': None, 'owners': 'airflow', 'description': None, 'default_view': 'grid', 'schedule_interval': 'null', 'timetable_description': 'Never, external triggers only', 'max_active_tasks': 16, 'max_active_runs': 16, 'has_task_concurrency_limits': False, 'has_import_errors': False, 'next_dagrun': None, 'next_dagrun_data_interval_start': None, 'next_dagrun_data_interval_end': None, 'next_dagrun_create_after': None}

     


    🔴 원인

    metadat DB의 dag 테이블 내 max_consecutive_failed_dag_runs 컬럼이 not null로 설정되어 있는데

    NOT NULL CONSTRAINT 조건에 violate하여 에러 발생


    해당 DAG에 값이 빠져 있는 원인은 2가지 정도로 볼 수 있는데:

    1.Missing Field in DAG Definition: The DAG definition might not include the `max_consecutive_failed_dag_runs` field, or it might not be initialized properly when the DAG is created. This field should have a default value, but sometimes during upgrades or custom DAG creation, this might not be set automatically.
    2. Database Inconsistency: During migrations or upgrades, the database might have missed setting the default value for the `max_consecutive_failed_dag_runs` field for existing DAGs, leading to this issue during migrations.


    1. DAG를 작성했을 때의 DAG Definition이 제대로 작동하지 않았거나, 

    2. DB 업그레이드 시 디폴트 값 설정하는 부분에서 누락이 발생했을 수 있음


    🟢 해결방안

    pg 접속
    => \d dag;
    
    => ALTER TABLE dag
       ALTER COLUMN max_consecutive_failed_dag_runs SET DEFAULT 0;



    Casae1. dag file name != dag id


    1) Dag id 이름만 바꿨을 때, scheduler log:

        - INFO - Deleted DAG tableau_EC2_management_dag in serialized_dag table 
        - INFO - Deactivated 1 DAGs which are no longer present in file. 
        - INFO - DAG tableau_EC2_management_dag is missing and will be deactivated.


      2) airflow dag table 확인

     이전 EC2 dag_id 의 fileloc 컬럼 = 현재 dag 위치와 동일

    '/airflow/dags/A_dag.py'


     => 현재와 다른 새로운 이름으로 바꿔주기

      UPDATE dag
      SET fileloc = '/airflow/dags/A.py'
      WHERE dag_id = 'A';


    processor_subdir = NULL
    => 현재 dag_folder 경로로 바꿔주기

      UPDATE dag
      SET processor_subdir = '/airflow/dags/
      WHERE dag_id = 'A';

     


    airflow scheduler 재가동 이후, scheduler log에 더이상 에러 메세지 나타나지 않음.



    이전 삭제된 DAG, tasks들 스케줄 되어 trigger 되는 현상:

    • metadata DB에 이전 기록들이 남아있는 상태로, 종료되지 않은 상태로 유지되어 있으면
    • scheduler 재가동 시 매번 해당 dag 정보를 가져오는 현상
    • db clean으로 오래된 데이터를 삭제하기
    • airflow db clean --clean-before-timestamp '2024-01-01'

     

     

    - 이상 오늘의 삽질일기 끄읏🌞


    여기저기 삽질도 해보고
    날려도 먹으면서
    배우는 게
    결국 남는거다
    - Z.Sabziller

     

     

     

    '쫄보의삽질' 블로그 탄생스토리가 궁금하다면:

     

    Git push 취소 방법 (feat. '쫄보의삽질' 블로그 탄생 배경)

    아래는 저의 생생한 경험담을 바탕으로 작성한 것 입니다. Github 관리 폴더의 이름을 실수로 변경하고 삭제해버렸다. 작업 후 commit 하려고 아무리 찾아봐도 폴더가 보이지 않았다. 나의 피땀눈물

    ninano1109.tistory.com

     

    댓글

Designed by Tistory.