Spark
-
[Spark] Pyspark 데이터프레임 Shape(column) & Size(row) 구하기DataProcessing/Spark 2021. 3. 3. 22:57
Spark dataframe row count dataframe.count() dataframe column(Shape)과 row(Size)개수 count print((dataframe.count(), len(dataframe.columns))) # row, column print - 이상 오늘의 삽질일기 끝! 여기저기 삽질도 해보고 날려도 먹으면서 배우는 게 결국 남는거다 - Z.Sabziller
-
[Spark Streaming] spark-streaming-kafka 실행 오류 해결 (feat. 파일 경로 옮기기)DataProcessing/Spark 2021. 2. 24. 23:21
Spark Streaming App 실행 시 오류: Spark Streaming's Kafka libraries not found in the class path. Try one of the following. 1. spark-submit 명령어에 spark-streaming-kafka 버전 추가해주기 Include the Kafka library and its dependencies with in the spark-submit command as $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 spark_consumer.py(실행파일) 2. spark-streaming-kafka-0-8-assembly..
-
[Spark Streaming] Kafka-Spark Streaming-Cassandra 연동 (feat.pyspark)DataProcessing/Spark 2021. 2. 23. 23:05
import findspark findspark.init() from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils #Cassandra import os os.environ['PYSPARK_SUBMIT_ARGS'] = \ '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1' \ ' --conf spark.cassandra.connection.host=localhost pyspark-shell' if _..
-
[Spark Streaming] Tutorial #1 트윗 데이터 실시간 스트리밍(feat. Kafka, Pyspark)DataProcessing/Spark 2021. 2. 18. 22:43
실시간 스트리밍 서비스를 구현하기 위해 Kafka 브로커(서버)에서 트위터 API 데이터를 가져오는 Consumer기능을 Spark Streaming으로 구현해봤습니다. 아래는 Spark-Consumer 파일 코드입니다. # 이후에 pyspark가 실행되기 때문에 맨 위에 위치하기 import findspark findspark.init() # Spark에 연결하기 from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils if __name__=="__main__": sc = SparkContext(appName="Kafka Spark D..
-
[Spark] 데이터 가공(Feat. 코로나 Trend분석)DataProcessing/Spark 2021. 2. 17. 18:31
대용량 데이터 처리 및 트렌드 분석을 위해 Twitter API로 데이터 소스를 확보했다. 하지만 여기서 문제는 트윗 데이터 구조에 있었다. 아래와 같이 딕셔너리 형태로 이루어져 있고, 그 아래 하위 레벨에 또 다시 리스트 안에 딕셔너리 형태의 구조로 되어 있었다. 또한 id 값들이 전부 string으로 처리되어 있었다. 따라서 트렌드 분석을 위해서는 하위 레벨 내부 데이터를 조회할 수 있도록 데이터 가공이 필요했고, 이를 먼저 datataframe으로 만들어준 뒤 datafame 자체적으로 가공을 시도해봤다. 1. dataframe dict keys => new column https://mungingdata.com/pyspark/dict-map-to-multiple-columns/ Converting..
-
[환경설정] Spark 설치 및 ubuntu 환경 설정 (feat.AWS)DataProcessing/Spark 2021. 2. 15. 23:24
Spark 설치 시 다음과 같은 프로그램들이 필요합니다. Spark 2.4.7 Java jdk 1.8_251 Scala 2.11.12 Hadoop 2.7.3 Python 3.7.5 (Pyspark 사용 시 필요) Spark 2.4.7 $ wget https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz $ tar xvf spark-* echo "export SPARK_HOME= ~/programs/Spark/spark-2.4.7-bin-hadoop2.7" echo "export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin" echo "export PYSPARK_PYTHON=/usr/bin/p..
-
[Spark] Trend 분석 연관어 빈도수 구하기 (feat. 불용어 처리)DataProcessing/Spark 2021. 2. 6. 20:14
총 2037건의 데이터 MongoDB 저장 후 연관어 빈도수 확인 및 불용어 처리하기 추측 단어들 리스트 위: 위.중증 형: 형집행정지 초: 올해 초, 초.중등 노: 노 마스크, 노 메이크업, 노 재팬 카: 카셰어링 융: 융.복합 거리두기의 경우 '거리' 와 '두기'로 나누어짐 => '거리두기'는 한 단어로 봐도 무방해보임 '두기'는 stopwords에 추가하고 거리 count를 '거리두기'로 반환 거리의 경우 거 와 리 로 나누어짐 => 거리일수도 있고, 코로나 우울(블루), 나만 그런 거 아니지? 와 같은 거의 사용이 있을 수 있다고 판단 확진자의 경우확과 진자로 나누어짐 => total count로 봤을 때, 비슷하다고 판단하여 확은 Stopwords에 포함시키고, 진자는 확진자로 바꾸어 DB 저장..
-
[Spark] 스파크 Dataframe 데이터프레임 가공하기DataProcessing/Spark 2021. 1. 31. 23:57
모듈 import import findspark findspark.init() from pyspark import SparkContext from pyspark.sql import SQLContext ## Cassandra import os os.environ['PYSPARK_SUBMIT_ARGS'] = \ '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1' \ ' --conf spark.cassandra.connection.host=localhost:port pyspark-shell' sc = SparkContext(appName="app name") sqlContext = SQLContext(sc) CassandraDB 데이터 조..