如何在不阻塞数据流的同时保存数据流中的数据?(PyQt5 signal emit()性能)

2024-10-03 04:34:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个PyQt5应用程序。 在我的应用程序中,它有一个数据流,它的速度大约是5~20个数据/秒。在

每次数据到达时,将调用Analyzer类的onData()方法。(以下代码是我的应用程序的简化代码)

class Analyzer():
    def __init__(self):
        self.cnt = 0
        self.dataDeque = deque(MAXLENGTH=10000)
    def onData(self, data):
        self.dataDeque.append({
            "data": data, 
            "createdTime": time.time()
        })
        self.cnt += 1
        if self.cnt % 10000 == 0:
            pickle.dump(dataDeque, open(file, 'wb'))

但问题是,这个dataDeque对象太大(50~150MB),所以转储pickle大约需要1~2秒。在

在此期间(1~2秒),调用onData()方法的请求排队,1~2秒后,排队的请求同时调用大量onData()方法,最终扭曲了createdTime的数据。在

用qti来解决我的线程问题。在

以下代码是已编辑的代码。在

^{pr2}$

下面的代码是PickleDumpingThread类。在

class PickleDumpingThread(QThread):
   def __init__(self):
       super().__init__()
       self.daemon = True
       self.pickleDumpingSignal[dict].connect(self.savePickle)

   def savePickle(self, signal_dict):
       pickle.dump(signal_dict["deque"], open(file, 'wb'))

我预计这段新编辑的代码会显著减少流阻塞时间(1~2秒),但这段代码仍然阻塞流大约0.5~2秒。在

似乎pickleDumpingThread.pickleDumpingSignal.emit(somedict)需要0.5~2秒。在

我的问题是三件事。在

  1. signal emit()函数的性能不是这样好吗?

  2. 在我的函数emit()中有没有其他方法?

  3. 或者有什么方法可以在不阻塞数据流的同时保存pickle? (如有任何修改我的代码的建议,我们将不胜感激)

谢谢你读了这个长长的问题!在


Tags: 数据方法代码self应用程序datasignalinit
3条回答

像这样的事情也许会奏效

class PickleDumpingThread(QThread):
   def __init__(self, data):
       super().__init__()
       self.data = data

   def run(self):
       pickle.dump(self.data["deque"], open(file, 'wb'))
       self.emit(QtCore.SIGNAL('threadFinished(int)'), self.currentThreadId())

class Analyzer():
    def __init__(self):
        self.cnt = 0
        self.dataDeque = deque(MAXLENGTH=10000)
        self.threadHandler = {}

    def onData(self, data):
        self.dataDeque.append({ "data": data, "createdTime": time.time() })
        self.cnt += 1
        if self.cnt % 10000 == 0:
            thread = PickleDumpingThread(self.dataDeque)
            self.connect(thread, QtCore.SIGNAL("threadFinished(int)"), self.threadFinished)
            thread.start() 
            self.threadHandler[thread.currentThreadId()] = thread

    @QtCore.pyqtSlot(int)
    def threadFinished(id):
        del self.threadHandler[id]

self.threadHandler只是想知道有多少线程仍在运行,可以去掉它和threadFinished方法

问题是我没有正确使用QThread。在

印刷的结果

print("(Current Thread)", QThread.currentThread(),"\n")
print("(Current Thread)", int(QThread.currentThreadId()),"\n")

注意到我创建的PickleDumpingThread是在主线程中运行的,而不是在某个单独的线程中运行。在

这是因为run()QThread中唯一在独立线程中运行的函数,所以像QThread中的savePickle这样的方法在主线程中运行。在


第一个解决方案

using signal的正确用法是使用Worker,如下所示。在

^{pr2}$

这个解决方案是可行的(pickle被转储在单独的线程中),但是它的缺点是由于signal emit()函数,数据流仍然延迟大约0.5~1秒。在

我发现对于我的情况,最好的解决方案是@PYPL的代码,但是代码需要一些修改才能工作。在


最终解决方案

最终的解决方案是修改@PYPL的以下代码

^{3}$

self.thread = PickleDumpingThread(self.dataDeque)
self.thread.start() 

原始代码有一些运行时错误。线程在转储pickle之前似乎被垃圾回收了,因为onData()函数完成后没有对该线程的引用。在

通过添加self.thread引用线程解决了这个问题。在

而且,旧的PickleDumpingThread似乎是在新的PickleDumpingThread被{}引用之后被垃圾回收的(因为旧的PickleDumpingThread丢失了它的引用)。在

但是,这个声明没有被证实(因为我不知道如何查看当前活动线程)。。在

不管怎样,这个解决方案解决了问题。在


编辑

我的最终解决方案也有延迟。打电话要花一些时间线程启动().. 在

我选择的最终解决方案是在线程中运行无限循环,并监视该线程的一些变量,以确定何时保存pickle。仅仅在线程中使用无限循环就需要大量的cpu,所以我添加了时间。睡觉(0.1)以降低cpu使用率。在


最终编辑

好吧,我的“最终解决方案”也有延迟。。 即使我将转储作业移到另一个QThread,主线程仍有pickle转储时间的延迟!太奇怪了。在

但我找到了原因。原因既不是emit()的性能,也不是我所想的。在

令人尴尬的是,原因是python's Global Interpreter Lock prevents two threads in the same process from running Python code at the same time。在

所以在本例中,我可能应该使用multiprocessing模块。在

我将在修改代码以使用multiprocessing模块后发布结果。在

在使用multiprocessing模块和以后的尝试后进行编辑

使用multiprocessing模块

使用multiprocessing模块解决了并发运行python代码的问题,但是出现了新的本质问题。新的问题是“在进程之间传递共享内存变量需要相当长的时间”(在我的例子中,将deque对象传递到子进程需要1~2秒)。我发现只要我使用multiprocessing模块,这个问题就无法消除。所以我放弃了使用多处理模块

未来可能的尝试

1。{/strong>仅在文件中执行

pickle转储的本质问题不是写入文件,而是在写入文件之前序列化。Python在写入文件时释放GIL,因此磁盘I/O可以在QThread中并发完成。问题是,在用pickle.dump方法写入文件之前,将deque对象序列化为字符串需要一些时间,而在此期间,由于GIL,主线程将被阻塞。在

因此,下面的方法将有效地减少延迟的长度。在

  1. 每次调用onData()时,我们都会以某种方式对数据对象进行字符串化,并将其推送到deque object

  2. PickleDumpingThread中,只要join对象list(deque)来字符串化deque对象。

  3. file.write(stringified_deque_object)。这可以同时进行。

第一步只需要很短的时间,所以几乎不需要块k主螺纹。 步骤2可能需要一些时间,但显然比在pickle.dump方法中序列化python对象花费的时间要短。 步骤3不阻塞主线程。在

2。使用C扩展名

我们可以手动释放GIL并在我们定制的C扩展模块中重新获取GIL。但这可能是肮脏的。在

3。将CPython移植到Jython或IronPython

Jython和IronPython是其他分别使用Java和C的python实现。因此,它们在实现中不使用GIL,这意味着thread的工作方式与线程类似。 一个问题是这些实现中不支持PyQt。。在

4。移植到另一种语言

。。在

注:

  1. json.dump我的数据也花了1~2秒。

  2. Cython不是这个案子的选择。尽管Cython有with nogil:,但在该块中只能访问非python对象(deque对象不能在该块中访问),并且我们不能在该块中使用pickle.dump方法。

当GIL出现问题时,解决方法是将任务细分为块,这样可以刷新块之间的GUI。在

例如,假设您有一个大的S大小的列表要转储,那么您可以尝试定义一个从list派生并重写getstate的类来返回N个子pickle对象,每个对象都是一个类的实例,比如subpicle,包含列表中的S/N项。每个子块仅在酸洗时存在,并定义getstate执行两个操作:

  • 呼叫qApp.processEvents()在gui上,以及
  • 返回序列号项目的子列表。在

在取消拾取时,每个子项都将刷新GUI并获取项列表;最后,在原始对象中从其setstate中接收的所有子项中重新创建总列表。在

如果您想在控制台应用程序(或非pyqt gui)中取消pickle,您应该抽象出对process事件的调用。为此,可以在子菜单上定义一个类范围的属性,比如process_events,默认情况下为None;如果不是None,setstate会将其作为函数调用。因此,默认情况下,子点击之间没有GUI刷新,除非取消拾取的应用程序在开始取消拾取之前将此属性设置为可调用。在

这种策略将使您的GUi在取消拾取过程中有机会重新绘制(如果需要,只使用一个线程)。在

实施取决于您的确切数据,但下面是一个示例,它演示了一个大列表的原理:

import pickle

class SubList:
    on_pickling = None

    def __init__(self, sublist):
        print('SubList', sublist)
        self.data = sublist

    def __getstate__(self):
        if SubList.on_pickling is not None:
            print('SubList pickle state fetch: calling sub callback')
            SubList.on_pickling()
        return self.data

    def __setstate__(self, obj):
        if SubList.on_pickling is not None:
            print('SubList pickle state restore: calling sub callback')
            SubList.on_pickling()
        self.data = obj


class ListSubPickler:
    def __init__(self, data: list):
        self.data = data

    def __getstate__(self):
        print('creating SubLists for pickling long list')
        num_chunks = 10
        span = int(len(self.data) / num_chunks)
        SubLists = [SubList(self.data[i:(i + span)]) for i in range(0, len(self.data), span)]
        return SubLists

    def __setstate__(self, subpickles):
        self.data = []
        print('restoring Pickleable(list)')
        for subpickle in subpickles:
            self.data.extend(subpickle.data)
        print('final', self.data)

def refresh():
    # do something: refresh GUI (for example, qApp.processEvents() for Qt), show progress, etc
    print('refreshed')

data = list(range(100))  # your large data object
list_pickler = ListSubPickler(data)
SubList.on_pickling = refresh

print('\ndumping pickle of', list_pickler)
pickled = pickle.dumps(list_pickler)

print('\nloading from pickle')
new_list_pickler = pickle.loads(pickled)
assert new_list_pickler.data == data

print('\nloading from pickle, without on_pickling')
SubList.on_pickling = None
new_list_pickler = pickle.loads(pickled)
assert new_list_pickler.data == data

易于应用于dict,甚至可以使用isinstance使其适应它接收的数据类型。在

相关问题 更多 >