dataframe
-
[Spark] Pyspark 데이터프레임 Shape(column) & Size(row) 구하기DataProcessing/Spark 2021. 3. 3. 22:57
Spark dataframe row count dataframe.count() dataframe column(Shape)과 row(Size)개수 count print((dataframe.count(), len(dataframe.columns))) # row, column print - 이상 오늘의 삽질일기 끝! 여기저기 삽질도 해보고 날려도 먹으면서 배우는 게 결국 남는거다 - Z.Sabziller
-
[Spark] Pyspark 데이터프레임을 JSON 딕셔너리로 변환하기DataProcessing/Spark 2021. 3. 1. 21:59
Dataframe => JSON 변환하기 1. 데이터프레임에서 toJSON 함수와 collect하면 전체 dataset을 string으로 반환한다. new_dfdf = df.toJSON().collect() print('new_dfdf', type(new_dfdf[0]), new_dfdf) 만약 new_dfdf[0]을 출력해보면, '{' 문자가 나온다. 2. 따라서 key와 value로 접근하기 위한 dict로 변환하려면 개별 map에 대해 json.loads로 해주어야 한다. new_df = df.toJSON().map(lambda x: json.loads(x)).collect() print('new_df', type(new_df[0]), new_df) - 이상 오늘의 삽질일기 끝! 여기저기 삽질도 해..
-
[Spark Streaming] PySpark 데이터프레임 모든 컬럼 json으로 변환 & 데이터프레임 딕셔너리 내부 value값 dict 변환DataProcessing/Spark 2021. 2. 25. 23:44
트위터 API로 받아오는 데이터 형태가 딕셔너리 내부에 딕셔너리가 있고, 또 리스트 value 값 안에 딕셔너리가 포함되어 있는 다중구조이므로 딕셔너리 내부 값들에 접근해야 할 필요가 있었습니다. 따라서 먼저 Kafkaf에서 실시간으로 받아오는 데이터를 개별적 RDD로 접근하여 데이터프레임을 생성하고 데이터프레임의 컬럼(트윗 데이터 key값들)을 하나로 합친 새로운 컬럼을 생성하고 저장한 다음 다시 조회하여 딕셔너리 형태로 가공 후 내부 value값들 또한 딕셔너리로 변환하는 데이터 접근 방식을 시도했습니다. dataframe 생성 from pyspark.sql import SparkSession dataframe = SparkSession.createDataFrame(rdd, schema = ['col..
-
[Spark] 데이터 가공(Feat. 코로나 Trend분석)DataProcessing/Spark 2021. 2. 17. 18:31
대용량 데이터 처리 및 트렌드 분석을 위해 Twitter API로 데이터 소스를 확보했다. 하지만 여기서 문제는 트윗 데이터 구조에 있었다. 아래와 같이 딕셔너리 형태로 이루어져 있고, 그 아래 하위 레벨에 또 다시 리스트 안에 딕셔너리 형태의 구조로 되어 있었다. 또한 id 값들이 전부 string으로 처리되어 있었다. 따라서 트렌드 분석을 위해서는 하위 레벨 내부 데이터를 조회할 수 있도록 데이터 가공이 필요했고, 이를 먼저 datataframe으로 만들어준 뒤 datafame 자체적으로 가공을 시도해봤다. 1. dataframe dict keys => new column https://mungingdata.com/pyspark/dict-map-to-multiple-columns/ Converting..
-
[Spark] 스파크 Dataframe 데이터프레임 가공하기DataProcessing/Spark 2021. 1. 31. 23:57
모듈 import import findspark findspark.init() from pyspark import SparkContext from pyspark.sql import SQLContext ## Cassandra import os os.environ['PYSPARK_SUBMIT_ARGS'] = \ '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.1' \ ' --conf spark.cassandra.connection.host=localhost:port pyspark-shell' sc = SparkContext(appName="app name") sqlContext = SQLContext(sc) CassandraDB 데이터 조..
-
[Spark] Tutorial #1 데이터 조회, 가공 & 데이터프레임 생성DataProcessing/Spark 2021. 1. 24. 23:09
Text파일 읽어오기 & 라인(row) 수 반환 sc lines = sc.textFile("README.md") # 해당 폴더 안에 있는 README.md 파일 읽기 lines.count() # 해당 파일 라인 수 전체 텍스트 읽기 (collect 함수) lines.collect() 특정 단어 포함한 문장 반환하기 python_in_lines = lines.filter(lambda line: "Python" in line) python_in_lines.collect() RDD map함수 => 각 데이터 요소에 함수를 적용해 'map' 타입으로 변환 rdd = sc.textFile("README.md") rdd_map = rdd.map(lambda x: (x,1)) rdd_map.collect() 첫 5문..