如何在线程之间共享一个值,并通知正在使用的线程一个新值正在被共享

2024-06-25 06:29:02 发布

您现在位置:Python中文网/ 问答频道 /正文

对我来说,一个相当常见的情况是定期更新一个值,比如说每30秒更新一次。例如,该值在网站上可用。你知道吗

我想获取这个值(使用阅读器),转换它(使用转换器)并发布结果,比如说在另一个网站上(使用发布者)。你知道吗

源和目标有时都不可用,我只对新值和超时感兴趣。你知道吗

我当前的方法是使用一个队列来处理我的值,另一个队列来处理我的结果。读卡器、转换器和发布器都是使用多重处理的独立“线程”。你知道吗

这样做的好处是,可以允许每个步骤“挂起”一段时间,下一步可以使用带有超时的get来实现一些默认操作,以防队列中没有有效消息。你知道吗

这种方法的缺点是,一旦转换器或发布程序暂停,我就只能在队列中保留所有以前的值和结果。在最坏的情况下,发布服务器有一个不可恢复的错误,整个工具的内存不足。你知道吗

解决这个问题的一个可能方法是将队列大小限制为1,使用非阻塞put,并通过丢弃当前值并重新放置新值来处理队列已满异常。对于这样一个简单的操作,这是相当多的代码,并且清楚地表明队列不是该作业的合适工具。你知道吗

我可以使用多处理原语编写自己的类来获得所需的行为,但这对我来说是一种非常常见的情况,因此我假设其他人也是这样,我觉得应该有一个“正确”的解决方案。你知道吗

简而言之,是否有一个标准的threadsafe类具有以下接口?你知道吗

class Updatable():
    def put(value):
        #store value, overwriting existing

    def get(timeout):
        #blocking, raises Exception when timeout is set and exceeded
        return value

编辑:我当前使用多处理的实现

import multiprocessing
from time import sleep

class Updatable():
    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.ns = self.manager.Namespace()
        self.updated = self.manager.Event()

    def get(self, timeout=None):
        self.updated.wait(timeout)
        self.updated.clear()
        return self.ns.x

    def put(self, item):
        self.ns.x = item
        self.updated.set()


def consumer(updatable):
    print(updatable.get())  # expect 1
    sleep(1)
    print(updatable.get())  # expect "2"
    sleep(1)
    print(updatable.get())  # expect {3}, after 2 sec
    sleep(1)
    print(updatable.get())  # expect [4]
    sleep(2)
    print(updatable.get())  # expect 6
    sleep(1)

def producer():
    sleep(.5)  # make output more stable, by giving both sides 0.5 sec to process
    updatable.put(1)
    sleep(1)
    updatable.put("2")
    sleep(2)
    updatable.put({3})
    sleep(1)
    updatable.put([4])
    sleep(1)
    updatable.put(5,)  # will never be consumed 
    sleep(1)
    updatable.put(6)

if __name__ == '__main__':
    updatable = Updatable()
    p = multiprocessing.Process(target=consumer, args=(updatable,))
    p.start()
    producer()

Tags: 方法selfget队列putvaluedeftimeout