-
[Airflow] 에어플로우 DAG Dynamically Generate Serial Tasks (feat. 직렬화 처리)Airflow 2021. 9. 24. 22:58728x90
배경설명
Airflow DAG task들을 생성할 때, 동일한 유형의 task들을 for문으로 처리하여 직렬화하고자 함.
단순 병렬화(Parallel) 처리라면, 아래와 같이 사용할 수 있음.
rfm_clustering_tasks = [] for rfm_module in py_execute_biz_modules["RFM"]: rfm_clustering_task = EmrAddStepsOperator( task_id="rfm_clustering_{}_emr".format(rfm_module), job_flow_id=created_job_flow_id, aws_conn_id="aws_default", queue="main", steps=get_pyspark_execute_rfm(rfm_module), dag=dag, ) rfm_clustering_tasks.append(rfm_clustering_task) execute_biz_starter >> rfm_clustering_tasks >> execute_biz_end
문제상황
BUT, 원하는 형태는 execute_biz_starter ⇒ (data_preprocessing → parameter_tuning → prediction) ⇒ execute_biz_end 와 같은 중간 tasks들의 직렬화(Sequential) 처리임.
해결방안
1. for문을 사용한 Dynamic task generate 방법
rfm_clustering_tasks = [] for rfm_module in range(3): rfm_clustering_tasks.append(EmrAddStepsOperator( task_id="rfm_clustering_{}_emr".format(py_execute_biz_modules["RFM"][rfm_module]), job_flow_id=created_job_flow_id, aws_conn_id="aws_default", queue="main", steps=get_pyspark_execute_rfm(py_execute_biz_modules["RFM"][rfm_module]), dag=dag, ) ) if rfm_module == 0: execute_biz_starter >> rfm_clustering_tasks[rfm_module] else: rfm_clustering_tasks[rfm_module-1] >> rfm_clustering_tasks[rfm_module] rfm_clustering_tasks[-1] >> execute_biz_end
2. 각 task를 갱신해주며 추가하기
previous_task = None for rfm_module in range(3): rfm_clustering_task = EmrAddStepsOperator( task_id="rfm_clustering_{}_emr".format(py_execute_biz_modules["RFM"][rfm_module]), job_flow_id=created_job_flow_id, aws_conn_id="aws_default", queue="main", steps=get_pyspark_execute_rfm(py_execute_biz_modules["RFM"][rfm_module]), dag=dag, ) if previous_task: previous_task >> rfm_clustering_task else: execute_biz_starter >> rfm_clustering_task previous_task = rfm_clustering_task rfm_clustering_task >> execute_biz_end
*추가적인 개념으로, chain을 사용한 방법 = 병렬화 방법
from airflow.utils.helpers import chain rfm_clustering_tasks = [] for rfm_module in range(3): rfm_clustering_tasks.append(EmrAddStepsOperator( task_id="rfm_clustering_{}_emr".format(py_execute_biz_modules["RFM"][rfm_module]), job_flow_id=created_job_flow_id, aws_conn_id="aws_default", queue="main", steps=get_pyspark_execute_rfm(py_execute_biz_modules["RFM"][rfm_module]), dag=dag, ) ) chain(execute_biz_starter, rfm_clustering_tasks, execute_biz_end)
ref.
https://stackoverflow.com/questions/64847484/airflow-dag-task-dependency-in-a-loop
당신이 어떤 것을
할머니에게 설명해주지 못한다면,
그것은 진정으로 이해한 것이 아니다.
- A.Einstein
'Airflow' 카테고리의 다른 글
[Airflow] 에어플로우 worker 중단 현상 해결 Ver.2 (0) 2021.12.12 [Airflow] 에어플로우 worker 중단 현상 해결 (0) 2021.11.19 [Airflow] 에어플로우 Main, MySQL 서버 IP 주소 변경으로 인한 Airflow woker 실행 문제 해결(Feat. Docker container) (0) 2021.09.07 [Airflow] Airflow ExternalTask Sensor 생성하기 (0) 2021.08.09 [Airflow] 에어플로우 execution_date DAG 스케줄러 에러(feat.run_id) (0) 2021.07.08