我正在进行多重处理来处理一些非常大的文件。在
我可以使用集合。计数器在使用多处理.BaseManager子类。在
虽然我可以分享柜台和似乎腌制它似乎没有适当的腌制。我可以把这本词典复制成一本我可以腌制的新词典。在
我正在尝试了解如何在选择共享计数器之前避免它的“副本”。在
这是我的(伪代码):
from multiprocessing.managers import BaseManager
from collections import Counter
class MyManager(BaseManager):
pass
MyManager.register('Counter', Counter)
def main(glob_pattern):
# function that processes files
def worker_process(files_split_to_allow_naive_parallelization, mycounterdict):
# code that loops through files
for line in file:
# code that processes line
my_line_items = line.split()
index_for_read = (my_line_items[0],my_line_items[6])
mycounterdict.update((index_for_read,))
manager = MyManager()
manager.start()
mycounterdict = manager.Counter()
# code to get glob files , split them with unix shell split and then chunk then
for i in range(NUM_PROCS):
p = multiprocessing.Process(target=worker_process , args = (all_index_file_tuples[chunksize * i:chunksize * (i + 1)],mycounterdict))
procs.append(p)
p.start()
# Now we "join" the processes
for p in procs:
p.join()
# This is the part I have trouble with
# This yields a pickled file that fails with an error
pickle.dump(mycounterdict,open("Combined_count_gives_error.p","wb"))
# This however works
# How can I avoid doing it this way?
mycopydict = Counter()
mydictcopy.update(mycounterdict.items())
pickle.dump(mycopydict,open("Combined_count_that_works.p","wb"))
当我试图加载“pickled”错误给定文件(它总是较小的固定大小)时,我得到了一个没有意义的错误。在
如何在不通过上面的伪代码中创建新dict的情况下pickle共享dict。在
^{pr2}$
你的代码有几个问题。首先,如果文件悬而未决,则不能保证关闭该文件。其次,
mycounterdict
不是一个实际的Counter
,而是它的代理-pickle它,您将遇到许多问题,因为它在这个过程之外是无法处理的。但是,您不需要使用update
进行复制:.copy
将生成一个新的Counter
副本。在所以你应该使用
如果这是一个好的模式,答案是不。为了更简单的代码,您应该在每个进程中分别计数,而不是使用共享计数器:
^{pr2}$相关问题 更多 >
编程相关推荐