-
[Spark Streaming] Tutorial #1 트윗 데이터 실시간 스트리밍(feat. Kafka, Pyspark)DataProcessing/Spark 2021. 2. 18. 22:43728x90
실시간 스트리밍 서비스를 구현하기 위해 Kafka 브로커(서버)에서 트위터 API 데이터를 가져오는
Consumer기능을 Spark Streaming으로 구현해봤습니다.
아래는 Spark-Consumer 파일 코드입니다.
# 이후에 pyspark가 실행되기 때문에 맨 위에 위치하기 import findspark findspark.init()
# Spark에 연결하기 from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils
if __name__=="__main__": sc = SparkContext(appName="Kafka Spark Demo") # Spark 실행 및 RDD 생성 기능 ssc = StreamingContext(sc,5) # 5초 간격 배치로 스트리팅 데이터 가져오기 # Kafka에서 가져오는 메시지형태로 이루어진 트윗 데이터를 TransformedDStream 타입으로 반환 message = KafkaUtils.createDirectStream(ssc, topics=["test"], kafkaParams={"metadata.broker.list":"localhost:9092"})
# 트윗데이터를 map으로 변형(transformation) 후 flatMap으로 각 트윗의 단어들 반환 msg = message.map(lambda tweet: tweet[1]).flatMap(lambda x:x.split(" ")) msg.pprint() ssc.start() # 실제 스트리밍 app을 실행시키는 코드 ssc.awaitTermination() # 강제 종료 or stop()코드 발견 전까지 실행 유지
Spark Streaming 실행 오류
TypeError: 'JavaPackage' object is not callable
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
- Include the Kafka library and its dependencies with in the spark-submit command as $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.7 ...
Spark Streaming 실행 시 오류를 해결하는 방법으로 위와같이 2가지를 추천해줍니다.
첫 번째 방법으로 Spark Consumer 실행 시 해당 명령어를 사용해서 돌려봤지만 실패.
> spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 spark_consumer.py
따라서 두 번째 방법으로 해결해봤습니다.
해결방법
https://search.maven.org/search?q=a:spark-streaming-kafka-0-8-assembly_2.11
- jar 파일로 다운로드
- spark 설치 경로 내 jars 폴더로 옮기기 (C:\Spark\spark-2.4.7-bin-hadoop2.7\jars)
- Pycharm 창 끄고 다시 실행
- 이상 오늘의 삽질일기 끝!
여기저기 삽질도 해보고
날려도 먹으면서
배우는 게
결국 남는거다
- Z.Sabziller
'DataProcessing > Spark' 카테고리의 다른 글
[Spark Streaming] Kafka-Spark Streaming-Cassandra 연동 (feat.pyspark) (0) 2021.02.23 [Spark Streaming] Kafka TransformedDStream 변환하기 (0) 2021.02.19 [Spark] 데이터 가공(Feat. 코로나 Trend분석) (0) 2021.02.17 [환경설정] Spark 설치 및 ubuntu 환경 설정 (feat.AWS) (0) 2021.02.15 [Spark] Trend 분석 연관어 빈도수 구하기 (feat. 불용어 처리) (0) 2021.02.06