ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] DAG에서 같은 yaml 파일 중복 사용시 Error
    Airflow 2022. 3. 21. 20:58
    728x90

    1. crontab OFF 상태에서 git_pull_clients Task 실행 시 ⇒ error 발생

    • 수동 git fetch —all ⇒ update_clients ⇒ error
      • up_for_retry에 걸려있음
      • clients.yml은 변경(서비스키 삭제)된 상태

     

    2. crontab ON 상태에서 update_clients Task 실행 시 ⇒ error 발생 

    • clients.yml update 후 crontab 작동할 때 까지 대기 ⇒ 변경 전 clients.yml pull 땡긴 후 nothing to commit, working tree clean error 발생

    ⇒ DAG가 돌아가는 중에 혹은 어떤 작업 진행 중 CONF repo에 변경사항이 반영되면 git pull을 요구하기 때문에 error가 발생함.

    또한, git pull 명령어를 수행한다고 해도 merge에 대한 Commit message 요구함.

     

     

    3. git pull 땡기지 않고 DAG 돌리면 → git fetch task fail → (airflow code 변동사항 DAG에 반영 안되었기 때문에)

    • 수동으로 git fetch 성공

     

    4. client_info 실행 시 Fail 발생함

    def get_client_info():
        # global serviceKey
        serviceKey = "07b"
        
        data = open('/clients.yml', encoding="utf-8")
        parsed_data = yaml.load(data, Loader=yaml.FullLoader)
    
        client_info = {
            "rec_dags" : [],
            "seg_dags" : [],
        }
    
        for type in parsed_data[serviceKey]:
            if type == "name":
                client_info["name"] = parsed_data[serviceKey]["name"]
            elif type == "purchase" or type == "rfm":
                if parsed_data[serviceKey][type]['use'] == 'Y':
                    client_info["seg_dags"].append(type)
            else:
                if parsed_data[serviceKey][type]['use'] == 'Y':
                    client_info["rec_dags"].append(type)
    
        return client_info
    
    
    client_info = get_client_info()
    
    print_client_info = BashOperator(
        task_id="print_client_info",
        bash_command="echo {}".format(client_info["name"]),
        queue="keyword_train",
        dag=dag,
    )
    
    delete_serviceKey = PythonOperator(
        task_id="delete_serviceKey",
        python_callable=delete_from_clients,
        # bash_command="echo {{ dag_run.conf }}",
        queue="keyword_train",
        dag=dag
    )
    
    update_clients = BashOperator(
        task_id="update_clients",
        bash_command="cd /CONF && git add clients.yml && git commit -m 'Delete {}' && git push origin master".format(client_info["name"]),
        queue="keyword_train",
        dag=dag,
    )
    • client_info 다음의 print_client_info task를 제외하면 FAIL 발생

    ⇒ get_client_info() 함수를 실행하는 것과 관련 있어 보임.

     

    Xcom은 DAG 내의 task 사이에서 데이터를 전달하기 위해서 사용되는데, CeleryExecutor를 예로 들면, 각 task들이 각기 다른 Worker에서 실행될 수 있으며, Xcom은 이러한 경우 task간 데이터 전달을 가능하게 한다.


    clients.yml 파일을 get_client_info()와 delete_from_clients()에서 중복으로 사용하고 있어서 파일 open에 문제가 있음을 의심해봄.

    => 같은 파일을 open하고 close하지 않은 상태로 두어서 오류가 난 것으로 예상.

     

     

    ⇒ clients.yml을 clients1.yml로 복제 후 실행하기

    def delete_from_clients():
        serviceKey = "07b"
    
        with open('/clients.yml', encoding="utf-8") as f:
            content = f.read()
    
        serviceKeys = [x for x in content.split("\\n\\n")]
        new_clients = [x for x in serviceKeys if serviceKey not in x]
    
        with open('/clients.yml', 'w', encoding="utf-8") as outfile:
            outfile.write('\\n'.join(new_clients))
    
        return 
    
    def get_client_info():
        serviceKey = "07b"
        
        data = open('/clients1.yml', encoding="utf-8")
        parsed_data = yaml.load(data, Loader=yaml.FullLoader)
    
        client_info = {
            "rec_dags" : [],
            "seg_dags" : [],
        }
    
        for type in parsed_data[serviceKey]:
            if type == "name":
                client_info["name"] = parsed_data[serviceKey]["name"]
            elif type == "purchase" or type == "rfm":
                if parsed_data[serviceKey][type]['use'] == 'Y':
                    client_info["seg_dags"].append(type)
            else:
                if parsed_data[serviceKey][type]['use'] == 'Y':
                    client_info["rec_dags"].append(type)
    
        return client_info
    
    with DAG(dag_name, default_args=default_args, schedule_interval=None) as dag:
       
        client_info = get_client_info()
    
        delete_serviceKey = PythonOperator(
            task_id="delete_serviceKey",
            python_callable=delete_from_clients,
            # bash_command="echo {{ dag_run.conf }}",
            queue="keyword_train",
            dag=dag
        )
    
        update_clients = BashOperator(
            task_id="update_clients",
            bash_command="cd /CONF && git add clients.yml && git commit -m 'Delete {}' && git push origin master".format(client_info["name"]),
            queue="keyword_train",
            dag=dag,
        
    
        delete_serviceKey >> update_clients

    성공!

     

    data = open('/clients.yml', encoding="utf-8")
    parsed_data = yaml.load(data, Loader=yaml.FullLoader)
    
    data.close()

    이 부분에서만 close할 것!

    delete_from_clients()with open()으로 여는 것은 close하지 않아도 된다!

    python에서는 with open() 문을 더 선호하므로, with open으로 수정할 것!!

     

     

     

    5. clients.yml write 시 띄어쓰기 문제

    • clients.yml을 새로 쓸 때 해지 고객사를 제외하고 list에 append 하기 때문에
    • 고객사와 고객사 사이에 띄어쓰기 없이 그냥 붙여서 쓰면, 위아래 고객사들은 새로운 clients.yml에 포함되지 않음.
    • 줄바꿈 중간에 space bar 없이 해야함!!
      • Space bar 들어가면 하나의 고객사로 인식해서 새로운 clients.yml에 반영하지 않음.
      • 고객사 중간에 2줄씩 띄어쓰기로 구분해주어야 함.

     

    6. delete_dag type별 dag 생성 오류

    for문에서 Task를 바로 생성하는게 아니라 함수 안에 BashOperator Task를 생성하는 코드를 넣어 돌려주어야 함.

     

    def create_delete_dags(dag_type, client_info):
        delete_dag = BashOperator(
            task_id="delete_{}_{}".format(dag_type, client_info["name"]),
            bash_command = "airflow delete_dag -y {}_{}_v2.0".format(dag_type, client_info["name"]),
            queue="keyword_train",
            dag=dag
        )
    
        return delete_dag
    
    
    git_pull_CONF >> copy_clients >> client_return >> delete_serviceKey >> update_clients 
    
        for dag_type in client_info["rec_dags"]:
            delete_dag = create_delete_dags(dag_type, client_info)
            update_clients >> delete_dag

     

    - 이상 오늘의 삽질일기 끝!

     


    여기저기 삽질도 해보고

    날려도 먹으면서

    배우는 게

    결국 남는거다

    - Z.Sabziller

     

     

    댓글

Designed by Tistory.