개발/Python

Python에서 병렬 프로그래밍을 수행하는 방법은 무엇입니까?

MinorMan 2021. 2. 7. 11:54
반응형

<질문>

C ++의 경우 OpenMP를 사용하여 병렬 프로그래밍을 수행 할 수 있습니다. 그러나 OpenMP는 Python에서 작동하지 않습니다. 파이썬 프로그램의 일부를 병렬화하려면 어떻게해야합니까?

코드의 구조는 다음과 같이 간주 될 수 있습니다.

solve1(A)
solve2(B)

어디solve1solve2두 개의 독립적 인 기능입니다. 실행 시간을 줄이기 위해 이런 종류의 코드를 순서대로 대신 병렬로 실행하는 방법은 무엇입니까? 코드는 다음과 같습니다.

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break
            
        node1 = partition[0]
        node2 = partition[1]
    
        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

어디setinnersetouter두 개의 독립적 인 기능입니다. 그것이 내가 평행을 이루고 싶은 곳입니다 ...


<답변1>

당신은 사용할 수 있습니다multiprocessing기준 치수. 이 경우 처리 풀을 사용할 수 있습니다.

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

이렇게하면 일반적인 작업을 수행 할 수있는 프로세스가 생성됩니다. 우리가 통과하지 않았기 때문에processes, 컴퓨터의 각 CPU 코어에 대해 하나의 프로세스를 생성합니다. 각 CPU 코어는 하나의 프로세스를 동시에 실행할 수 있습니다.

목록을 단일 함수에 매핑하려면 다음을 수행합니다.

args = [A, B]
results = pool.map(solve1, args)

스레드를 사용하지 마십시오.GIL파이썬 객체에 대한 모든 작업을 잠급니다.


<답변2>

이것은 매우 우아하게 할 수 있습니다.Ray.

예제를 병렬화하려면 다음을 사용하여 함수를 정의해야합니다.@ray.remote데코레이터를 사용하여 호출합니다..remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

이것에 비해 많은 장점이 있습니다.multiprocessing기준 치수.

  1. 동일한 코드가 멀티 코어 머신과 머신 클러스터에서 실행됩니다.
  2. 프로세스는 다음을 통해 효율적으로 데이터를 공유합니다.shared memory and zero-copy serialization.
  3. 오류 메시지는 멋지게 전파됩니다.
  4. 이러한 함수 호출은 함께 구성 될 수 있습니다. 예 :

    @ray.remote def f(x): return x + 1 x_id = f.remote(1) y_id = f.remote(x_id) z_id = f.remote(y_id) ray.get(z_id) # returns 4
  5. 원격으로 함수를 호출하는 것 외에도 클래스를 다음과 같이 원격으로 인스턴스화 할 수 있습니다.actors.

참고Ray제가 개발을 돕고있는 프레임 워크입니다.


<답변3>

CPython은 Global Interpreter Lock을 사용하여 병렬 프로그래밍을 C ++보다 조금 더 흥미롭게 만듭니다.

이 항목에는 문제에 대한 몇 가지 유용한 예와 설명이 있습니다.

Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?


<답변4>

다른 사람들이 말했듯이 해결책은 여러 프로세스를 사용하는 것입니다. 그러나 어떤 프레임 워크가 더 적합한지는 여러 요인에 따라 다릅니다. 이미 언급 한 것 외에도charm4pympi4py(나는 charm4py의 개발자입니다).

작업자 풀 추상화를 사용하는 것보다 위의 예를 구현하는 더 효율적인 방법이 있습니다. 메인 루프는 동일한 매개 변수 (전체 그래프 포함)를 전송합니다.G) 각 1000 번의 반복에서 작업자에게 계속 반복됩니다. 적어도 한 명의 작업자가 다른 프로세스에 상주하므로 인수를 복사하여 다른 프로세스로 보내는 작업이 포함됩니다. 이것은 물체의 크기에 따라 매우 비쌀 수 있습니다. 대신 작업자가 상태를 저장하고 업데이트 된 정보를 전송하도록하는 것이 합리적입니다.

예를 들어 charm4py에서 다음과 같이 할 수 있습니다.

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

이 예에서는 실제로 작업자가 하나만 필요합니다. 메인 루프는 기능 중 하나를 실행하고 작업자가 다른 기능을 실행하도록 할 수 있습니다. 하지만 내 코드는 몇 가지를 설명하는 데 도움이됩니다.

  1. 작업자 A는 프로세스 0에서 실행됩니다 (주 루프와 동일). 동안result_a.get()결과를 기다리는 것이 차단되고 작업자 A는 동일한 프로세스에서 계산을 수행합니다.
  2. 인수는 동일한 프로세스에 있으므로 작업자 A에 대한 참조를 통해 자동으로 전달됩니다 (복사가 관련되지 않음).

<답변5>

어떤 경우에는 다음을 사용하여 루프를 자동으로 병렬화 할 수 있습니다.Numba, Python의 작은 하위 집합에서만 작동합니다.

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

불행히도 Numba는 Numpy 배열에서만 작동하지만 다른 Python 객체에서는 작동하지 않는 것 같습니다. 이론적으로는compile Python to C++그리고automatically parallelize it using the Intel C++ compiler, 아직 시도하지 않았지만.


<답변6>

당신이 사용할 수있는joblib병렬 계산 및 다중 처리를 수행하는 라이브러리.

from joblib import Parallel, delayed

간단히 함수를 만들 수 있습니다.foo병렬로 실행하려는 다음 코드를 기반으로 병렬 처리를 구현합니다.

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

어디num_cores에서 얻을 수 있습니다multiprocessing다음과 같이 라이브러리 :

import multiprocessing

num_cores = multiprocessing.cpu_count()

둘 이상의 입력 인수가있는 함수가 있고 목록으로 인수 중 하나를 반복하려는 경우 다음을 사용할 수 있습니다.partial기능functools다음과 같이 라이브러리 :

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

몇 가지 예제를 통해 Python 및 R 다중 처리에 대한 완전한 설명을 찾을 수 있습니다.here.

반응형