-
실시간 Log 수집기 Apache NiFi 파이프라인 구축DataProcessing 2022. 1. 9. 23:53728x90
<Input Data Type>
{ 'newSession': 'True', 'actionCd': 'DI', 'memberId': '', 'goods': None, 'searchKwd': None, 'browserCd': 'ED', 'browserLang': 'KO', 'regDtm': '2021-11-16T17:38:56.833198', 'deviceCd': 'PC', 'orderNo': None, 'sessionSeq': 2, 'userSeq': 2, 'sessionUserSeq': 1, 'msgType': 'A', 'campaignTypeCd': 'CA', 'algorithmCd': None, 'category': None, 'prevStayTime': 0, 'pubCmpMap': { 'CAe6f969b01b8d4ae6b3996ac53dd30240': { 'campaignKey': 'CAe6f969b01b8d4ae6b3996ac53dd30240', 'msgType': 'A', 'msgCd': None, 'campaignTypeCd': 'CA' } }, 'customData': {}, 'memberData': {}, }
GCP_pub/sub_data(GetFlowFile)
Custom text data input 설정
Replace Text 데이터 처리 부분
- 모든 string single quote → double quote
- None → “” 빈 스트링 처리
- {} → “” 빈 스트링 처리
<return value Type>
[ { "newSession": "True", "actionCd": "DI", "memberId": "", "goods": "", "searchKwd": "", "browserCd": "ED", "browserLang": "KO", "regDtm": "2021-11-16T17:38:56.833198", "deviceCd": "PC", "orderNo": "", "sessionSeq": 2, "userSeq": 2, "sessionUserSeq": 1, "stayTime": 0, "msgType": "A", "campaignTypeCd": "CA", "algorithmCd": "", "category": "", "prevStayTime": 0, "pubCmpMap": { "CAe6f969b01b8d4ae6b3996ac53dd30240": { "campaignKey": "CAe6f969b01b8d4ae6b3996ac53dd30240", "msgType": "A", "msgCd": "", "campaignTypeCd": "CA" } }, "customData": {}, "memberData": {}, } ]
EvaluateJsonPath
Json data의 Key, Value 형태로 변수 추출하기
PutParquet
Json data → Parquet file 형식으로 변환하여 HDFS /user/gr 경로에 저장하기
FetchParquet
이전 프로세스에서 HDFS에 올려놓은 Parquet 파일을 가져와서 Parquet으로 레코드 쓰기 설정
PutS3Object
S3 버킷 경로 설정 후 parquet 파일 저장
당신이 어떤 것을
할머니에게 설명해주지 못한다면,
그것은 진정으로 이해한 것이 아니다.
- A.Einstein