-
[Airflow] Airflow ExternalTaskSensor 활용법(feat. DAG Dependency)Airflow 2023. 2. 7. 23:50728x90
Airflow DAG 사이의 dependency를 설정하는 방법은 2가지가 있는데
1) TriggerDagRunOperator
2) ExternalTaskSensor
1번 TriggerDagRunOperator의 경우,
DAG A가 먼저 실행 된 후 task 종료 후 TriggerDagRunOperator task가 실행되면서 DAG B를 trigger하는 방식이다.
이 경우 DAG A에 task를 설정하고, DAG B의 스케줄 설정 여부와 관계없이 트리거를 할 수 있다.
# DAG A Task 1 >> Task 2 >> TriggerDagRunOperator Task (Trigger DAG B Task 3) # DAG B Task 3
2번 ExternalTaskSensor이 경우,
DAG B의 ExternalTaskSensor task가 DAG A(parent dag)의 특정 task를 감지하고 있다가 DAG A 특정 task가 완료되면
DAG B의 이후 task가 실행되는 방식이다.
이 경우 DAG B에 task를 설정하고 DAG A와 B에 동일하게 스케줄을 걸어주어야 한다.
# DAG_A(schedule_interval='0 1 * * *') Task1 >> Task2 # DAG_B(schedule_interval='0 1 * * *') ExternalTaskSensor Task(DAG_A Task2) >> Task3 (Task1 > Task2 > ETS Task > Task3)
이때, 만약 스케줄이 다른 두개의 DAG로 dependency를 걸고 싶다면
스케줄 시간을 계산하여 맞춰주어야 합니다.
이를 자동으로 구현하기 위해 DAG의 execution date를 가져오는 함수를 만들 수 있습니다.
def get_parent_dag_execution_date(dag): # DagRun 로그 중 DAG A를 가져오기 dag_runs = DagRun.find(dag_id=dag) # DAG A의 가장 마지막(최근) 실행 로그 가져오기 dag_run = dag_runs[-1] # DAG A의 가장 마지막 실행 시간 가져오기 dag_run_execution_date = dag_run.execution_date return dag_run_execution_date
DAG의 스케줄이 고정된 시간이라면 그 차이만큼 단순 빼기를 하면 되지만,
가변적 스케줄 혹은 변동하는 스케줄을 가진 DAG의 시간을 구하기 위해서는
위의 코드처럼 해당 DAG의 마지막 실행 시간을 직접 가져오는 방식으로 구현하여 두 DAG의 스케줄을 맞춰주었습니다.
# DAG B script def get_parent_dag_execution_date(dag): dag_runs = DagRun.find(dag_id=dag) dag_run = dag_runs[-1] dag_run_execution_date = dag_run.execution_date return dag_run_execution_date with DAG( dag_id="DAG_B", ... schedule_interval='0 1 * * *', ) as dag: DAG_A_sensor = ExternalTaskSensor( task_id="DAG_A_sensor", external_dag_id="DAG_A", external_task_id="Task2", execution_date_fn=lambda dt: get_parent_dag_execution_date("DAG_A"), timeout=7200, mode="reschedule", ) Task3 DAG_A_sensor >> Task3
당신이 어떤 것을
할머니에게 설명해주지 못한다면,
그것은 진정으로 이해한 것이 아니다.
- A.Einstein
'Airflow' 카테고리의 다른 글
[Airflow] Airflow DAG runs 개수 주기적으로 삭제하기 (0) 2023.08.01 [Airflow] Airflow-s3 remote logging 설정 방법 (0) 2023.03.31 [Airflow] DAG skipped 상태에서 멈춰있을 때(feat. queue 지정) (0) 2022.10.21 [Airflow] airflow health status check 자동화 설정 (0) 2022.08.28 [Airflow] Airflow DAG skipped state 멈춤 현상 해결 (0) 2022.08.21