개발/Python

[파이썬] 멀티스레딩은 모든 스레드가 완료될 때까지 기다립니다

MinorMan 2022. 10. 12. 07:20
반응형

<질문>

비슷한 맥락에서 물어본 것 같은데 20분 정도 검색해도 답이 안나와서 여쭤봅니다.

저는 Python 스크립트(예: scriptA.py)와 스크립트(예: scriptB.py)를 작성했습니다.

scriptB에서 다른 인수로 scriptA를 여러 번 호출하고 싶습니다. 매번 실행하는 데 약 1시간이 걸립니다. 다른 모든 인수를 동시에 사용하는 scriptA이지만 계속하기 전에 모든 인수가 완료될 때까지 기다려야 합니다. 내 코드:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

나는 모든 것을 실행하고 싶다subprocess.call() 동시에 모든 작업이 완료될 때까지 기다리면 어떻게 해야 합니까?

나는 예제와 같이 스레딩을 사용하려고했습니다.here:

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

그러나 나는 이것이 옳다고 생각하지 않습니다.

내 팀에 가기 전에 그들이 모두 달리기를 마쳤는지 어떻게 알 수 있습니까?do_finish()?


<답변1>

스레드를 목록에 넣은 다음Join method

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()

<답변2>

당신은 사용할 필요가join 의 방법Thread 스크립트의 끝에 있는 개체.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

따라서 메인 스레드는t1,t2 그리고t3 실행을 마칩니다.


<답변3>

Python3에는 Python 3.2부터 동일한 결과에 도달하는 새로운 접근 방식이 있습니다. 개인적으로 전통적인 스레드 생성/시작/조인 패키지를 선호합니다.concurrent.futures:https://docs.python.org/3/library/concurrent.futures.html

사용ThreadPoolExecutor 코드는 다음과 같습니다.

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

이전 코드의 출력은 다음과 같습니다.

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

최대 동시 작업자를 설정하여 처리량을 제어할 수 있다는 장점이 있습니다.


<답변4>

나는 입력 목록을 기반으로 목록 이해를 사용하는 것을 선호합니다.

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]

<답변5>

병렬로 실행하려는 'n'개의 함수 또는 console_scripts를 추가하고 실행을 시작하고 모든 작업이 완료될 때까지 기다릴 수 있는 아래와 같은 클래스를 가질 수 있습니다.

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()

<답변6>

방금 for 루프를 사용하여 생성된 모든 스레드를 기다려야 하는 동일한 문제가 발생했습니다. 방금 다음 코드를 시도했습니다. 완벽한 솔루션은 아니지만 간단한 솔루션일 것이라고 생각했습니다. 테스트:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise

<답변7>

로부터threading module documentation

"메인 스레드" 개체가 있습니다. 이것은 초기에 해당합니다 Python 프로그램의 제어 스레드. 데몬 스레드가 아닙니다. "더미 스레드 개체"가 생성될 가능성이 있습니다. 이들은 "외계 스레드"에 해당하는 스레드 개체입니다. 다음과 같은 스레딩 모듈 외부에서 시작된 제어 스레드 C 코드에서 직접. 더미 스레드 개체는 기능이 제한되어 있습니다. 그들은 항상 살아 있고 데몬으로 간주되며 조인()할 수 없습니다. 탐지가 불가능하기 때문에 절대 삭제되지 않습니다. 외계인 스레드의 종료.

따라서 생성한 스레드 목록을 유지하는 데 관심이 없는 두 가지 경우를 파악하려면 다음을 수행하십시오.

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

그래서:

>>> print(data)
[0, 4, 12, 40]

<답변8>

어쩌면, 뭔가

for t in threading.enumerate():
    if t.daemon:
        t.join()

<답변9>

조인만 사용하면가양성 스레드와의 상호 작용. 문서에서 말했듯이 :

timeout 인수가 있고 None이 아닌 경우 작업 시간 초과를 지정하는 부동 소수점 숫자 초(또는 그 분수). join()은 항상 None을 반환하므로, 시간 초과가 발생했는지 여부를 결정하려면 join() 후에 isAlive()를 호출해야 합니다. – 스레드가 아직 활성 상태이면 join() 호출 시간이 초과되었습니다.

및 예시 코드:

threads = []
for name in some_data:
    new = threading.Thread(
        target=self.some_func,
        args=(name,)
    )
    threads.append(new)
    new.start()
    
over_threads = iter(threads)
curr_th = next(over_threads)
while True:
    curr_th.join()
    if curr_th.is_alive():
        continue
    try:
        curr_th = next(over_threads)
    except StopIteration:
        break
반응형