-
[Airflow] DAG에서 같은 yaml 파일 중복 사용시 ErrorAirflow 2022. 3. 21. 20:58728x90
1. crontab OFF 상태에서 git_pull_clients Task 실행 시 ⇒ error 발생
- 수동 git fetch —all ⇒ update_clients ⇒ error
- up_for_retry에 걸려있음
- clients.yml은 변경(서비스키 삭제)된 상태
2. crontab ON 상태에서 update_clients Task 실행 시 ⇒ error 발생
- clients.yml update 후 crontab 작동할 때 까지 대기 ⇒ 변경 전 clients.yml pull 땡긴 후 nothing to commit, working tree clean error 발생
⇒ DAG가 돌아가는 중에 혹은 어떤 작업 진행 중 CONF repo에 변경사항이 반영되면 git pull을 요구하기 때문에 error가 발생함.
또한, git pull 명령어를 수행한다고 해도 merge에 대한 Commit message 요구함.
3. git pull 땡기지 않고 DAG 돌리면 → git fetch task fail → (airflow code 변동사항 DAG에 반영 안되었기 때문에)
- 수동으로 git fetch 성공
4. client_info 실행 시 Fail 발생함
def get_client_info(): # global serviceKey serviceKey = "07b" data = open('/clients.yml', encoding="utf-8") parsed_data = yaml.load(data, Loader=yaml.FullLoader) client_info = { "rec_dags" : [], "seg_dags" : [], } for type in parsed_data[serviceKey]: if type == "name": client_info["name"] = parsed_data[serviceKey]["name"] elif type == "purchase" or type == "rfm": if parsed_data[serviceKey][type]['use'] == 'Y': client_info["seg_dags"].append(type) else: if parsed_data[serviceKey][type]['use'] == 'Y': client_info["rec_dags"].append(type) return client_info client_info = get_client_info() print_client_info = BashOperator( task_id="print_client_info", bash_command="echo {}".format(client_info["name"]), queue="keyword_train", dag=dag, ) delete_serviceKey = PythonOperator( task_id="delete_serviceKey", python_callable=delete_from_clients, # bash_command="echo {{ dag_run.conf }}", queue="keyword_train", dag=dag ) update_clients = BashOperator( task_id="update_clients", bash_command="cd /CONF && git add clients.yml && git commit -m 'Delete {}' && git push origin master".format(client_info["name"]), queue="keyword_train", dag=dag, )
- client_info 다음의 print_client_info task를 제외하면 FAIL 발생
⇒ get_client_info() 함수를 실행하는 것과 관련 있어 보임.
Xcom은 DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용되는데, CeleryExecutor를 예로 들면, 각 task들이 각기 다른 Worker에서 실행될 수 있으며, Xcom은 이러한 경우 task간 데이터 전달을 가능하게 한다.
clients.yml 파일을 get_client_info()와 delete_from_clients()에서 중복으로 사용하고 있어서 파일 open에 문제가 있음을 의심해봄.
=> 같은 파일을 open하고 close하지 않은 상태로 두어서 오류가 난 것으로 예상.
⇒ clients.yml을 clients1.yml로 복제 후 실행하기
def delete_from_clients(): serviceKey = "07b" with open('/clients.yml', encoding="utf-8") as f: content = f.read() serviceKeys = [x for x in content.split("\\n\\n")] new_clients = [x for x in serviceKeys if serviceKey not in x] with open('/clients.yml', 'w', encoding="utf-8") as outfile: outfile.write('\\n'.join(new_clients)) return def get_client_info(): serviceKey = "07b" data = open('/clients1.yml', encoding="utf-8") parsed_data = yaml.load(data, Loader=yaml.FullLoader) client_info = { "rec_dags" : [], "seg_dags" : [], } for type in parsed_data[serviceKey]: if type == "name": client_info["name"] = parsed_data[serviceKey]["name"] elif type == "purchase" or type == "rfm": if parsed_data[serviceKey][type]['use'] == 'Y': client_info["seg_dags"].append(type) else: if parsed_data[serviceKey][type]['use'] == 'Y': client_info["rec_dags"].append(type) return client_info with DAG(dag_name, default_args=default_args, schedule_interval=None) as dag: client_info = get_client_info() delete_serviceKey = PythonOperator( task_id="delete_serviceKey", python_callable=delete_from_clients, # bash_command="echo {{ dag_run.conf }}", queue="keyword_train", dag=dag ) update_clients = BashOperator( task_id="update_clients", bash_command="cd /CONF && git add clients.yml && git commit -m 'Delete {}' && git push origin master".format(client_info["name"]), queue="keyword_train", dag=dag, delete_serviceKey >> update_clients
data = open('/clients.yml', encoding="utf-8") parsed_data = yaml.load(data, Loader=yaml.FullLoader) data.close()
이 부분에서만 close할 것!
delete_from_clients()의 with open()으로 여는 것은 close하지 않아도 된다!
- https://stackoverflow.com/questions/3770348/how-to-safely-open-close-files-in-python-2-4
- https://stackoverflow.com/questions/21275836/if-youre-opening-a-file-using-the-with-statement-do-you-still-need-to-close
python에서는 with open() 문을 더 선호하므로, with open으로 수정할 것!!
5. clients.yml write 시 띄어쓰기 문제
- clients.yml을 새로 쓸 때 해지 고객사를 제외하고 list에 append 하기 때문에
- 고객사와 고객사 사이에 띄어쓰기 없이 그냥 붙여서 쓰면, 위아래 고객사들은 새로운 clients.yml에 포함되지 않음.
- 줄바꿈 중간에 space bar 없이 해야함!!
- Space bar 들어가면 하나의 고객사로 인식해서 새로운 clients.yml에 반영하지 않음.
- 고객사 중간에 2줄씩 띄어쓰기로 구분해주어야 함.
6. delete_dag type별 dag 생성 오류
for문에서 Task를 바로 생성하는게 아니라 함수 안에 BashOperator Task를 생성하는 코드를 넣어 돌려주어야 함.
def create_delete_dags(dag_type, client_info): delete_dag = BashOperator( task_id="delete_{}_{}".format(dag_type, client_info["name"]), bash_command = "airflow delete_dag -y {}_{}_v2.0".format(dag_type, client_info["name"]), queue="keyword_train", dag=dag ) return delete_dag git_pull_CONF >> copy_clients >> client_return >> delete_serviceKey >> update_clients for dag_type in client_info["rec_dags"]: delete_dag = create_delete_dags(dag_type, client_info) update_clients >> delete_dag
- 이상 오늘의 삽질일기 끝!
여기저기 삽질도 해보고
날려도 먹으면서
배우는 게
결국 남는거다
- Z.Sabziller
'Airflow' 카테고리의 다른 글
[Airflow] Airflow 초간단 설치하기(feat. 10분 컷!) (0) 2022.06.13 [Airflow] Airflow A서버에서 B서버로 CLI 명령어 날리기 (0) 2022.04.04 [Airflow] xcom variable return_value 반환 에러 (0) 2022.03.15 [Airflow] Task in the 'queued' state(feat. 에어플로우 중단 현상) (0) 2022.03.09 [Airflow] 에어플로우 worker 중단 현상 해결 Ver.2 (0) 2021.12.12 - 수동 git fetch —all ⇒ update_clients ⇒ error