-
[Spark] 데이터 가공(Feat. 코로나 Trend분석)DataProcessing/Spark 2021. 2. 17. 18:31
대용량 데이터 처리 및 트렌드 분석을 위해 Twitter API로 데이터 소스를 확보했다.
하지만 여기서 문제는 트윗 데이터 구조에 있었다.
아래와 같이 딕셔너리 형태로 이루어져 있고, 그 아래 하위 레벨에 또 다시 리스트 안에 딕셔너리 형태의 구조로 되어 있었다.
또한 id 값들이 전부 string으로 처리되어 있었다.
'covid-19' 키워드로 가져온 트윗 데이터 따라서 트렌드 분석을 위해서는 하위 레벨 내부 데이터를 조회할 수 있도록 데이터 가공이 필요했고,
이를 먼저 datataframe으로 만들어준 뒤 datafame 자체적으로 가공을 시도해봤다.
1. dataframe dict keys => new column
https://mungingdata.com/pyspark/dict-map-to-multiple-columns/
Converting a PySpark Map / Dictionary to Multiple Columns - MungingData
This post explains how to convert a MapType column into multiple columns. This operation can be slow, so performant work-arounds are discussed.
mungingdata.com
리트윗 수를 카운트하기 위해 referenced_tweets의 value 값인 type과 id(딕셔너리 형태) key 값을 새로운 필드로 하는 데이터프레임 생성 후 필드 값들을 type과 id(딕셔너리 형태) value 값들로 저장하는 가공을 진행했다.
리트윗 정보가 담긴 referened_tweets 값 ## count retweet tweets_retweet = tweets.select("referenced_tweets") tweets_retweet \ .withColumn("retweet_type", col("referenced_tweets").getItem("type")) \ .withColumn("retweet_id", col("referenced_tweets").getItem("id")) \ .show()
retweet dataframe 생성 2. Remove lists from values
다음으로 리스트 안에 있는 value 값들만 가져오기 위해 인덱스로 접근하여 꺼내왔다.
## count retweet tweets_retweet = tweets.select("referenced_tweets") tweets_retweet \ .withColumn("retweet_type", col("referenced_tweets").getItem("type")[0]) \ .withColumn("retweet_id", col("referenced_tweets").getItem("id")[0]) \ .show()
리스트에서 value값 꺼내오기 *Show() 적용 시 변수 생성 적용X
show()는 dataframe을 바로 조회하는 명령어이므로 변수 선언과 함께 사용하면 다음과 같은 오류메세지가 나온다.
Traceback (most recent call last):
File "Trend_Analysis.py", line 67, in <module>
retweet_count = retweet.groupBy("retweet_type").count()
AttributeError: 'NoneType' object has no attribute 'groupBy'따라서 해당 dataframe을 변수화 하려면 show()를 제외한 나머지를 변수 선언 후, 해당 변수로 show() 해야 함.
show() 제거 후 retweet변수 선언 리트윗 count 정상 출력 3. Remove rows with null values
다음은 includes 안에 있는 지역 정보를 가져온 결과값 중 결측값을 제거한 것이다.
dataframe.na.drop()
결측값 제거 전 결측값 제거 후 4. Get a value from dataframe
tweet_message = tweets.select("message").collect() print(tweet_message) # list 내 Row 반환
리스트에 담긴 트윗 메세지들 for i in tweet_message: # 리스트 안에서 Row들을 반복하면서 print('message: ', i['message']) # message를 키로 사용하여 value 값을 반환 print()
메세지 하나씩 조회하기 이상 오늘의 삽질일기 끝!
여기저기 삽질도 해보고
날려도 먹으면서
배우는 게
결국 남는거다
- Z.Sabziller
'DataProcessing > Spark' 카테고리의 다른 글
[Spark Streaming] Kafka TransformedDStream 변환하기 (0) 2021.02.19 [Spark Streaming] Tutorial #1 트윗 데이터 실시간 스트리밍(feat. Kafka, Pyspark) (0) 2021.02.18 [환경설정] Spark 설치 및 ubuntu 환경 설정 (feat.AWS) (0) 2021.02.15 [Spark] Trend 분석 연관어 빈도수 구하기 (feat. 불용어 처리) (0) 2021.02.06 [Spark] 스파크 Dataframe 데이터프레임 가공하기 (0) 2021.01.31