python多处理共享计数器,pickling

2024-06-26 14:07:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在进行多重处理来处理一些非常大的文件。在

我可以使用集合。计数器在使用多处理.BaseManager子类。在

虽然我可以分享柜台和似乎腌制它似乎没有适当的腌制。我可以把这本词典复制成一本我可以腌制的新词典。在

我正在尝试了解如何在选择共享计数器之前避免它的“副本”。在

这是我的(伪代码):

from multiprocessing.managers import BaseManager
from collections import Counter

class MyManager(BaseManager):
    pass

MyManager.register('Counter', Counter)

def main(glob_pattern):
    # function that processes files
    def worker_process(files_split_to_allow_naive_parallelization, mycounterdict):
        # code that loops through files
        for line in file:
            # code that processes line
            my_line_items = line.split()
            index_for_read = (my_line_items[0],my_line_items[6])
            mycounterdict.update((index_for_read,))

    manager = MyManager()
    manager.start()
    mycounterdict = manager.Counter()

    # code to get glob files , split them with unix shell split and then chunk then

    for i in range(NUM_PROCS):
        p = multiprocessing.Process(target=worker_process , args = (all_index_file_tuples[chunksize * i:chunksize * (i + 1)],mycounterdict))
        procs.append(p)
        p.start()
    # Now we "join" the processes
    for p in procs:
        p.join()

    # This is the part I have trouble with
    # This yields a pickled file that fails with an error
    pickle.dump(mycounterdict,open("Combined_count_gives_error.p","wb"))

    # This however works
    # How can I avoid doing it this way?
    mycopydict = Counter()
    mydictcopy.update(mycounterdict.items())
    pickle.dump(mycopydict,open("Combined_count_that_works.p","wb"))

当我试图加载“pickled”错误给定文件(它总是较小的固定大小)时,我得到了一个没有意义的错误。在

如何在不通过上面的伪代码中创建新dict的情况下pickle共享dict。在

^{pr2}$

Tags: inforthatmylinecountercodeitems
1条回答
网友
1楼 · 发布于 2024-06-26 14:07:20

你的代码有几个问题。首先,如果文件悬而未决,则不能保证关闭该文件。其次,mycounterdict不是一个实际的Counter,而是它的代理-pickle它,您将遇到许多问题,因为它在这个过程之外是无法处理的。但是,您不需要使用update进行复制:.copy将生成一个新的Counter副本。在

所以你应该使用

with open("out.p", "wb") as f:
    pickle.dump(mycounterdict.copy(), f)

如果这是一个好的模式,答案是。为了更简单的代码,您应该在每个进程中分别计数,而不是使用共享计数器:

^{pr2}$

相关问题 更多 >