DataProcessing/Spark
-
[Spark] Pyspark 데이터프레임 Shape(column) & Size(row) 구하기DataProcessing/Spark 2021. 3. 3. 22:57
Spark dataframe row count dataframe.count() dataframe column(Shape)과 row(Size)개수 count print((dataframe.count(), len(dataframe.columns))) # row, column print - 이상 오늘의 삽질일기 끝! 여기저기 삽질도 해보고 날려도 먹으면서 배우는 게 결국 남는거다 - Z.Sabziller
-
[Spark] Pyspark 데이터프레임을 JSON 딕셔너리로 변환하기DataProcessing/Spark 2021. 3. 1. 21:59
Dataframe => JSON 변환하기 1. 데이터프레임에서 toJSON 함수와 collect하면 전체 dataset을 string으로 반환한다. new_dfdf = df.toJSON().collect() print('new_dfdf', type(new_dfdf[0]), new_dfdf) 만약 new_dfdf[0]을 출력해보면, '{' 문자가 나온다. 2. 따라서 key와 value로 접근하기 위한 dict로 변환하려면 개별 map에 대해 json.loads로 해주어야 한다. new_df = df.toJSON().map(lambda x: json.loads(x)).collect() print('new_df', type(new_df[0]), new_df) - 이상 오늘의 삽질일기 끝! 여기저기 삽질도 해..
-
[Spark Streaming] PySpark 데이터프레임 모든 컬럼 json으로 변환 & 데이터프레임 딕셔너리 내부 value값 dict 변환DataProcessing/Spark 2021. 2. 25. 23:44
트위터 API로 받아오는 데이터 형태가 딕셔너리 내부에 딕셔너리가 있고, 또 리스트 value 값 안에 딕셔너리가 포함되어 있는 다중구조이므로 딕셔너리 내부 값들에 접근해야 할 필요가 있었습니다. 따라서 먼저 Kafkaf에서 실시간으로 받아오는 데이터를 개별적 RDD로 접근하여 데이터프레임을 생성하고 데이터프레임의 컬럼(트윗 데이터 key값들)을 하나로 합친 새로운 컬럼을 생성하고 저장한 다음 다시 조회하여 딕셔너리 형태로 가공 후 내부 value값들 또한 딕셔너리로 변환하는 데이터 접근 방식을 시도했습니다. dataframe 생성 from pyspark.sql import SparkSession dataframe = SparkSession.createDataFrame(rdd, schema = ['col..
-
[Spark Streaming] spark-streaming-kafka 실행 오류 해결 (feat. 파일 경로 옮기기)DataProcessing/Spark 2021. 2. 24. 23:21
Spark Streaming App 실행 시 오류: Spark Streaming's Kafka libraries not found in the class path. Try one of the following. 1. spark-submit 명령어에 spark-streaming-kafka 버전 추가해주기 Include the Kafka library and its dependencies with in the spark-submit command as $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 spark_consumer.py(실행파일) 2. spark-streaming-kafka-0-8-assembly..
-
[Spark Streaming] Kafka-Spark Streaming-Cassandra 연동 (feat.pyspark)DataProcessing/Spark 2021. 2. 23. 23:05
import findspark findspark.init() from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils #Cassandra import os os.environ['PYSPARK_SUBMIT_ARGS'] = \ '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1' \ ' --conf spark.cassandra.connection.host=localhost pyspark-shell' if _..
-
[Spark Streaming] Kafka TransformedDStream 변환하기DataProcessing/Spark 2021. 2. 19. 23:42
Kafka에서 받아온 메시지를 json으로 직접적인 변환을 하려고 하면 다음과 같은 에러 발생: "TypeError: 'TransformedDStream' object is not iterable" "TypeError: 'TransformedDStream' object is not subscriptable" Expecting value: line 1 column 1 (char 0) *DStream 타입은 연속적인 RDD 배열로서, 연속적인 스트리밍 데이터를 의미한다. 여기에서는 Kafka에서 데이터를 실시간으로 받아오면서 생성된다. 따라서 foreachRDD를 사용해서 DStream 각 RDD에 접근하여 임의의 연산 수행이 가능하게 해줍니다. print((message)) => print(type(mes..
-
[Spark Streaming] Tutorial #1 트윗 데이터 실시간 스트리밍(feat. Kafka, Pyspark)DataProcessing/Spark 2021. 2. 18. 22:43
실시간 스트리밍 서비스를 구현하기 위해 Kafka 브로커(서버)에서 트위터 API 데이터를 가져오는 Consumer기능을 Spark Streaming으로 구현해봤습니다. 아래는 Spark-Consumer 파일 코드입니다. # 이후에 pyspark가 실행되기 때문에 맨 위에 위치하기 import findspark findspark.init() # Spark에 연결하기 from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils if __name__=="__main__": sc = SparkContext(appName="Kafka Spark D..
-
[Spark] 데이터 가공(Feat. 코로나 Trend분석)DataProcessing/Spark 2021. 2. 17. 18:31
대용량 데이터 처리 및 트렌드 분석을 위해 Twitter API로 데이터 소스를 확보했다. 하지만 여기서 문제는 트윗 데이터 구조에 있었다. 아래와 같이 딕셔너리 형태로 이루어져 있고, 그 아래 하위 레벨에 또 다시 리스트 안에 딕셔너리 형태의 구조로 되어 있었다. 또한 id 값들이 전부 string으로 처리되어 있었다. 따라서 트렌드 분석을 위해서는 하위 레벨 내부 데이터를 조회할 수 있도록 데이터 가공이 필요했고, 이를 먼저 datataframe으로 만들어준 뒤 datafame 자체적으로 가공을 시도해봤다. 1. dataframe dict keys => new column https://mungingdata.com/pyspark/dict-map-to-multiple-columns/ Converting..