正在与多处理Queu作斗争

2024-09-30 16:34:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我的结构(大量简化)如下所示:

import multiprocessing

def creator():
    # creates files
    return


def relocator():
    # moves created files
    return


create = multiprocessing.Process(target=creator)
relocate = multiprocessing.Process(target=relocator)
create.start()
relocate.start()

我要做的是让一堆文件由creator创建,一旦它们被创建,就由relocator将它们移动到另一个目录。你知道吗

我想在这里使用multiprocessing的原因是:

  • 我不想creator等待移动首先完成,因为移动需要时间,我不想浪费时间。你知道吗
  • 在开始复制之前先创建所有文件也不是一个选项,因为驱动器中没有足够的空间容纳所有文件。你知道吗

我希望creatorrelocator进程都是串行的(每次一个文件),但是并行运行。操作的“日志”应该是这样的:

# creating file 1
# creating file 2 and relocating file 1
# creating file 3 and relocating file 2
# ...
# relocating last file

根据我读到的,这里的方法是Queue。你知道吗

策略:(可能不是最好的?!)你知道吗

创建文件后,它将进入队列,重新定位完成后,它将从队列中删除。你知道吗

但是,我在编码它时遇到了问题;同时创建了多个文件(多个并行运行的creator实例)和其他文件。。。你知道吗

如有任何想法、提示、解释等,我将不胜感激


Tags: 文件creatingtargetreturndefcreatefilesmultiprocessing
1条回答
网友
1楼 · 发布于 2024-09-30 16:34:05

让我们把你的想法和分裂在这个特点:

  1. 创建者应创建文件(例如100个)

  2. 重新定位程序应一次移动一个文件,直到没有更多文件可移动

  3. 创建者可以在重新定位器之前结束,因此它也可以 把自己变成一个重新定位者,两个人都得知道什么时候该做 完成

因此,我们有两个主要功能:

def create(i):
    # creates files and return outpath
    return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))


def relocate(from, to):
    # moves created files
    shuttil.move(from, to)

现在让我们创建流程:

from multiprocessing import Process, Queue

comm_queue = Queue()

#process that create the files and push the data into the queue
def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    comm_q.put("STOP_FLAG") # we tell the workers when to stop, we just push one since we only have one more worker

#the relocator works till it gets an stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)
        data = comm_q.get()

creator_process= multiprocessing.Process(target=creator, args=(comm_queue))
relocators = multiprocessing.Process(target=relocator, args=(comm_queue))
creator_process.start()
relocators .start()

这样我们就有了一个creator和一个relocator,但是,假设现在我们希望creator在创建作业完成后开始重新定位,我们可以只使用relocator,但是我们需要再推一个"STOP_FLAG",因为我们将有两个进程重新定位

def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(2):
        comm_q.put("STOP_FLAG")
    relocator(comm_q)

假设我们现在需要任意数量的重定位程序进程,我们应该调整代码以处理此问题,我们需要creator方法知道有多少标志通知其他进程何时停止,我们得到的代码如下所示:

from multiprocessing import Process, Queue, cpu_count

comm_queue = Queue()

#process that create the files and push the data into the queue
def creator(comm_q, number_of_subprocesses):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(number_of_subprocesses + 1): # we need to count ourselves
        comm_q.put("STOP_FLAG")
    relocator(comm_q)

#the relocator works till it gets an stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)
        data = comm_q.get()

num_of_cpus = cpu_count() #we will spam as many processes as cpu core we have
creator_process= Process(target=creator, args=(comm_queue, num_of_cpus))
relocators = [Process(target=relocator, args=(comm_queue)) for _ in num_of_cpus]
creator_process.start()
for rp in relocators:
    rp.start()

然后你必须等待他们完成:

creator_process.join()
for rp in relocators:
    rp.join()

你可能想在^{} documentation检查一下

特别是^{} method(默认情况下是阻塞调用)

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available.

相关问题 更多 >