개발/Python

[파이썬] PyQt에서 QThread가있는 백그라운드 스레드

MinorMan 2021. 1. 10. 23:32
반응형

<질문>

PyQt에서 작성한 GUI를 통해 사용중인 라디오와 인터페이스하는 프로그램이 있습니다. 분명히 라디오의 주요 기능 중 하나는 데이터를 전송하는 것이지만이를 계속하려면 쓰기를 반복해야하므로 GUI가 중단됩니다. 스레딩을 한 번도 다루지 않았기 때문에 나는QCoreApplication.processEvents().그러나 라디오는 전송 사이에 절전 모드가 필요하므로 GUI는 여전히 이러한 절전 시간에 따라 정지됩니다.

QThread를 사용하여 이것을 고치는 간단한 방법이 있습니까? PyQt로 멀티 스레딩을 구현하는 방법에 대한 자습서를 찾았지만 대부분은 서버 설정을 다루며 필요한 것보다 훨씬 더 고급입니다. 나는 솔직히 그것이 실행되는 동안 아무것도 업데이트하기 위해 내 스레드가 실제로 필요하지도 않습니다. 그저 시작하고 백그라운드에서 전송하고 중지하면됩니다.


<답변1>

나는 쓰레드를 다루는 3 가지 다른 간단한 방법을 보여주는 작은 예제를 만들었습니다. 문제에 대한 올바른 접근 방식을 찾는 데 도움이되기를 바랍니다.

import sys
import time

from PyQt5.QtCore import (QCoreApplication, QObject, QRunnable, QThread,
                          QThreadPool, pyqtSignal)


# Subclassing QThread
# http://qt-project.org/doc/latest/qthread.html
class AThread(QThread):

    def run(self):
        count = 0
        while count < 5:
            time.sleep(1)
            print("A Increasing")
            count += 1

# Subclassing QObject and using moveToThread
# http://blog.qt.digia.com/blog/2007/07/05/qthreads-no-longer-abstract
class SomeObject(QObject):

    finished = pyqtSignal()

    def long_running(self):
        count = 0
        while count < 5:
            time.sleep(1)
            print("B Increasing")
            count += 1
        self.finished.emit()

# Using a QRunnable
# http://qt-project.org/doc/latest/qthreadpool.html
# Note that a QRunnable isn't a subclass of QObject and therefore does
# not provide signals and slots.
class Runnable(QRunnable):

    def run(self):
        count = 0
        app = QCoreApplication.instance()
        while count < 5:
            print("C Increasing")
            time.sleep(1)
            count += 1
        app.quit()


def using_q_thread():
    app = QCoreApplication([])
    thread = AThread()
    thread.finished.connect(app.exit)
    thread.start()
    sys.exit(app.exec_())

def using_move_to_thread():
    app = QCoreApplication([])
    objThread = QThread()
    obj = SomeObject()
    obj.moveToThread(objThread)
    obj.finished.connect(objThread.quit)
    objThread.started.connect(obj.long_running)
    objThread.finished.connect(app.exit)
    objThread.start()
    sys.exit(app.exec_())

def using_q_runnable():
    app = QCoreApplication([])
    runnable = Runnable()
    QThreadPool.globalInstance().start(runnable)
    sys.exit(app.exec_())

if __name__ == "__main__":
    #using_q_thread()
    #using_move_to_thread()
    using_q_runnable()

<답변2>

이 답변을 PyQt5, python 3.4에 대해 업데이트하십시오.

이를 패턴으로 사용하여 데이터를 사용하지 않고 양식에 사용할 수있는 데이터를 반환하는 작업자를 시작합니다.

1-작업자 클래스를 더 작게 만들고 자체 파일 worker.py에 넣어 쉽게 암기하고 독립적 인 소프트웨어를 재사용합니다.

2-main.py 파일은 GUI Form 클래스를 정의하는 파일입니다.

3-스레드 개체가 하위 클래스가 아닙니다.

4-스레드 개체와 작업자 개체가 모두 Form 개체에 속합니다.

5-절차의 단계는 설명 내에 있습니다.

# worker.py
from PyQt5.QtCore import QThread, QObject, pyqtSignal, pyqtSlot
import time


class Worker(QObject):
    finished = pyqtSignal()
    intReady = pyqtSignal(int)


    @pyqtSlot()
    def procCounter(self): # A slot takes no params
        for i in range(1, 100):
            time.sleep(1)
            self.intReady.emit(i)

        self.finished.emit()

주요 파일은 다음과 같습니다.

  # main.py
  from PyQt5.QtCore import QThread
  from PyQt5.QtWidgets import QApplication, QLabel, QWidget, QGridLayout
  import sys
  import worker


  class Form(QWidget):

    def __init__(self):
       super().__init__()
       self.label = QLabel("0")

       # 1 - create Worker and Thread inside the Form
       self.obj = worker.Worker()  # no parent!
       self.thread = QThread()  # no parent!

       # 2 - Connect Worker`s Signals to Form method slots to post data.
       self.obj.intReady.connect(self.onIntReady)

       # 3 - Move the Worker object to the Thread object
       self.obj.moveToThread(self.thread)

       # 4 - Connect Worker Signals to the Thread slots
       self.obj.finished.connect(self.thread.quit)

       # 5 - Connect Thread started signal to Worker operational slot method
       self.thread.started.connect(self.obj.procCounter)

       # * - Thread finished signal will close the app if you want!
       #self.thread.finished.connect(app.exit)

       # 6 - Start the thread
       self.thread.start()

       # 7 - Start the form
       self.initUI()


    def initUI(self):
        grid = QGridLayout()
        self.setLayout(grid)
        grid.addWidget(self.label,0,0)

        self.move(300, 150)
        self.setWindowTitle('thread test')
        self.show()

    def onIntReady(self, i):
        self.label.setText("{}".format(i))
        #print(i)

    app = QApplication(sys.argv)

    form = Form()

    sys.exit(app.exec_())

<답변3>

Matt의 아주 좋은 예, 오타를 고쳤고 pyqt4.8도 이제 일반적이므로 더미 클래스도 제거하고 dataReady 신호에 대한 예제를 추가했습니다.

# -*- coding: utf-8 -*-
import sys
from PyQt4 import QtCore, QtGui
from PyQt4.QtCore import Qt


# very testable class (hint: you can use mock.Mock for the signals)
class Worker(QtCore.QObject):
    finished = QtCore.pyqtSignal()
    dataReady = QtCore.pyqtSignal(list, dict)

    @QtCore.pyqtSlot()
    def processA(self):
        print "Worker.processA()"
        self.finished.emit()

    @QtCore.pyqtSlot(str, list, list)
    def processB(self, foo, bar=None, baz=None):
        print "Worker.processB()"
        for thing in bar:
            # lots of processing...
            self.dataReady.emit(['dummy', 'data'], {'dummy': ['data']})
        self.finished.emit()


def onDataReady(aList, aDict):
    print 'onDataReady'
    print repr(aList)
    print repr(aDict)


app = QtGui.QApplication(sys.argv)

thread = QtCore.QThread()  # no parent!
obj = Worker()  # no parent!
obj.dataReady.connect(onDataReady)

obj.moveToThread(thread)

# if you want the thread to stop after the worker is done
# you can always call thread.start() again later
obj.finished.connect(thread.quit)

# one way to do it is to start processing as soon as the thread starts
# this is okay in some cases... but makes it harder to send data to
# the worker object from the main gui thread.  As you can see I'm calling
# processA() which takes no arguments
thread.started.connect(obj.processA)
thread.finished.connect(app.exit)

thread.start()

# another way to do it, which is a bit fancier, allows you to talk back and
# forth with the object in a thread safe way by communicating through signals
# and slots (now that the thread is running I can start calling methods on
# the worker object)
QtCore.QMetaObject.invokeMethod(obj, 'processB', Qt.QueuedConnection,
                                QtCore.Q_ARG(str, "Hello World!"),
                                QtCore.Q_ARG(list, ["args", 0, 1]),
                                QtCore.Q_ARG(list, []))

# that looks a bit scary, but its a totally ok thing to do in Qt,
# we're simply using the system that Signals and Slots are built on top of,
# the QMetaObject, to make it act like we safely emitted a signal for
# the worker thread to pick up when its event loop resumes (so if its doing
# a bunch of work you can call this method 10 times and it will just queue
# up the calls.  Note: PyQt > 4.6 will not allow you to pass in a None
# instead of an empty list, it has stricter type checking

app.exec_()

<답변4>

Qt 개발자에 따르면 QThread 하위 클래스 지정이 잘못되었습니다 (http://blog.qt.io/blog/2010/06/17/youre-doing-it-wrong/). 그러나 그 기사는 이해하기가 정말 어렵습니다 (제목이 약간 낮아집니다). 한 스타일의 스레딩을 다른 스타일보다 사용해야하는 이유에 대한 자세한 설명을 제공하는 더 나은 블로그 게시물을 찾았습니다.http://mayaposch.wordpress.com/2011/11/01/how-to-really-truly-use-qthreads-the-full-explanation/

제 생각에는 run 메서드를 오버로드하려는 의도로 스레드를 하위 클래스 화해서는 안됩니다. 작동하는 동안 기본적으로 Qt가 원하는 방식을 우회하는 것입니다. 또한 이벤트 및 적절한 스레드 안전 신호 및 슬롯과 같은 것을 놓칠 수 있습니다. 또한 위의 블로그 게시물에서 볼 수 있듯이 "올바른"스레딩 방법을 사용하면 더 테스트 가능한 코드를 작성해야합니다.

다음은 PyQt에서 QThreads를 활용하는 방법에 대한 몇 가지 예입니다 (QRunnable을 적절하게 사용하고 신호 / 슬롯을 통합하는 별도의 답변을 아래에 게시했습니다.로드 균형 조정이 필요한 비동기 작업이 많은 경우 그 대답이 더 좋습니다) .

import sys
from PyQt4 import QtCore
from PyQt4 import QtGui
from PyQt4.QtCore import Qt

# very testable class (hint: you can use mock.Mock for the signals)
class Worker(QtCore.QObject):
    finished = QtCore.pyqtSignal()
    dataReady = QtCore.pyqtSignal(list, dict)

    @QtCore.pyqtSlot()
    def processA(self):
        print "Worker.processA()"
        self.finished.emit()

    @QtCore.pyqtSlot(str, list, list)
    def processB(self, foo, bar=None, baz=None):
        print "Worker.processB()"
        for thing in bar:
            # lots of processing...
            self.dataReady.emit(['dummy', 'data'], {'dummy': ['data']})
        self.finished.emit()


class Thread(QtCore.QThread):
    """Need for PyQt4 <= 4.6 only"""
    def __init__(self, parent=None):
        QtCore.QThread.__init__(self, parent)

     # this class is solely needed for these two methods, there
     # appears to be a bug in PyQt 4.6 that requires you to
     # explicitly call run and start from the subclass in order
     # to get the thread to actually start an event loop

    def start(self):
        QtCore.QThread.start(self)

    def run(self):
        QtCore.QThread.run(self)


app = QtGui.QApplication(sys.argv)

thread = Thread() # no parent!
obj = Worker() # no parent!
obj.moveToThread(thread)

# if you want the thread to stop after the worker is done
# you can always call thread.start() again later
obj.finished.connect(thread.quit)

# one way to do it is to start processing as soon as the thread starts
# this is okay in some cases... but makes it harder to send data to
# the worker object from the main gui thread.  As you can see I'm calling
# processA() which takes no arguments
thread.started.connect(obj.processA)
thread.start()

# another way to do it, which is a bit fancier, allows you to talk back and
# forth with the object in a thread safe way by communicating through signals
# and slots (now that the thread is running I can start calling methods on
# the worker object)
QtCore.QMetaObject.invokeMethod(obj, 'processB', Qt.QueuedConnection,
                                QtCore.Q_ARG(str, "Hello World!"),
                                QtCore.Q_ARG(list, ["args", 0, 1]),
                                QtCore.Q_ARG(list, []))

# that looks a bit scary, but its a totally ok thing to do in Qt,
# we're simply using the system that Signals and Slots are built on top of,
# the QMetaObject, to make it act like we safely emitted a signal for 
# the worker thread to pick up when its event loop resumes (so if its doing
# a bunch of work you can call this method 10 times and it will just queue
# up the calls.  Note: PyQt > 4.6 will not allow you to pass in a None
# instead of an empty list, it has stricter type checking

app.exec_()

# Without this you may get weird QThread messages in the shell on exit
app.deleteLater()        

<답변5>

PyQt에는 비동기 동작을 얻기위한 많은 옵션이 있습니다. 이벤트 처리 (예 : QtNetwork 등)가 필요한 경우이 스레드에 대한 다른 답변에서 제공 한 QThread 예제를 사용해야합니다. 그러나 대부분의 스레딩 요구 사항에 대해이 솔루션이 다른 방법보다 훨씬 우수하다고 생각합니다.

이것의 장점은 QThreadPool이 QRunnable 인스턴스를 작업으로 예약한다는 것입니다. 이것은 Intel의 TBB에서 사용되는 작업 패턴과 유사합니다. 내가 좋아하는 것만 큼 우아하지는 않지만 뛰어난 비동기 동작을 제공합니다.

이를 통해 QRunnable을 통해 Python에서 Qt의 대부분의 스레딩 능력을 활용하면서도 신호와 슬롯을 계속 활용할 수 있습니다. 여러 애플리케이션에서이 동일한 코드를 사용합니다. 일부는 수백 개의 비동기 REST 호출을 수행하고 일부는 파일을 열거 나 디렉토리를 나열하며 가장 좋은 부분은이 방법을 사용하는 것입니다. Qt 작업은 시스템 리소스의 균형을 조정합니다.

import time
from PyQt4 import QtCore
from PyQt4 import QtGui
from PyQt4.QtCore import Qt


def async(method, args, uid, readycb, errorcb=None):
    """
    Asynchronously runs a task

    :param func method: the method to run in a thread
    :param object uid: a unique identifier for this task (used for verification)
    :param slot updatecb: the callback when data is receieved cb(uid, data)
    :param slot errorcb: the callback when there is an error cb(uid, errmsg)

    The uid option is useful when the calling code makes multiple async calls
    and the callbacks need some context about what was sent to the async method.
    For example, if you use this method to thread a long running database call
    and the user decides they want to cancel it and start a different one, the
    first one may complete before you have a chance to cancel the task.  In that
    case, the "readycb" will be called with the cancelled task's data.  The uid
    can be used to differentiate those two calls (ie. using the sql query).

    :returns: Request instance
    """
    request = Request(method, args, uid, readycb, errorcb)
    QtCore.QThreadPool.globalInstance().start(request)
    return request


class Request(QtCore.QRunnable):
    """
    A Qt object that represents an asynchronous task

    :param func method: the method to call
    :param list args: list of arguments to pass to method
    :param object uid: a unique identifier (used for verification)
    :param slot readycb: the callback used when data is receieved
    :param slot errorcb: the callback used when there is an error

    The uid param is sent to your error and update callbacks as the
    first argument. It's there to verify the data you're returning

    After created it should be used by invoking:

    .. code-block:: python

       task = Request(...)
       QtCore.QThreadPool.globalInstance().start(task)

    """
    INSTANCES = []
    FINISHED = []
    def __init__(self, method, args, uid, readycb, errorcb=None):
        super(Request, self).__init__()
        self.setAutoDelete(True)
        self.cancelled = False

        self.method = method
        self.args = args
        self.uid = uid
        self.dataReady = readycb
        self.dataError = errorcb

        Request.INSTANCES.append(self)

        # release all of the finished tasks
        Request.FINISHED = []

    def run(self):
        """
        Method automatically called by Qt when the runnable is ready to run.
        This will run in a separate thread.
        """
        # this allows us to "cancel" queued tasks if needed, should be done
        # on shutdown to prevent the app from hanging
        if self.cancelled:
            self.cleanup()
            return

        # runs in a separate thread, for proper async signal/slot behavior
        # the object that emits the signals must be created in this thread.
        # Its not possible to run grabber.moveToThread(QThread.currentThread())
        # so to get this QObject to properly exhibit asynchronous
        # signal and slot behavior it needs to live in the thread that
        # we're running in, creating the object from within this thread
        # is an easy way to do that.
        grabber = Requester()
        grabber.Loaded.connect(self.dataReady, Qt.QueuedConnection)
        if self.dataError is not None:
            grabber.Error.connect(self.dataError, Qt.QueuedConnection)

        try:
            result = self.method(*self.args)
            if self.cancelled:
                # cleanup happens in 'finally' statement
                return
            grabber.Loaded.emit(self.uid, result)
        except Exception as error:
            if self.cancelled:
                # cleanup happens in 'finally' statement
                return
            grabber.Error.emit(self.uid, unicode(error))
        finally:
            # this will run even if one of the above return statements
            # is executed inside of the try/except statement see:
            # https://docs.python.org/2.7/tutorial/errors.html#defining-clean-up-actions
            self.cleanup(grabber)

    def cleanup(self, grabber=None):
        # remove references to any object or method for proper ref counting
        self.method = None
        self.args = None
        self.uid = None
        self.dataReady = None
        self.dataError = None

        if grabber is not None:
            grabber.deleteLater()

        # make sure this python obj gets cleaned up
        self.remove()

    def remove(self):
        try:
            Request.INSTANCES.remove(self)

            # when the next request is created, it will clean this one up
            # this will help us avoid this object being cleaned up
            # when it's still being used
            Request.FINISHED.append(self)
        except ValueError:
            # there might be a race condition on shutdown, when shutdown()
            # is called while the thread is still running and the instance
            # has already been removed from the list
            return

    @staticmethod
    def shutdown():
        for inst in Request.INSTANCES:
            inst.cancelled = True
        Request.INSTANCES = []
        Request.FINISHED = []


class Requester(QtCore.QObject):
    """
    A simple object designed to be used in a separate thread to allow
    for asynchronous data fetching
    """

    #
    # Signals
    #

    Error = QtCore.pyqtSignal(object, unicode)
    """
    Emitted if the fetch fails for any reason

    :param unicode uid: an id to identify this request
    :param unicode error: the error message
    """

    Loaded = QtCore.pyqtSignal(object, object)
    """
    Emitted whenever data comes back successfully

    :param unicode uid: an id to identify this request
    :param list data: the json list returned from the GET
    """

    NetworkConnectionError = QtCore.pyqtSignal(unicode)
    """
    Emitted when the task fails due to a network connection error

    :param unicode message: network connection error message
    """

    def __init__(self, parent=None):
        super(Requester, self).__init__(parent)


class ExampleObject(QtCore.QObject):
    def __init__(self, parent=None):
        super(ExampleObject, self).__init__(parent)
        self.uid = 0
        self.request = None

    def ready_callback(self, uid, result):
        if uid != self.uid:
            return
        print "Data ready from %s: %s" % (uid, result)

    def error_callback(self, uid, error):
        if uid != self.uid:
            return
        print "Data error from %s: %s" % (uid, error)

    def fetch(self):
        if self.request is not None:
            # cancel any pending requests
            self.request.cancelled = True
            self.request = None

        self.uid += 1
        self.request = async(slow_method, ["arg1", "arg2"], self.uid,
                             self.ready_callback,
                             self.error_callback)


def slow_method(arg1, arg2):
    print "Starting slow method"
    time.sleep(1)
    return arg1 + arg2


if __name__ == "__main__":
    import sys
    app = QtGui.QApplication(sys.argv)

    obj = ExampleObject()

    dialog = QtGui.QDialog()
    layout = QtGui.QVBoxLayout(dialog)
    button = QtGui.QPushButton("Generate", dialog)
    progress = QtGui.QProgressBar(dialog)
    progress.setRange(0, 0)
    layout.addWidget(button)
    layout.addWidget(progress)
    button.clicked.connect(obj.fetch)
    dialog.show()

    app.exec_()
    app.deleteLater() # avoids some QThread messages in the shell on exit
    # cancel all running tasks avoid QThread/QTimer error messages
    # on exit
    Request.shutdown()

응용 프로그램을 종료 할 때 모든 작업을 취소해야합니다. 그렇지 않으면 예약 된 모든 작업이 완료 될 때까지 응용 프로그램이 중단됩니다.


<답변6>

다른 답변에서 언급 한 Worker 개체 메서드를 기반으로 더 많은 스레드를 호출하기 위해 솔루션을 확장 할 수 있는지 확인하기로 결정했습니다.이 경우 기계가 실행되고 불확실한 완료 시간으로 여러 작업자를 회전시킬 수있는 최적의 수입니다. 이렇게하려면 QThread를 하위 클래스로 만들어야하지만 스레드 번호를 할당하고 스레드 번호를 포함하도록 '완료'및 '시작됨'신호를 '다시 구현'하기 만하면됩니다.

나는 메인 GUI, 쓰레드, 워커 사이의 신호에 상당히 집중했습니다.

마찬가지로 다른 답변은 QThread를 양육하지 않는다는 점을 지적하는 데 어려움을 겪었지만 이것이 진정한 관심사라고 생각하지 않습니다. 그러나 내 코드는 QThread 객체를 파괴하는 데에도주의를 기울입니다.

그러나 작업자 개체를 부모로 지정할 수 없으므로 스레드 함수가 완료되거나 GUI가 파괴 될 때 deleteLater () 신호를 보내는 것이 바람직합니다. 나는 이것을하지 않기 위해 내 자신의 코드를 걸었습니다.

내가 필요하다고 느꼈던 또 다른 개선 사항은 GUI (QWidget)의 closeEvent를 다시 구현하여 스레드를 종료하도록 지시 한 다음 GUI가 모든 스레드가 완료 될 때까지 대기하는 것입니다. 이 질문에 대한 다른 답변을 가지고 놀 때 QThread 파괴 오류가 발생했습니다.

아마도 그것은 다른 사람들에게 유용 할 것입니다. 나는 그것이 유용한 운동임을 확실히 알았다. 아마도 다른 사람들은 스레드가 자신의 정체성을 알리는 더 나은 방법을 알게 될 것입니다.

#!/usr/bin/env python3
#coding:utf-8
# Author:   --<>
# Purpose:  To demonstrate creation of multiple threads and identify the receipt of thread results
# Created: 19/12/15

import sys


from PyQt4.QtCore import QThread, pyqtSlot, pyqtSignal
from PyQt4.QtGui import QApplication, QLabel, QWidget, QGridLayout

import sys
import worker

class Thread(QThread):
    #make new signals to be able to return an id for the thread
    startedx = pyqtSignal(int)
    finishedx = pyqtSignal(int)

    def __init__(self,i,parent=None):
        super().__init__(parent)
        self.idd = i

        self.started.connect(self.starttt)
        self.finished.connect(self.finisheddd)

    @pyqtSlot()
    def starttt(self):
        print('started signal from thread emitted')
        self.startedx.emit(self.idd) 

    @pyqtSlot()
    def finisheddd(self):
        print('finished signal from thread emitted')
        self.finishedx.emit(self.idd)

class Form(QWidget):

    def __init__(self):
        super().__init__()

        self.initUI()

        self.worker={}
        self.threadx={}
        self.i=0
        i=0

        #Establish the maximum number of threads the machine can optimally handle
        #Generally relates to the number of processors

        self.threadtest = QThread(self)
        self.idealthreadcount = self.threadtest.idealThreadCount()

        print("This machine can handle {} threads optimally".format(self.idealthreadcount))

        while i 

그리고 아래 작업자 코드

#!/usr/bin/env python3
#coding:utf-8
# Author:   --<>
# Purpose:  Stack Overflow
# Created: 19/12/15

import sys
import unittest


from PyQt4.QtCore import QThread, QObject, pyqtSignal, pyqtSlot
import time
import random


class Worker(QObject):
    finished = pyqtSignal(int)
    intReady = pyqtSignal(int,int)

    def __init__(self, i=0):
        '''__init__ is called while the worker is still in the Gui thread. Do not put slow or CPU intensive code in the __init__ method'''

        super().__init__()
        self.idd = i



    @pyqtSlot()
    def procCounter(self): # This slot takes no params
        for j in range(1, 10):
            random_time = random.weibullvariate(1,2)
            time.sleep(random_time)
            self.intReady.emit(j,self.idd)
            print('Worker {0} in thread {1}'.format(self.idd, self.thread().idd))

        self.finished.emit(self.idd)


if __name__=='__main__':
    unittest.main()
반응형