-
[Airflow] DAG 작성법(Feat. 오늘 날짜/시간 출력하기)Airflow 2021. 4. 21. 21:35728x90
# Step 1: Importing modules
# airflow DAG 모듈 import from airflow import DAG # 날짜와 시간 출력하기 위한 import from datetime import datetime, timedelta # airflow 실행을 위한 PythonOperator import from airflow.operators.python_operator import PythonOperator
# Step 2: Default Arguments
# dict pass to airflow objects containing meta datas default_args = { # owner name of the DAG 'owner': 'airflow', # whether to rely on previous task status 'depends_on_past': False, # start date of task instance 'start_date': datetime(2021, 3, 31), # email address to get notifications 'email': ['abc@abc.com'], # retry the task once, if it fails 'retries': 1, # after waiting for 10 min 'retry_delay': timedelta(minutes=10), }
# Step 3: Instantiate a DAG
# configure schedule & set DAG settings dag = DAG( dag_id='my_dag', # prior tasks not executed catchup = False, default_args=default_args, # how log DagRun should be up before timing out(failing) # dagrun_timeout = timedelta(minutes=30), # continue to run DAG once per hour schedule_interval = timedelta(hours=1), # schedule_interval = '*/1 * * * *', => 'M H D/M M D/W' # '@daily' = '0 0 * * *' ) def print_time(): now = datetime.now() print('=' * 20) print('현재 시간은 {}입니다.'.format(now)) print('=' * 20) def print_day(): days = ['월', '화', '수', '목', '금', '토', '일'] days_idx = datetime.now().date().weekday() print('=' * 20) print('오늘은 {}요일 입니다.'.format(days[days_idx])) print('=' * 20)
# Step 4: Lay out tasks
t1 = PythonOperator( task_id = 'print_time', python_callable = print_time, dag = dag, ) t2 = PythonOperator( task_id = 'print_day', python_callable = print_day, dag = dag, )
# Step 5: Setting up Dependencies(task orders)
# t2 will depend on t1 t1.set_downstream(t2) # or t1 >> t2
# 전체코드
당신이 어떤 것을
할머니에게 설명해주지 못한다면,
그것은 진정으로 이해한 것이 아니다.
- A.Einstein
'Airflow' 카테고리의 다른 글
[Airflow] 에어플로우 worker 중단 현상 해결 (0) 2021.11.19 [Airflow] 에어플로우 DAG Dynamically Generate Serial Tasks (feat. 직렬화 처리) (0) 2021.09.24 [Airflow] 에어플로우 Main, MySQL 서버 IP 주소 변경으로 인한 Airflow woker 실행 문제 해결(Feat. Docker container) (0) 2021.09.07 [Airflow] Airflow ExternalTask Sensor 생성하기 (0) 2021.08.09 [Airflow] 에어플로우 execution_date DAG 스케줄러 에러(feat.run_id) (0) 2021.07.08