具有PySide或PyQ的工作进程的规范示例

2024-09-30 10:29:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我在寻找一些从Python创建的qtgui管理工作进程的好例子。我需要尽可能完整,包括报告进程的进度,包括中止进程,包括处理进程中可能出现的错误。在

我只发现一些半成品的例子,只做了一部分工作,但当我试图使他们完整,我失败了。我目前的设计分为三层:

1)主线程驻留在GUI中,ProcessScheduler控制只有一个工作进程实例正在运行并可以中止它

2)在另一个线程中,我有ProcessObserver,它实际运行进程并理解来自队列的内容(用于进程间通信),这必须在非GUI线程中以保持GUI响应

3)有一个实际的工作进程,它执行一段给定的代码(我未来的意图是用multiprocess或{}或其他可以pickle函数对象的东西来替换{},但这不是我当前的问题),并向队列报告进度或结果

目前我有这个片段(代码中的print函数只是用于调试,最终将被删除):

import multiprocessing

from PySide import QtCore, QtGui
QtWidgets = QtGui

N = 10000000

# I would like this to be a function object
# but multiprocessing cannot pickle it :(
# so I will use multiprocess in the future
CODE = """
# calculates sum of numbers from 0 to n-1
# reports percent progress of finished work

sum = 0
progress = -1
for i in range(n):
    sum += i
    p = i * 100 // n
    if p > progress:
        queue.put(["progress", p])
        progress = p
queue.put(["result", sum])
"""


class EvalProcess(multiprocessing.Process):

    def __init__(self, code, symbols):
        super(EvalProcess, self).__init__()
        self.code= code
        self.symbols = symbols  # symbols must contain 'queue'

    def run(self):
        print("EvalProcess started")
        exec(self.code, self.symbols)
        print("EvalProcess finished")


class ProcessObserver(QtCore.QObject):
    """Resides in worker thread. Its role is to understand
    to what is received from the process via the queue."""

    progressChanged = QtCore.Signal(float)
    finished = QtCore.Signal(object)

    def __init__(self, process, queue):
        super(ProcessObserver, self).__init__()
        self.process = process
        self.queue = queue

    def run(self):
        print("ProcessObserver started")
        self.process.start()
        try:
            while True:
                # this loop keeps running and listening to the queue
                # even if the process is aborted
                result = self.queue.get()
                print("received from queue:", result)
                if result[0] == "progress":
                    self.progressChanged.emit(result[1])
                elif result[0] == "result":
                    self.finished.emit(result[1])
                    break

        except Exception as e:
            print(e)  # QUESTION: WHAT HAPPENS WHEN THE PROCESS FAILS?
        self.process.join()  # QUESTION: DO I NEED THIS LINE?
        print("ProcessObserver finished")


class ProcessScheduler(QtCore.QObject):
    """Resides in the main thread."""

    sendText = QtCore.Signal(str)

    def __init__(self):
        super(ProcessScheduler, self).__init__()
        self.observer = None
        self.thread = None
        self.process = None
        self.queue = None

    def start(self):
        if self.process:  # Q: IS THIS OK?
            # should kill current process and start a new one
            self.abort()
        self.queue = multiprocessing.Queue()
        self.process = EvalProcess(CODE, {"n": N, "queue": self.queue})
        self.thread = QtCore.QThread()
        self.observer = ProcessObserver(self.process, self.queue)
        self.observer.moveToThread(self.thread)
        self.observer.progressChanged.connect(self.onProgressChanged)
        self.observer.finished.connect(self.onResultReceived)
        self.thread.started.connect(self.observer.run)
        self.thread.finished.connect(self.onThreadFinished)
        self.thread.start()
        self.sendText.emit("Calculation started")

    def abort(self):
        self.process.terminate()
        self.sendText.emit("Aborted.")
        self.onThreadFinished()

    def onProgressChanged(self, percent):
        self.sendText.emit("Progress={}%".format(percent))

    def onResultReceived(self, result):
        print("onResultReceived called")
        self.sendText.emit("Result={}".format(result))
        self.thread.quit()

    def onThreadFinished(self):
        print("onThreadFinished called")
        self.thread.deleteLater()  # QUESTION: DO I NEED THIS LINE?
        self.thread = None
        self.observer = None
        self.process = None
        self.queue = None


if __name__ == '__main__':
    app = QtWidgets.QApplication([])

    scheduler = ProcessScheduler()

    window = QtWidgets.QWidget()
    layout = QtWidgets.QVBoxLayout(window)

    startButton = QtWidgets.QPushButton("sum(range({}))".format(N))
    startButton.pressed.connect(scheduler.start)
    layout.addWidget(startButton)

    abortButton = QtWidgets.QPushButton("Abort")
    abortButton.pressed.connect(scheduler.abort)
    layout.addWidget(abortButton)

    console = QtWidgets.QPlainTextEdit()
    scheduler.sendText.connect(console.appendPlainText)
    layout.addWidget(console)

    window.show()
    app.exec_()

它工作得还行,但仍然缺乏正确的错误处理和进程中止。尤其是我现在正在为堕胎而挣扎。主要的问题是,即使进程在计算过程中被中止/终止(或者至少它在控制台QThread: Destroyed while thread is still running中打印此错误),工作线程仍继续运行(在侦听队列的循环中)。有没有办法解决这个问题?或者其他方法?或者,如果可能的话,有没有任何现实生活中的例子可以证明这项任务满足了上述所有要求?如有任何评论,将不胜感激。在


Tags: selfnonequeue进程defconnectresultprocess

热门问题