전체 글
-
[Airflow] Airflow ExternalTaskSensor 활용법(feat. DAG Dependency)Airflow 2023. 2. 7. 23:50
Airflow DAG 사이의 dependency를 설정하는 방법은 2가지가 있는데 1) TriggerDagRunOperator 2) ExternalTaskSensor 1번 TriggerDagRunOperator의 경우, DAG A가 먼저 실행 된 후 task 종료 후 TriggerDagRunOperator task가 실행되면서 DAG B를 trigger하는 방식이다. 이 경우 DAG A에 task를 설정하고, DAG B의 스케줄 설정 여부와 관계없이 트리거를 할 수 있다. # DAG A Task 1 >> Task 2 >> TriggerDagRunOperator Task (Trigger DAG B Task 3) # DAG B Task 3 2번 ExternalTaskSensor이 경우, DAG B의 Exte..
-
[Docker] Dockerfile user 권한 변경 설정Docker 2023. 1. 2. 22:15
Dockerfile에서 base 이미지를 사용하고 필요한 패키치를 설치하려고 할 때, 이미지를 빌드 시 다음과 같은 permission 에러가 발생한다. => ERROR [2/4] RUN apt-get update 0.3s ------ > [2/4] RUN apt-get update: #5 0.263 Reading package lists... #5 0.272 E: List directory /var/lib/apt/lists/partial is missing. - Acquire (13: Permission denied) base image에 따라 user권한이 바뀌어 있을 수 있으므로, FROM 코드 밑에 USER root를 추가하여 root 유저로 권한 변경 후 패키지 설치 코드를 작성하면 된다. FR..
-
파일 싱크 자동화를 위한 incrontab 스크립트 개발(feat. git->s3)개발Tip 2022. 12. 26. 21:32
도입배경 Airflow에서 실행하는 ETL pyspark 스크립트를 s3 버킷에 저장해두고 사용중이었는데, 그러면 매번 수동으로 파일을 upload해야하고, 파일의 히스토리 및 버전 관리의 필요성을 느낌. 따라서 Git에 파일을 올리고, s3에도 자동으로 업데이트 해주어 싱크를 맞출 수 있는 자동화 시스템을 구현할 수 있는 방법 모색함. 이때 고려했던 점은, 현재 회사에서 bitbucket server를 사용중이므로 GitHub Action를 사용할 수 없었고 Jenkins를 사용해보려고 했으나, 이전에 써보지 않았으므로 러닝커브를 고려하여 구글링을 통해 알게된 incrontab으로 간단하게 스크립트로 짜서 구현하기로함. 개발과정 1. 로컬에서 git에 etl 파일 업도르하면, Airflow 서버에서 c..
-
[Airflow] DAG skipped 상태에서 멈춰있을 때(feat. queue 지정)Airflow 2022. 10. 21. 20:49
Airflow DAG 테스트 실행 중 마지막 task에서 skipped 상태에서 멈춰 중단된 상태. 다시 재시도를 위해 해당 task Clear-> Run 하면 Queue에서 멈춰있음. task의 코드를 살펴보니 queue를 할당하는 것을 빼먹은 것 발견함. cluster_remover = EmrTerminateJobFlowOperator( task_id='cluster_remover', job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}", ) 아래처럼 queue 이름을 지정해서 할당하면 success로 바뀐다! cluster_remover = EmrTerminateJobFlowOperator( task_id..
-
S3/Athena DB명 하이픈 포함 시 이스케이프 문자 처리개발Tip 2022. 9. 16. 22:54
EMR에서 데이터 전처리 후 S3에 테이블로 저장 시 DB명에 하이픈(-) 문자가 포함되어 있는 경우, 그냥 처리하면 에러가 발생한다. 예시) dataframe.write.format("parquet").option("path", "s3://folder/").saveAsTable("db-name.table_name") 이때 db명이 test-db 라고 한다면, 이스케이프 문자 처리를 해주어야 하는데 db명을 백틱(‘)으로 감싸주면 된다. => `test-db` dataframe.write.format("parquet").option("path", "s3://folder/").saveAsTable("`db-name`.table_name") 이렇게하면 정상적으로 저장이 되는 것을 확인 할 수 있다 😃 - 이..
-
[Airflow] airflow health status check 자동화 설정Airflow 2022. 8. 28. 19:46
가끔 예고없이 airflow scheduler가 중단되는 현상이 발생할 때, airflow health status 체크를 통해 스케줄러 재가동을 자동화하는 방법을 공유하려고 합니다. airflow 공식 문서에 따르면 cli 명령어로 schduler와 DB, 그리고 worker의 상태를 확인할 수 있습니다. https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health Checking Airflow Health Status — Airflow Documentation airflow.apache.org 이 중에서 scheduler와 worker 프로세스를 감지한 후..
-
[Airflow] Airflow DAG skipped state 멈춤 현상 해결Airflow 2022. 8. 21. 23:36
Airflow DAG 실행 시 특정 Task에서 skipped 상태에서 멈춰있음. 해당 테스크 Clear -> Run 하면 Queue에서 멈춰있는 현상. => 해당 테스크 operator 코드 부분에서 worker queue를 빠뜨렸음을 발견함. ⚠️ 기존 코드 cluster_remover = EmrTerminateJobFlowOperator( task_id='cluster_remover', job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}", ) ✅ 변경 코드 cluster_remover = EmrTerminateJobFlowOperator( task_id='cluster_remover', job_flow_..
-
[MySQL] mysqlclient 설치 에러 해결 방법DataBase/MySQL 2022. 8. 15. 23:44
python3 에서 mysql db가 사용가능한 mysqlclient 모듈을 설치할 때, > pip install mysqlclient error: command 'gcc' failed with exit status 1 .. note: This error originates from a subprocess, and is likely not a problem with pip. error: legacy-install-failure Encountered error whie trying to install package. > mysqlclient 해당 에러가 나오면서 설치에 실패했다고 나옵니다. => Python 3 and MySQL 개발용 라이브러리 사전 설치 필요 For UBUNTU or Debian s..