-
python multiprocessing Pool을 활용한 데이터 병렬 처리MLOps 2024. 12. 8. 21:59
🍏 배경
tfidf 추천 모델의 cosine 유사도 추출 후 추천 결과를 뽑기 위한 단계에서
16만건의 유저 데이터를 처리하기 위한 분산처리 및 최적화가 필요했습니다.
🍎 해결방법
python의 threading은 GIL(Global Interpreter Lock) 기능 때문에 병렬처리가 어려움으로
여러 작업을 효율적으로 병렬 처리할 수 있는 방식인 multiprocessing.Pool을 활용했습니다.
https://ninano1109.tistory.com/294
multithread vs multiprocess (feat. Process, Pool 비교)
멀티스레드와 멀티프로세스는 모두 데이터 병렬처리를 위한 python 모듈로 약간의 차이점과 용도별 차이가 있습니다. 스레드 vs 프로세스스레드하나의 프로세스 안에 여러개의 스레드 존재4가지
ninano1109.tistory.com
Pool의 장점으로는:
1. 미리 지정된 수의 프로세스 생성 후 프로세스 풀에서 작업 처리
2. 작업이 완료된 프로세스는 다시 풀에 반환되어 작업을 재실행하므로 프로세스 풀의 재사용
3. 배치 작업 시 유용하며 효율적인 자원 사용이 가능하며
4. 이 모든 프로세스를 자동으로 관리해줌이 있습니다.
def process_chunks(data, num_processes, chunk_size=3000):results = []with mp.Pool(processes=num_processes) as pool:for i in tqdm(range(0, len(data), chunk_size)):chunk = data[i:i + chunk_size]for result in pool.imap_unordered(work, chunk):results.append(result)pool.close()pool.join()
return resultsPool을 사용해 병렬 작업을 위한 함수를 만들어줍니다.
1. 먼저 병렬처리를 위해 데이터를 청크 사이즈 3000으로 나누어 전체 데이터를 3000개씩 쪼개어 작업을 병렬로 처리합니다.
2. mp.Pool로 멀티프로세싱을 위한 풀을 생성하고 num_processes 수 만큼 프로세스를 병렬로 생성하여 작업을 처리합니다.
3. 청크 사이즈로 쪼갠 데이터를 pool.imap_unordered를 사용하여 병렬로 work 함수를 실행 후 결과 값 results에 저장합니다.
* imap은 한 번에 모든 데이터 처리하지 않고, 필요한 시점에 결과 생성 => 대용량 데이터 처리 시 메모리 사용 최적화!
4. 더 이상 추가 작업이 남아 있지 않다면 pool을 종료하고, pool.join()으로 모든 프로세스가 끝날 때까지 기다립니다
def work(user_pair):user = user_pair[0]top_view_list = user_pair[1] #Top 5# 해당 user가 조회한 전체 itemview_all_list = view_all_dict[user]temp_list = []for item in top_view_list:try:recomm_item = cosine_sim_df.loc[int(item)].drop_duplicates().sort_values(ascending=False)[:12]recomm_item_list = list(zip(recomm_item.index, recomm_item)) #index = item, recomm_item = 유사도# 추천 대상 item 중 본 item, 추천 대상 리스트 중복, 유사도 1 제외final_recomm_list = [r for r in recomm_item_list if r[0] not in view_all_list and r[0] not in [t[0] for t in temp_list] and int(r[1]) != 1]temp_list.extend(final_recomm_list)
except Exception as e:pass
new_list = list(set(temp_list))new_list.sort(key=lambda x:x[1], reverse=True)final_list= new_list[:12]recomm_df = pd.DataFrame(final_list, columns = ['item', 'cos_sim'])recomm_df.insert(0, 'user', user)
return recomm_df다음으로 데이터 작업을 위한 work 함수를 생성합니다.
1. user가 가장 많이 조회한 item top 5와 조회한 전체 item 리스트를 저장합니다.
2. top_view_list의 아이템 각각에 대하여 코사인 유사도 데이터프레임에서 가장 유사한 12개의 추천 대상 아이템을 가져옵니다.
* (5X12= 총 60개의 추천 대상 아이템 추출하기)
3. 위 추천 대상 아이템 중에서 user가 이미 본 아이템이거나, 중복된 아이템이거나, 유사도가 1인(스펙이 똑같은) 아이템은
제외한 나머지를 최종 추천 아이템 리스트에 저장합니다.
4. 어느정도 추려진 추천 대상 중 유사도가 높은 순으로 정렬하여 최종적으로 12개의 item을 뽑아서
pandas 데이터프레임으로 반환하기
🍑 최종코드
from tqdm import tqdmimport multiprocessing as mpdef work(user_pair):user = user_pair[0]top_view_list = user_pair[1] #Top 5# 해당 user가 조회한 전체 itemview_all_list = view_all_dict[user]temp_list = []for item in top_view_list:try:recomm_item = cosine_sim_df.loc[int(item)].drop_duplicates().sort_values(ascending=False)[:12]recomm_item_list = list(zip(recomm_item.index, recomm_item)) #index = item, recomm_item = 유사도# 추천 대상 item 중 본 item, 추천 대상 리스트 중복, 유사도 1 제외final_recomm_list = [r for r in recomm_item_list if r[0] not in view_all_list and r[0] not in [t[0] for t in temp_list] and int(r[1]) != 1]temp_list.extend(final_recomm_list)
except Exception as e:pass
new_list = list(set(temp_list))new_list.sort(key=lambda x:x[1], reverse=True)final_list= new_list[:12]recomm_df = pd.DataFrame(final_list, columns = ['item', 'cos_sim'])recomm_df.insert(0, 'user', user)
return recomm_dfdef process_chunks(data, num_processes, chunk_size=3000):results = []with mp.Pool(processes=num_processes) as pool:for i in tqdm(range(0, len(data), chunk_size)):chunk = data[i:i + chunk_size]for result in pool.imap_unordered(work, chunk):results.append(result)pool.close()pool.join()
return resultsnum_processes = mp.cpu_count() #712 cores
results = process_chunks(user_recomm_list, num_processes)all_user = pd.concat(results, ignore_index=True)총 코어 수 712개와 청크 사이즈 3000으로 설정했을 때,
유저 수 16만개에 대한 병렬 처리 작업의 소요 시간을 9분 정도로 줄일 수 있었습니다!(기존: days+'α')
multiprocess.Pool을 활용한 병렬처리
당신이 어떤 것을
할머니에게 설명해주지 못한다면,
그것은 진정으로 이해한 것이 아니다.
- A.Einstein