-
[Airflow] Airflow ExternalTask Sensor 생성하기Airflow 2021. 8. 9. 00:06728x90
배경설명
매일 각 고객사별로 검색어 추천용 데이터를 수집하기 위한 DAG가 돌아가는데
모든 고객사 DAG들이 돌고 난 이후 검색어 추천 모델 배포용 DAG가 자동으로 실행(trigger)될 수 있도록
고객사 데이터 수집 -> 모델 학습 -> 그리고 자동으로 배포까지 가능한 자동화 파이프라인을 리서치해봄.
해결방안
이 문제해결을 위한 요구조건으로는 모든 DAG들이 돌았음을 감지하는 무언가가 필요했고
Airflow에 있는 기능 중 ExternalTaskSensor가 그 역할을 할 수 있다고 판단하여 TEST 진행함.
*유의사항
ExternalTaskSensor는 무조건 해당 dag의 스케줄 시간이 정확히 맞는 run만 sensing 한다.
https://uiandwe.tistory.com/1302
- with open yaml 파일 할 때 한글 인코딩을 적용해 주어야 함
- 계속 up_for_retry와 queue를 반복함.
- timeout failed도 적용되지 않고 멈춰있음
- task_instance.com_pull은 def에서 적용시켜줄 것
- string으로만 return 받을 수 있음
clients = "{{ task_instance.xcom_pull('get_client_names_task', key='return_value') }}" clients = task_instance.xcom_pull(task_ids='get_client_names_task')
name 'task_instance' is not defined
1. task들 생성 안됨 X
from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.sensors import ExternalTaskSensor # 함수 생성 def get_client_names(**context): client_list = ["test1", "test2"] return client_list def client_sensor(**context): clients = context['task_instance'].xcom_pull(task_ids='get_client_names_task') tasks = [] for client in clients: client_task = ExternalTaskSensor( task_id="keyword_{}_sensor".format(client), external_dag_id="keyword_{}_v2.0".format(client), external_task_id=None, # wait until DAG is completed execution_date_fn=lambda x: x, mode="reschedule", timeout=7200, # fail after 2Hrs queue="keyword_train", dag=dag, ) tasks.append(client_task) return tasks
# DAG 생성 with DAG(dag_name, default_args=default_args, schedule_interval="55 16 * * *") as dag: get_client_names_task = PythonOperator( task_id="get_client_names_task", python_callable=get_client_names, provide_context=True, queue="keyword_train", dag=dag, ) client_sensor_task = PythonOperator( task_id="client_sensor_task", python_callable=client_sensor, provide_context=True, queue="keyword_train", dag=dag, ) get_client_names_task >> client_sensor_task
tasks를 리스트로 만든 자체를 task dependency에 추가하려고 했는데, task 리스트들이 만들어지지 않음.
2. DAG name 파싱해서 고객사 sensor task 만들기
def get_client_names(**context): dir = r"../G2R12N-DAGS/keyword" files = os.listdir(dir) exclude = [ "keyword_train.py", "keyword_dag_dependency.py", "keyword_dag_dependency_test.py", ] clients = [] for file in files: if "keyword_" in file and file not in exclude: clients.append(file[8:-3]) print(file[8:-3])
3. Clients yaml 파일을 이용한 전체 고객사 tasksensor 생성하기
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.sensors import ExternalTaskSensor from airflow.contrib.hooks.slack_webhook_hook import SlackWebhookHook from datetime import timedelta, datetime import yaml def get_client_names(**context): with open("/data/airflow/G2R12N-DAGS/clients.yml", "r", encoding="utf-8") as f: clients = yaml.load(f) client_list = [] for k, v in clients.items(): if clients[k]["keyword"]["use"] == "Y": client_list.append(clients[k]["name"]) return client_list dag_name = "keyword_dag_dependency" default_args = { "owner": "airflow", # owner name of the DAG "depends_on_past": False, # whether to rely on previous task status "start_date": datetime(2021, 4, 30), # start date of task instance "retries": 1, # retry the task once, if it fails "retry_delay": timedelta(minutes=3), # after waiting for 3 min } with DAG(dag_name, default_args=default_args, schedule_interval="0 6 * * *") as dag: trigger_keyword_train_dag = BashOperator( task_id="trigger_keyword_train_dag", bash_command="airflow trigger_dag keyword_train", queue="keyword_train", dag=dag, ) clients = get_client_names() for client in clients: client_sensor = ExternalTaskSensor( task_id="keyword_{}_sensor".format(client), external_dag_id="keyword_{}_v2.0".format(client), external_task_id=None, # wait until DAG is completed execution_date_fn=lambda x: x, mode="reschedule", timeout=7200, # fail after 2Hrs queue="keyword_train", dag=dag, ) client_sensor >> trigger_keyword_train_dag
- For문으로 돌면서 고객사 각각의 ExternalTaskSensor를 생성하고, keyword_train DAG를 수동으로 실행하는 명령문을 실행하는 task끼리의 TASK dependency 생성하기
ref.
https://it-sunny-333.tistory.com/160
https://stackoverflow.com/questions/41254253/airflow-xcom-keyerror-task-instance
- 이상 오늘의 삽질일기 끝!
여기저기 삽질도 해보고
날려도 먹으면서
배우는 게
결국 남는거다
- Z.Sabziller
'Airflow' 카테고리의 다른 글
[Airflow] 에어플로우 worker 중단 현상 해결 (0) 2021.11.19 [Airflow] 에어플로우 DAG Dynamically Generate Serial Tasks (feat. 직렬화 처리) (0) 2021.09.24 [Airflow] 에어플로우 Main, MySQL 서버 IP 주소 변경으로 인한 Airflow woker 실행 문제 해결(Feat. Docker container) (0) 2021.09.07 [Airflow] 에어플로우 execution_date DAG 스케줄러 에러(feat.run_id) (0) 2021.07.08 [Airflow] DAG 작성법(Feat. 오늘 날짜/시간 출력하기) (0) 2021.04.21 - with open yaml 파일 할 때 한글 인코딩을 적용해 주어야 함