ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] Airflow ExternalTaskSensor 활용법(feat. DAG Dependency)
    Airflow 2023. 2. 7. 23:50
    728x90

     

    Airflow DAG 사이의 dependency를 설정하는 방법은 2가지가 있는데

    1) TriggerDagRunOperator

    2) ExternalTaskSensor

     

    1번 TriggerDagRunOperator의 경우,

    DAG A가 먼저 실행 된 후 task 종료 후 TriggerDagRunOperator task가 실행되면서 DAG B를 trigger하는 방식이다.

     

    이 경우 DAG A에 task를 설정하고, DAG B의 스케줄 설정 여부와 관계없이 트리거를 할 수 있다.

    # DAG A
    Task 1 >> Task 2 >> TriggerDagRunOperator Task (Trigger DAG B Task 3)
    
    # DAG B
    Task 3

     

    2번 ExternalTaskSensor이 경우,

    DAG B의 ExternalTaskSensor task가 DAG A(parent dag)의 특정 task를 감지하고 있다가 DAG A 특정 task가 완료되면

    DAG B의 이후 task가 실행되는 방식이다.

     

    이 경우 DAG B에 task를 설정하고 DAG A와 B에 동일하게 스케줄을 걸어주어야 한다.

    # DAG_A(schedule_interval='0 1 * * *')
    Task1 >> Task2 
    
    # DAG_B(schedule_interval='0 1 * * *')
    ExternalTaskSensor Task(DAG_A Task2) >> Task3
    (Task1 > Task2 > ETS Task > Task3)

     

     

    이때,  만약 스케줄이 다른 두개의 DAG로 dependency를 걸고 싶다면

    스케줄 시간을 계산하여 맞춰주어야 합니다.

     

    이를 자동으로 구현하기 위해 DAG의 execution date를 가져오는 함수를 만들 수 있습니다.

     

    def get_parent_dag_execution_date(dag):
        # DagRun 로그 중 DAG A를 가져오기
        dag_runs = DagRun.find(dag_id=dag)
        # DAG A의 가장 마지막(최근) 실행 로그 가져오기
        dag_run = dag_runs[-1]
        # DAG A의 가장 마지막 실행 시간 가져오기
        dag_run_execution_date = dag_run.execution_date
    
        return dag_run_execution_date

    DAG의 스케줄이 고정된 시간이라면 그 차이만큼 단순 빼기를 하면 되지만,

    가변적 스케줄 혹은 변동하는 스케줄을 가진 DAG의 시간을 구하기 위해서는

    위의 코드처럼 해당 DAG의 마지막 실행 시간을 직접 가져오는 방식으로 구현하여 두 DAG의 스케줄을 맞춰주었습니다.

     

    # DAG B script
    
    def get_parent_dag_execution_date(dag):
        dag_runs = DagRun.find(dag_id=dag)
        dag_run = dag_runs[-1]
        dag_run_execution_date = dag_run.execution_date
    
        return dag_run_execution_date
    
    with DAG(
        dag_id="DAG_B",
    	...
        schedule_interval='0 1 * * *',
    ) as dag:
    
    
    DAG_A_sensor = ExternalTaskSensor(
                        task_id="DAG_A_sensor",
                        external_dag_id="DAG_A",
                        external_task_id="Task2",
                        execution_date_fn=lambda dt: get_parent_dag_execution_date("DAG_A"),
                        timeout=7200,
                        mode="reschedule",
                    )
                    
    Task3
    
    DAG_A_sensor >> Task3

     

     


     

    당신이 어떤 것을

    할머니에게 설명해주지 못한다면,

    그것은 진정으로 이해한 것이 아니다. 

    - A.Einstein

     

     

    댓글

Designed by Tistory.