-
[Spark Streaming] Kafka-Spark Streaming-Cassandra 연동 (feat.pyspark)DataProcessing/Spark 2021. 2. 23. 23:05728x90
import findspark findspark.init() from pyspark import SparkContext from pyspark.sql import SparkSession from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils #Cassandra import os os.environ['PYSPARK_SUBMIT_ARGS'] = \ '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1' \ ' --conf spark.cassandra.connection.host=localhost pyspark-shell' if __name__=="__main__": def handle_rdd(rdd): if not rdd.isEmpty(): global ss dataframe = ss.createDataFrame(rdd, schema=['word', 'count']) dataframe.show(5) dataframe.write \ .format("org.apache.spark.sql.cassandra") \ .mode('append') \ .options(table="words", keyspace="streaming_test") \ .save() sc = SparkContext(appName="Kafka Spark Demo") ssc = StreamingContext(sc,5) #5=no.of seconds ss = SparkSession.builder \ .appName('SparkCassandraApp') \ .config('spark.cassandra.connection.host', 'localhost') \ .config('spark.cassandra.connection.port', '9042') \ .config('spark.cassandra.output.consistency.level', 'ONE') \ .getOrCreate() message = KafkaUtils.createDirectStream(ssc, topics=["test"], kafkaParams={"metadata.broker.list":"localhost:9092"}) lines = message.map(lambda x: x[1]) transform = lines.map(lambda tweet: (tweet, int(len(tweet.split())))) transform.foreachRDD(handle_rdd) ssc.start() ssc.awaitTermination()
- kafka-producer 실행
- spark-consumer 실행
- cassandra DB 조회
- 이상 오늘의 삽질일기 끝!
여기저기 삽질도 해보고
날려도 먹으면서
배우는 게
결국 남는거다
- Z.Sabziller
'DataProcessing > Spark' 카테고리의 다른 글
[Spark Streaming] PySpark 데이터프레임 모든 컬럼 json으로 변환 & 데이터프레임 딕셔너리 내부 value값 dict 변환 (2) 2021.02.25 [Spark Streaming] spark-streaming-kafka 실행 오류 해결 (feat. 파일 경로 옮기기) (0) 2021.02.24 [Spark Streaming] Kafka TransformedDStream 변환하기 (0) 2021.02.19 [Spark Streaming] Tutorial #1 트윗 데이터 실시간 스트리밍(feat. Kafka, Pyspark) (0) 2021.02.18 [Spark] 데이터 가공(Feat. 코로나 Trend분석) (0) 2021.02.17