-
[Spark] 데이터 가공(Feat. 코로나 Trend분석)DataProcessing/Spark 2021. 2. 17. 18:31728x90
대용량 데이터 처리 및 트렌드 분석을 위해 Twitter API로 데이터 소스를 확보했다.
하지만 여기서 문제는 트윗 데이터 구조에 있었다.
아래와 같이 딕셔너리 형태로 이루어져 있고, 그 아래 하위 레벨에 또 다시 리스트 안에 딕셔너리 형태의 구조로 되어 있었다.
또한 id 값들이 전부 string으로 처리되어 있었다.
따라서 트렌드 분석을 위해서는 하위 레벨 내부 데이터를 조회할 수 있도록 데이터 가공이 필요했고,
이를 먼저 datataframe으로 만들어준 뒤 datafame 자체적으로 가공을 시도해봤다.
1. dataframe dict keys => new column
https://mungingdata.com/pyspark/dict-map-to-multiple-columns/
리트윗 수를 카운트하기 위해 referenced_tweets의 value 값인 type과 id(딕셔너리 형태) key 값을 새로운 필드로 하는 데이터프레임 생성 후 필드 값들을 type과 id(딕셔너리 형태) value 값들로 저장하는 가공을 진행했다.
## count retweet tweets_retweet = tweets.select("referenced_tweets") tweets_retweet \ .withColumn("retweet_type", col("referenced_tweets").getItem("type")) \ .withColumn("retweet_id", col("referenced_tweets").getItem("id")) \ .show()
2. Remove lists from values
다음으로 리스트 안에 있는 value 값들만 가져오기 위해 인덱스로 접근하여 꺼내왔다.
## count retweet tweets_retweet = tweets.select("referenced_tweets") tweets_retweet \ .withColumn("retweet_type", col("referenced_tweets").getItem("type")[0]) \ .withColumn("retweet_id", col("referenced_tweets").getItem("id")[0]) \ .show()
*Show() 적용 시 변수 생성 적용X
show()는 dataframe을 바로 조회하는 명령어이므로 변수 선언과 함께 사용하면 다음과 같은 오류메세지가 나온다.
Traceback (most recent call last):
File "Trend_Analysis.py", line 67, in <module>
retweet_count = retweet.groupBy("retweet_type").count()
AttributeError: 'NoneType' object has no attribute 'groupBy'따라서 해당 dataframe을 변수화 하려면 show()를 제외한 나머지를 변수 선언 후, 해당 변수로 show() 해야 함.
3. Remove rows with null values
다음은 includes 안에 있는 지역 정보를 가져온 결과값 중 결측값을 제거한 것이다.
dataframe.na.drop()
4. Get a value from dataframe
tweet_message = tweets.select("message").collect() print(tweet_message) # list 내 Row 반환
for i in tweet_message: # 리스트 안에서 Row들을 반복하면서 print('message: ', i['message']) # message를 키로 사용하여 value 값을 반환 print()
이상 오늘의 삽질일기 끝!
여기저기 삽질도 해보고
날려도 먹으면서
배우는 게
결국 남는거다
- Z.Sabziller
'DataProcessing > Spark' 카테고리의 다른 글
[Spark Streaming] Kafka TransformedDStream 변환하기 (0) 2021.02.19 [Spark Streaming] Tutorial #1 트윗 데이터 실시간 스트리밍(feat. Kafka, Pyspark) (0) 2021.02.18 [환경설정] Spark 설치 및 ubuntu 환경 설정 (feat.AWS) (0) 2021.02.15 [Spark] Trend 분석 연관어 빈도수 구하기 (feat. 불용어 처리) (0) 2021.02.06 [Spark] 스파크 Dataframe 데이터프레임 가공하기 (0) 2021.01.31