对我来说,一个相当常见的情况是定期更新一个值,比如说每30秒更新一次。例如,该值在网站上可用。你知道吗
我想获取这个值(使用阅读器),转换它(使用转换器)并发布结果,比如说在另一个网站上(使用发布者)。你知道吗
源和目标有时都不可用,我只对新值和超时感兴趣。你知道吗
我当前的方法是使用一个队列来处理我的值,另一个队列来处理我的结果。读卡器、转换器和发布器都是使用多重处理的独立“线程”。你知道吗
这样做的好处是,可以允许每个步骤“挂起”一段时间,下一步可以使用带有超时的get来实现一些默认操作,以防队列中没有有效消息。你知道吗
这种方法的缺点是,一旦转换器或发布程序暂停,我就只能在队列中保留所有以前的值和结果。在最坏的情况下,发布服务器有一个不可恢复的错误,整个工具的内存不足。你知道吗
解决这个问题的一个可能方法是将队列大小限制为1,使用非阻塞put,并通过丢弃当前值并重新放置新值来处理队列已满异常。对于这样一个简单的操作,这是相当多的代码,并且清楚地表明队列不是该作业的合适工具。你知道吗
我可以使用多处理原语编写自己的类来获得所需的行为,但这对我来说是一种非常常见的情况,因此我假设其他人也是这样,我觉得应该有一个“正确”的解决方案。你知道吗
简而言之,是否有一个标准的threadsafe类具有以下接口?你知道吗
class Updatable():
def put(value):
#store value, overwriting existing
def get(timeout):
#blocking, raises Exception when timeout is set and exceeded
return value
编辑:我当前使用多处理的实现
import multiprocessing
from time import sleep
class Updatable():
def __init__(self):
self.manager = multiprocessing.Manager()
self.ns = self.manager.Namespace()
self.updated = self.manager.Event()
def get(self, timeout=None):
self.updated.wait(timeout)
self.updated.clear()
return self.ns.x
def put(self, item):
self.ns.x = item
self.updated.set()
def consumer(updatable):
print(updatable.get()) # expect 1
sleep(1)
print(updatable.get()) # expect "2"
sleep(1)
print(updatable.get()) # expect {3}, after 2 sec
sleep(1)
print(updatable.get()) # expect [4]
sleep(2)
print(updatable.get()) # expect 6
sleep(1)
def producer():
sleep(.5) # make output more stable, by giving both sides 0.5 sec to process
updatable.put(1)
sleep(1)
updatable.put("2")
sleep(2)
updatable.put({3})
sleep(1)
updatable.put([4])
sleep(1)
updatable.put(5,) # will never be consumed
sleep(1)
updatable.put(6)
if __name__ == '__main__':
updatable = Updatable()
p = multiprocessing.Process(target=consumer, args=(updatable,))
p.start()
producer()
目前没有回答
相关问题 更多 >
编程相关推荐