-
[Airflow] Airflow DAG runs 개수 주기적으로 삭제하기Airflow 2023. 8. 1. 00:06728x90
Airflow 각 DAG의 Runs 기록을 보면 매일 DAG run 개수가 누적되어 쌓이고 있었다.
Airflow.cfg 내부 설정에서 개수 제한 관련 설정이 있는지 찾아봤지만,
딱히 찾지 못해서 자체 삭제 스크립트를 개발하기로 했다.
주기를 설정하여 특정 날짜만큼의 run 개수만 유지하도록 하는 로직으로
매일 배치 스케줄을 걸어 해당 주기 이전의 데이터는 삭제하도록 했다.
이렇게 하면 log 저장에 따른 DB 용량 과부하를 관리할 수 있을 것으로 예상.
1. 필요한 모듈 import
from airflow import DAG, settings from airflow.models import DagRun from airflow.operators.python import PythonOperator from datetime import datetime, timezone, timedelta
2. 31일 이전 dag run을 리스트로 받아 db에서 직접 삭제하기
def delete_old_dag_runs(max_age_days=32): session = settings.Session() # Get days to keep record date_threshold = datetime.now(timezone(timedelta(hours=9))) - timedelta(days=max_age_days) # Get list of dag runs to delete dag_runs_to_delete = session.query(DagRun).filter(DagRun.execution_date <= date_threshold).all() # Delete the old dag runs from the database for dag_run in dag_runs_to_delete: # To check dag_run info from task log print(dag_run) session.delete(dag_run) session.commit() session.close() return len(dag_runs_to_delete)
3. 과거 DAG 삭제하는 task 생성 후 배치 스케줄 등록
DAG_ID = 'delete_old_dag_runs' with DAG( dag_id=DAG_ID, default_args=DEFAULT_ARGS, schedule_interval='0 7 * * *', catchup=False, ) as dag: delete_old_dag_runs = PythonOperator( task_id='delete_old_dag_runs', python_callable=delete_old_dag_runs, provide_context=True, queue='queue' ) delete_old_dag_runs
당신이 어떤 것을
할머니에게 설명해주지 못한다면,
그것은 진정으로 이해한 것이 아니다.
- A.Einstein
'Airflow' 카테고리의 다른 글
[Airflow] Airflow db 업그레이드 에러 해결(feat. 2.10.0 버전) (4) 2024.10.29 [Airflow] Airflow dag 추가 에러 현상 (0) 2024.02.24 [Airflow] Airflow-s3 remote logging 설정 방법 (0) 2023.03.31 [Airflow] Airflow ExternalTaskSensor 활용법(feat. DAG Dependency) (0) 2023.02.07 [Airflow] DAG skipped 상태에서 멈춰있을 때(feat. queue 지정) (0) 2022.10.21