假设我们将一个数字拆分为不同的域:例如:100拆分为:[0,25][25,50][50,75][75100]。然后,我们将这4个列表中的每一个发送到4个单独进程中的一个进行计算,然后将其作为数字100的单个分割单元进行重新组合以得到答案。我们连续多次重复此过程,需要将1000个数字作为一个单元,分成类似于[0,25][25,50][50,75][75,100]的单独域。如果我们必须关闭流程,使其作为一个单独的组单元来处理答案,那么就会出现效率问题。由于windows在运行进程方面与Unix相比是垃圾,我们被迫使用“spawn”方法而不是fork。spawn方法在生成进程时很慢,所以我想为什么不保持进程“打开并向它们传递数据,而不需要为并行进程的每个迭代组打开和关闭它们”。下面的示例代码将实现这一点。它将使进程作为类使用者保持打开状态,该类使用者将不断使用run()(在while循环中)请求.get()可加入队列的下一个_任务:
import multiprocessing
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill shutdown of .get() loop with break
self.task_queue.task_done()
break
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
for i in range(self.b):
if self.a % i == 0:
return 0
return 1
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Number of consumers equal to system cpu_count
num_consumers = multiprocessing.cpu_count()
# Make a list of Consumer object process' ready to be opened.
consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.
# We start to .get() the results in a different loop-
for _ in range(num_jobs): # -so the above loop enqueues all jobs without-
result = results.get() # -waiting for the previous .put() to .get() first.
# Add a poison pill for each consumer
for i in range(num_consumers): # We only do this when all computation is done.
tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.
这段代码只是一个例子。在这段代码的其他变体中:当实现更多的tasks.put()和results.get()迭代时,需要一种方法使排队的任务(对象)在完全计算答案并自行返回之前通过外部调用返回。这将释放资源,如果你已经得到你的答案,从另一个进程'的单一分裂数字组。需要存在__call__
描述符,任务(对象)才能作为调用tasks.put(Task(i,100))的函数工作。在过去的两周里,我一直在努力寻找一种有效的方法。我需要采取完全不同的方法吗?请不要误解我的困境,我使用的代码可以工作,但效率不如我在Microsoft Windslow上希望的那样高。任何帮助都将不胜感激
任务(对象)是否与排队的Consumer()进程位于同一进程中?如果是这样的话,我是否可以告诉类Consumer()Run()的所有进程停止当前正在运行的任务(对象),而不关闭它们的while循环(使用毒丸),这样它们就可以立即接受另一个任务(),而无需再次关闭和重新打开它们的进程?当您为迭代计算打开和关闭数千个进程时,它确实会增加和浪费时间。我尝试过使用事件()管理器()其他队列()。似乎没有一种有效的方法可以从外部干预任务(对象)以立即return
到其父使用者(),这样,如果其他使用者()中的一个返回一个使其他使用者()计算的答案,它就不会继续浪费计算资源任务是不相关的,因为它们都是将单个数字划分为多个组的统一计算
您所做的是实现您自己的多处理池,但为什么?您是否不知道
concurrent.futures.ProcessPoolExecutor
和multiprocessing.pool.Pool
类的存在,后者实际上更适合您的特定问题这两个类都实现了多处理池和各种方法,用于将任务提交到池中并从这些任务中获取结果。但是,由于在您的特定情况下,您提交的任务正试图解决相同的问题,并且您只对第一个可用结果感兴趣,一旦完成了,您需要能够终止任何剩余的正在运行的任务。只有
multiprocessing.pool.Pool
允许您这样做下面的代码使用方法
Pool.apply_async
提交任务。此函数不阻塞,而是返回一个AsyncResult
实例,该实例具有一个阻塞get
方法,您可以调用该方法从提交的任务中获取结果。但是,由于通常您可能会提交许多任务,我们不知道调用这些实例中的哪一个。因此,解决方案是使用apply_async
的callback
参数来指定一个函数,该函数将在任务可用时使用返回值异步调用。然后问题就变成了将这个结果传回。有两种方法:方法1:通过全局变量
印刷品:
方法2:通过队列
印刷品:
更新
即使在Windows下,与任务完成所需的时间相比,创建和重新创建池的时间可能相对较少,特别是对于以后的迭代,即
n
的较大值。如果调用的是同一个worker函数,那么第三种方法是使用pool方法imap_unordered
。我还包括一些代码,用于测量我的桌面启动新池实例的开销:印刷品:
更新2
这是一个两难境地:您有多个进程处理一个拼图的各个部分。例如,当一个进程发现某个数字可被通过范围内的某个数字整除时,在其他进程中,从完成其测试开始测试不同范围内的数字是没有意义的。你可以做三件事中的一件。在开始下一次迭代之前,您可以什么都不做,让流程完成。但这会延迟下一次迭代。我已经建议您终止进程,这样可以释放处理器。但这需要你创建新的流程,而你发现这些流程并不令人满意
我只能想到另一种可能性,我在下面使用您的方法进行多处理。名为
stop
的多处理共享内存变量以每个进程作为全局变量进行初始化,并在每次迭代之前设置为0。当一个任务被设置为返回值0,并且在其他进程中运行的其他任务没有继续执行的意义时,它将stop
的值设置为1。这意味着任务必须定期检查stop
的值,如果该值已设置为1,则返回当然,这会给处理过程增加额外的周期。在下面的演示中,我实际上有100个任务排队等待8个处理器。但是,最后92个任务将立即发现stop
已经设置,并且应该在第一次迭代时返回顺便说一句:原始代码使用
multiprocessing.JoinableQueue
实例对任务进行排队,而不是使用multiprocessing.Queue
实例,并在消息从队列中移除时在此实例上调用task_done
。然而,在这个队列上从来没有调用过join
(它会告诉您所有消息何时被删除),从而破坏了拥有这样一个队列的全部目的。事实上,不需要JoinableQueue
,因为主进程已经提交了num_jobs
个作业,并且在结果队列上期望num_jobs
个消息,并且可以循环并拉取期望的n结果队列中的结果数。我用一个简单的Queue
替换了JoinableQueue
,保留了原始代码,但注释掉了。此外,Consumer
进程可以创建为守护进程(参数为daemon=True
),然后当所有非守护进程(即主进程)终止时,它们将自动终止,从而避免使用特殊的“毒丸”None
任务消息。我已经做了更改,并且再次保持了原始代码的完整性,但是注释掉了以供比较印刷品:
我终于想出了一个解决办法
如果打开的使用者对象使用同一行上的If else语句为下一个_task()函数调用分配答案,则当设置state.Event()标志时,它将退出,因为它锁定到该行,直到排队的任务对象分配变量“answer”。这是一个伟大的解决办法!它使任务对象在消费者的while循环中可中断,消费者通过变量“answer”赋值运行任务对象。两个多星期后,我找到了一个可以加快速度的解决方案!我在代码的工作版本上测试了它,速度更快!使用这样的方法,可以无限期地打开多个进程,并通过Consumer object joinable Queue循环传递许多不同的任务对象,以极快的速度并行处理大量数据!这段代码使所有内核都像一个“超级内核”一样工作,其中所有进程都对每个内核保持开放,并为任何所需的迭代i/o流协同工作
以下是我的一个python多处理程序在8个超线程内核上实现此方法的示例输出:
1至200000(除#2外)的所有17983个素数在~1分钟内达到全模数
在3990x128线程AMD Threadripper上,需要约8秒
以下是8个超线程内核上的另一个输出:
相关问题 更多 >
编程相关推荐