에라모르겠다(‘◇’)?

[Python] 멀티프로세싱 multiprocessing 본문

Language/Python

[Python] 멀티프로세싱 multiprocessing

도토리즈 2023. 6. 29. 23:34

업무 중 리스트를 순차적으로 돌면서 처리해야 할 데이터의 양이 많아서 

엄청나게 오랜 시간의 속도가 걸리는 작업을 했던 경험이 있었다..ㅠ

그래서 멀티프로세싱을 사용하여 병렬 작업 처리를 하니까 작업 속도가 훨씬 빨라져 제대로 기억하기 위해 정리 ! 

 

 

* 멀티프로세싱이란 ? 

멀티프로세싱은 하나 이상의 프로세스를 동시에 실행하여 작업을 병렬로 처리하는 기술.

각 프로세스는 독립적으로 실행되며, 각각의 프로세스는 다른 메모리 공간을 가지고 있음.

 

* 멀티프로세싱이 유용하게 쓰이는 상황 

- CPU 집약적인 작업

: CPU를 많이 사용하는 작업(예: 계산 작업, 데이터 처리 등)

= > 멀티프로세싱으로 처리하면 성능 향상을 기대 할 수 있음 ,   프로세스가 별도의 CPU 코어를 사용하기 때문에 작업 병렬 처리 가능

- 동시성 처리

: 여러 개의 작업을 동시에 처리해야 하는 경우 , 각 작업을 별도의 프로세스로 실행하면 작업들이 서로 영향을 주지 않고 병렬로 처리될 수 있음ex)  웹 서버에서 동시에 여러 요청을 처리, 데이터베이스 작업을 병렬로 수행하는 경우 등

- 대규모 데이터 처리

: 많은 양의 데이터를 처리해야 할 때 멀티프로세싱을 사용하면 작업 시간을 단축시킬 수 있음 , 각 프로세스는 독립적으로 데이터를 처리하므로, 병렬로 처리하면 효율적으로 대용량 데이터를 처리할 수 있다

 

* 사용 함수

①  apply()

apply() 함수는 프로세스 풀의 각 프로세스에 대해 함수를 실행하고 결과를 반환.

각 프로세스는 pool에게 한 번에 하나의 작업만 시키며 작업이 끝날 때 까지 기다렸다가 처리 결과를 받음

 

from django.shortcuts import render
from rest_framework.decorators import api_view
from rest_framework.response import Response 
from .models import *
from django.http import JsonResponse
from multiprocessing import Pool
import multiprocessing as mp
import time

# 멀티프로세스 예제에서 사용할 인자 x값에 대한 제곱을 실행하는 메소드 
def square(x):
    return x * x

# 1. apply() 사용 example
@api_view(['post'])
def multi_apply_view(request) :
    start = time.time()
    with Pool(processes=4) as pool:
        result = pool.apply(square, (5,))
        print("result = ", result)
        result = pool.apply(square, (6,))
        print("result = " ,result)
        result = pool.apply(square, (10,))
        print("result = ",result)
    end = time.time()

    print("총 소요시간 : " , end - start)
    return JsonResponse(result, safe=False)

 

print로 출력한 result 결과를 확인해 보면 하나의 작업이 끝난 후 다음 작업이 수행됨을 알 수 있음 ! 

 

② map()

map() 함수는 프로세스 풀의 각 프로세스에 대해 함수를 실행하고 결과를 반환. 각 프로세스는 여러 개의 작업을 처리할 수 있다.

# 멀티프로세스 예제에서 사용할 인자 x값에 대한 제곱을 실행하는 메소드 
def square(x):
    return x * x

# 2. map() 사용 example
@api_view(['post'])
def multi_map_view(request) :
    number_list = []
    for i in range(1,101) :
        number_list.append(i) 
    start = time.time()
    with Pool(processes=4) as pool:
        numbers = number_list
        results = pool.map(square, numbers)
        print("result =", results)   
    end = time.time()

    print("총 소요시간 : " , end - start)
    return JsonResponse(results, safe=False)

 

print로 출력한 results 결과를 확인해 보면

동시에 작업이 실행되기 때문 더 많은 양의 작업을 처리하지만 소요시간이 ①번보다 더 짧게 나타남 !

③ imap()

imap() 함수는 map()과 유사하지만, 결과를 반환하는 대신 제너레이터(generator)를 반환함 ->  대량의 결과를 처리할 때 유용

작업이 완료될 때마다 하나의 결과를 반환하며, 순차적으로 결과 가져올 수 있다. 

 

# 멀티프로세스 예제에서 사용할 인자 x값에 대한 제곱을 실행하는 메소드 
def square(x):
    return x * x

# 3. imap() 사용 example
@api_view(['post'])
def multi_imap_view(request) :
    number_list = []
    for i in range(1,101) :
        number_list.append(i) 
    start = time.time()
    with Pool(processes=4) as pool:
        numbers = number_list
        results = pool.imap(square, numbers)
        results_list = list(results)

        for result in results_list : 
            print("result =", result)

    end = time.time()

    print("총 소요시간 : " , end - start)
    return JsonResponse(results_list, safe=False)

print로 출력한 result 결과를 확인해 보면 for문을 돌면서 하나의 제너레이터 객체를 순차적으로 가져옴

 

* example 

# 멀티프로세스 예제에서 사용할 인자 x값에 대한 제곱을 실행하는 메소드 
def square(x):
    return x * x

# 멀티프로세싱 example
def square_wrapper(chunk):
    return [square(num) for num in chunk]

@api_view(['POST'])
def example_multi_view(request):
    number_list = []
    for i in range(1, 101):
        number_list.append(i)

    numbers = number_list
    num_processes = 4
    chunk_size = len(numbers) // num_processes
    chunks = []
    for i in range(0, len(numbers), chunk_size):
        chunk = numbers[i:i+chunk_size]
        chunks.append(chunk)
  

    start = time.time()

    with Pool(processes=num_processes) as pool:
        results = pool.map(square_wrapper, chunks)
        pool.close()
        pool.join()

    final_result = []
    print("results=", results)
    for sublist in results:
        print("sublist=",sublist)
        for result in sublist:
            final_result.append(result)
    
    end = time.time()
    
    print("총 소요시간 =", end - start)

    return JsonResponse(final_result, safe=False)

-- 설명 

num_processes [int]: 병렬로 실행할 프로세스의 개수를 나타냄 (4개 설정)

chunk_size [int]:  리스트를 작은 작업 단위로 분할하기 위한 기준값. 입력 리스트의 길이를 프로세스 개수로 나눈 값으로 설정.

이를 통해 입력 리스트를 작은 작업 단위로 분할하여 각 프로세스가 독립적으로 처리 가능 .

chunks [list] : 입력 리스트를 작은 작업 단위로 분할한 결과

range() 함수와 슬라이싱을 사용하여 작은 작업 단위로 리스트를 분할하고, 이를 chunks 리스트에 저장

Pool() : 프로세스 풀을 생성. 이를 사용하여 병렬로 작업을 실행할 수 있음

processes 매개변수를 통해 생성할 프로세스의 개수를 지정

pool.map() : 함수는 작업을 병렬로 실행하고, 각 작업의 결과를 수집. 여기서는 square 함수를 chunks 리스트의 각 요소에 적용하여 작업을 실행하고, 결과를 results 리스트에 저장

pool.close() :  프로세스 풀을 닫음. 더 이상 새로운 작업을 받지 않고, 기존에 예약된 작업들이 완료될 때까지 대기

pool.join() :  모든 프로세스의 실행이 완료될 때까지 대기. 모든 작업이 끝나면 다음 코드로 넘어감

 

results [list] :  작은 작업들의 결과를 순서대로 모아놓은 전체 결과

 

sublist [list] : 각각의 작은 작업에서 계산된 결과를 담고 있는 list

 

final_result [list]: results 리스트의 각 작은 리스트를 병합한 결과

 

print()로 results와 sublist를 출력해본 결과 아래와 같은 결과로 나타나는 것을 볼 수있음 

 

Comments