我试图用共享的dict
和multipleprocessing
来计算词频。我为一些初始测试编写了一个简单的Python代码片段:
from multiprocessing import Manager, Pool
def foo(num):
try:
d[num] += 1
except KeyError:
d[num] = 1
d = Manager().dict()
pool = Pool(processes=2, maxtasksperchild=100)
tasks = [1] * 1001 + [2] * 2000 + [3] * 1300
pool.map_async(foo, tasks)
pool.close()
pool.join()
print len(tasks)
print d
但是,d
中的频率总数与tasks
中的频率总数不匹配。在我看来,d
并没有很好地同步,但我不知道为什么会发生这种情况以及如何解决。有人能帮我一下吗?你知道吗
这里有一个比赛条件:
假设任务1尝试执行
d[1] += 1
,但是d[1]
是空的,所以它得到一个KeyError
。另一个核心上的任务2尝试执行d[1] += 1
,但是d[1]
仍然是空的,因此它也得到了一个KeyError
。所以,现在任务1和任务2都将尝试设置d[1] = 1
,并且它们都会成功,所以d[1]
现在是1
,并且丢失了1个增量。你知道吗更糟糕的是,假设在任务1开始设置
d[1] = 1
之前,任务3-10都在另一个核心上运行,并完成d[1]
一直递增到9
。然后task1进入并将其设置回1
,您已经丢失了9个增量。你知道吗您可能认为可以通过预先初始化
d = {1: 0, 2: 0, 3: 0}
并省略try
/except
来解决这个问题。但这仍然行不通。因为即使d[1] += 1
也不是原子的。Python有效地将其编译成三个独立的操作:tmp = d.__getitem__(1)
、tmp += 1
、d.__setitem__(1, tmp)
。你知道吗因此,任务1可以从共享dict中获取现有的0,将其增量为1,同时任务2已经获取现有的0,将其增量为1,现在它们都去存储
1
,并且都成功了。而且,您可以再次看到这是如何扩展到丢失大量增量而不是一个增量。你知道吗对共享数据的任何非原子操作都必须显式同步。文档中的Synchronization between processes和Sharing state between processes对此进行了解释。你知道吗
这里最简单的修复方法(虽然显然不是最好的,因为它最终会序列化所有访问)是:
如果你想让这更有趣,更有效,你必须学习如何共享内存线程和同步,有太多的方式来解释一个单一的答案。你知道吗
相关问题 更多 >
编程相关推荐