DataProcessing/Spark

[Spark Streaming] PySpark 데이터프레임 모든 컬럼 json으로 변환 & 데이터프레임 딕셔너리 내부 value값 dict 변환

쫄보삽질러 2021. 2. 25. 23:44

트위터 API로 받아오는 데이터 형태가 딕셔너리 내부에 딕셔너리가 있고, 또 리스트 value 값 안에

딕셔너리가 포함되어 있는 다중구조이므로 딕셔너리 내부 값들에 접근해야 할 필요가 있었습니다.

 

트위터 데이터 형식

 

따라서 먼저 Kafkaf에서 실시간으로 받아오는 데이터를 개별적 RDD로 접근하여 데이터프레임을 생성하고

데이터프레임의 컬럼(트윗 데이터 key값들)을 하나로 합친 새로운 컬럼을 생성하고 저장한 다음

다시 조회하여 딕셔너리 형태로 가공 후 내부 value값들 또한 딕셔너리로 변환하는 데이터 접근 방식을 시도했습니다.

 

dataframe 생성

from pyspark.sql import SparkSession

dataframe = SparkSession.createDataFrame(rdd, schema = ['col_name1', 'col_name2', ...])

 

dataframe 내 모든 colums => 하나의 new json column으로 생성하기

df = dataframe.withColumn("jsonCol", functions.to_json(functions.struct([dataframe[x] for x in dataframe.columns])))

 

new json column 값 조회하기

 df_to_json = df.select('jsonCol').collect()   # return list type

 

 

list 값을 dictionary로 변환하기

 df_to_json = df.select('jsonCol').collect()[0].asDict()

 

df_to_json value 값 dictionary로 변환하기

import json

df_value_to_json = json.loads(df_to_json['jsonCol'])

 

 

 

 

 

- 이상 오늘의 삽질일기 끝!

 


여기저기 삽질도 해보고

날려도 먹으면서

배우는 게

결국 남는거다

- Z.Sabziller

반응형