sparkstreaming
-
[Spark Streaming] PySpark 데이터프레임 모든 컬럼 json으로 변환 & 데이터프레임 딕셔너리 내부 value값 dict 변환DataProcessing/Spark 2021. 2. 25. 23:44
트위터 API로 받아오는 데이터 형태가 딕셔너리 내부에 딕셔너리가 있고, 또 리스트 value 값 안에 딕셔너리가 포함되어 있는 다중구조이므로 딕셔너리 내부 값들에 접근해야 할 필요가 있었습니다. 따라서 먼저 Kafkaf에서 실시간으로 받아오는 데이터를 개별적 RDD로 접근하여 데이터프레임을 생성하고 데이터프레임의 컬럼(트윗 데이터 key값들)을 하나로 합친 새로운 컬럼을 생성하고 저장한 다음 다시 조회하여 딕셔너리 형태로 가공 후 내부 value값들 또한 딕셔너리로 변환하는 데이터 접근 방식을 시도했습니다. dataframe 생성 from pyspark.sql import SparkSession dataframe = SparkSession.createDataFrame(rdd, schema = ['col..
-
[Spark Streaming] spark-streaming-kafka 실행 오류 해결 (feat. 파일 경로 옮기기)DataProcessing/Spark 2021. 2. 24. 23:21
Spark Streaming App 실행 시 오류: Spark Streaming's Kafka libraries not found in the class path. Try one of the following. 1. spark-submit 명령어에 spark-streaming-kafka 버전 추가해주기 Include the Kafka library and its dependencies with in the spark-submit command as $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 spark_consumer.py(실행파일) 2. spark-streaming-kafka-0-8-assembly..
-
[Spark Streaming] Kafka-Spark Streaming-Cassandra 연동 (feat.pyspark)DataProcessing/Spark 2021. 2. 23. 23:05
import findspark findspark.init() from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils #Cassandra import os os.environ['PYSPARK_SUBMIT_ARGS'] = \ '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1' \ ' --conf spark.cassandra.connection.host=localhost pyspark-shell' if _..
-
[Spark Streaming] Kafka TransformedDStream 변환하기DataProcessing/Spark 2021. 2. 19. 23:42
Kafka에서 받아온 메시지를 json으로 직접적인 변환을 하려고 하면 다음과 같은 에러 발생: "TypeError: 'TransformedDStream' object is not iterable" "TypeError: 'TransformedDStream' object is not subscriptable" Expecting value: line 1 column 1 (char 0) *DStream 타입은 연속적인 RDD 배열로서, 연속적인 스트리밍 데이터를 의미한다. 여기에서는 Kafka에서 데이터를 실시간으로 받아오면서 생성된다. 따라서 foreachRDD를 사용해서 DStream 각 RDD에 접근하여 임의의 연산 수행이 가능하게 해줍니다. print((message)) => print(type(mes..
-
[Spark Streaming] Tutorial #1 트윗 데이터 실시간 스트리밍(feat. Kafka, Pyspark)DataProcessing/Spark 2021. 2. 18. 22:43
실시간 스트리밍 서비스를 구현하기 위해 Kafka 브로커(서버)에서 트위터 API 데이터를 가져오는 Consumer기능을 Spark Streaming으로 구현해봤습니다. 아래는 Spark-Consumer 파일 코드입니다. # 이후에 pyspark가 실행되기 때문에 맨 위에 위치하기 import findspark findspark.init() # Spark에 연결하기 from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils if __name__=="__main__": sc = SparkContext(appName="Kafka Spark D..
-
Spark 개념 정리IT용어정리 2021. 1. 8. 00:03
Spark input sources File source: txt, csv, json, orc 등 Kafka source: Kafka broker를 사용함 Socket source: UTF8 text data from socket connection(테스팅 목적으로만) Spark의 장점 unification of disparate 데이터 처리 능력 Spark streaming receivers가 병렬로 데이터를 받아서 스파크 workers nodes에 쌓으면, 스파크 엔진이 짧은 테스크를 돌려 배치 처리를 한다. 이는 곧 효율적인 로드밸런싱과 빠른 복구를 가능하게 한다. Stream processing 데이터들이 지속적으로 유입되고 나가는 과정에서 분석/SQL을 수행하는 것 데이터가 이동 중이거나, 생성..