DataProcessing
-
실시간 Log 수집기 Apache NiFi 파이프라인 구축DataProcessing 2022. 1. 9. 23:53
{ 'newSession': 'True', 'actionCd': 'DI', 'memberId': '', 'goods': None, 'searchKwd': None, 'browserCd': 'ED', 'browserLang': 'KO', 'regDtm': '2021-11-16T17:38:56.833198', 'deviceCd': 'PC', 'orderNo': None, 'sessionSeq': 2, 'userSeq': 2, 'sessionUserSeq': 1, 'msgType': 'A', 'campaignTypeCd': 'CA', 'algorithmCd': None, 'category': None, 'prevStayTime': 0, 'pubCmpMap': { 'CAe6f969b01b8d4ae6b3996ac..
-
[Spark] Pyspark 데이터프레임 Shape(column) & Size(row) 구하기DataProcessing/Spark 2021. 3. 3. 22:57
Spark dataframe row count dataframe.count() dataframe column(Shape)과 row(Size)개수 count print((dataframe.count(), len(dataframe.columns))) # row, column print - 이상 오늘의 삽질일기 끝! 여기저기 삽질도 해보고 날려도 먹으면서 배우는 게 결국 남는거다 - Z.Sabziller
-
[Spark] Pyspark 데이터프레임을 JSON 딕셔너리로 변환하기DataProcessing/Spark 2021. 3. 1. 21:59
Dataframe => JSON 변환하기 1. 데이터프레임에서 toJSON 함수와 collect하면 전체 dataset을 string으로 반환한다. new_dfdf = df.toJSON().collect() print('new_dfdf', type(new_dfdf[0]), new_dfdf) 만약 new_dfdf[0]을 출력해보면, '{' 문자가 나온다. 2. 따라서 key와 value로 접근하기 위한 dict로 변환하려면 개별 map에 대해 json.loads로 해주어야 한다. new_df = df.toJSON().map(lambda x: json.loads(x)).collect() print('new_df', type(new_df[0]), new_df) - 이상 오늘의 삽질일기 끝! 여기저기 삽질도 해..
-
[Spark Streaming] PySpark 데이터프레임 모든 컬럼 json으로 변환 & 데이터프레임 딕셔너리 내부 value값 dict 변환DataProcessing/Spark 2021. 2. 25. 23:44
트위터 API로 받아오는 데이터 형태가 딕셔너리 내부에 딕셔너리가 있고, 또 리스트 value 값 안에 딕셔너리가 포함되어 있는 다중구조이므로 딕셔너리 내부 값들에 접근해야 할 필요가 있었습니다. 따라서 먼저 Kafkaf에서 실시간으로 받아오는 데이터를 개별적 RDD로 접근하여 데이터프레임을 생성하고 데이터프레임의 컬럼(트윗 데이터 key값들)을 하나로 합친 새로운 컬럼을 생성하고 저장한 다음 다시 조회하여 딕셔너리 형태로 가공 후 내부 value값들 또한 딕셔너리로 변환하는 데이터 접근 방식을 시도했습니다. dataframe 생성 from pyspark.sql import SparkSession dataframe = SparkSession.createDataFrame(rdd, schema = ['col..
-
[Spark Streaming] spark-streaming-kafka 실행 오류 해결 (feat. 파일 경로 옮기기)DataProcessing/Spark 2021. 2. 24. 23:21
Spark Streaming App 실행 시 오류: Spark Streaming's Kafka libraries not found in the class path. Try one of the following. 1. spark-submit 명령어에 spark-streaming-kafka 버전 추가해주기 Include the Kafka library and its dependencies with in the spark-submit command as $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 spark_consumer.py(실행파일) 2. spark-streaming-kafka-0-8-assembly..
-
[Spark Streaming] Kafka-Spark Streaming-Cassandra 연동 (feat.pyspark)DataProcessing/Spark 2021. 2. 23. 23:05
import findspark findspark.init() from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils #Cassandra import os os.environ['PYSPARK_SUBMIT_ARGS'] = \ '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1' \ ' --conf spark.cassandra.connection.host=localhost pyspark-shell' if _..
-
[Spark Streaming] Kafka TransformedDStream 변환하기DataProcessing/Spark 2021. 2. 19. 23:42
Kafka에서 받아온 메시지를 json으로 직접적인 변환을 하려고 하면 다음과 같은 에러 발생: "TypeError: 'TransformedDStream' object is not iterable" "TypeError: 'TransformedDStream' object is not subscriptable" Expecting value: line 1 column 1 (char 0) *DStream 타입은 연속적인 RDD 배열로서, 연속적인 스트리밍 데이터를 의미한다. 여기에서는 Kafka에서 데이터를 실시간으로 받아오면서 생성된다. 따라서 foreachRDD를 사용해서 DStream 각 RDD에 접근하여 임의의 연산 수행이 가능하게 해줍니다. print((message)) => print(type(mes..
-
[Spark Streaming] Tutorial #1 트윗 데이터 실시간 스트리밍(feat. Kafka, Pyspark)DataProcessing/Spark 2021. 2. 18. 22:43
실시간 스트리밍 서비스를 구현하기 위해 Kafka 브로커(서버)에서 트위터 API 데이터를 가져오는 Consumer기능을 Spark Streaming으로 구현해봤습니다. 아래는 Spark-Consumer 파일 코드입니다. # 이후에 pyspark가 실행되기 때문에 맨 위에 위치하기 import findspark findspark.init() # Spark에 연결하기 from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils if __name__=="__main__": sc = SparkContext(appName="Kafka Spark D..