Efficient Python multiprocessing on Windows

Published 2024-06-01 22:47:33


Suppose we split a number into separate domains, for example 100 into [0,25] [25,50] [50,75] [75,100]. Each of these four lists is sent to one of four separate processes for computation, and the partial results are then recombined into a single answer for the number 100 as one split unit. We repeat this many times in a row, and need to treat 1000 numbers as units, each split into separate domains like [0,25] [25,50] [50,75] [75,100]. An efficiency problem arises if we have to shut the processes down in order to combine their answers as a single group unit. Because Windows is garbage at launching processes compared to Unix, we are forced to use the "spawn" method rather than fork, and spawning processes is slow. So I thought: why not keep the processes open and pass data to them, instead of opening and closing them for every iterated group of parallel work? The example code below does exactly that. It keeps the processes open as a Consumer class whose run() method continually (in a while loop) requests the next_task with .get() from a joinable queue:
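
For illustration only, splitting a number such as 100 into per-process domains could be done with a small helper like the hypothetical sketch below (split_domains and its parameters are my own names, not part of the question's code):

import multiprocessing


def split_domains(n, parts=None):
    # Split the range [0, n) into `parts` roughly equal (start, end) domains.
    parts = parts or multiprocessing.cpu_count()
    step = n // parts
    domains = []
    for i in range(parts):
        start = i * step
        end = n if i == parts - 1 else (i + 1) * step
        domains.append((start, end))
    return domains


print(split_domains(100, 4))  # [(0, 25), (25, 50), (50, 75), (75, 100)]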

import multiprocessing


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill shutdown of .get() loop with break
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        # Start the range at 1 so we never divide by zero:
        for i in range(1, self.b):
            if self.a % i == 0:
                return 0
        return 1


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Number of consumers equal to system cpu_count
    num_consumers = multiprocessing.cpu_count() 
    
    # Make a list of Consumer object process' ready to be opened.
    consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]

    for w in consumers:
        w.start()

    # Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.

    # Collect the results in a separate loop, so the loop above can enqueue
    # all jobs without waiting for a results.get() after each tasks.put():
    for _ in range(num_jobs):
        result = results.get()
   
    # Add a poison pill for each consumer
    for i in range(num_consumers): # We only do this when all computation is done.
        tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.

This code is just an example. In other variants of this code, once more tasks.put() and results.get() iterations are implemented, I need a way for a queued Task (object) to return via an external call before it has fully computed its answer and returned on its own. That would free up resources once you already have your answer for the single split-number group from another process. The __call__ method must exist so that a Task (object) works as a callable when it is enqueued with tasks.put(Task(i, 100)). I have been trying to find an efficient way to do this for the past two weeks. Do I need to take a completely different approach? Please don't misunderstand my predicament: the code I use works, but not as efficiently as I would like on Microsoft Windslow. Any help would be greatly appreciated.

Does a Task (object) live in the same process as the Consumer() that dequeued it? If so, can I tell all of the Consumer() run() processes to stop the Task (object) they are currently running, without breaking their while loop (with a poison pill), so that they can immediately accept another Task() without having to close and re-open their process again? When you open and close thousands of processes for an iterated computation, it really adds up and wastes time. I have tried Event(), Manager() and other Queue()s. There seems to be no efficient way to intervene in a Task (object) from the outside so that it returns immediately to its parent Consumer(), so that once one of the Consumer()s returns an answer that makes the tasks the other Consumer()s are computing irrelevant, they stop wasting compute resources, since they are all uniform computations over a single number split into groups.


2 Answers

What you are doing is implementing your own multiprocessing pool, but why? Are you not aware of the existence of the concurrent.futures.ProcessPoolExecutor and multiprocessing.pool.Pool classes, the latter of which is actually better suited to your particular problem?

Both of these classes implement multiprocessing pools and various methods for submitting tasks to the pool and getting results back from those tasks. But since, in your particular case, the tasks you submit are all trying to solve the same problem and you are only interested in the first available result, you need to be able to terminate any remaining running tasks once you have it. Only multiprocessing.pool.Pool lets you do that.

The code below submits tasks with the method Pool.apply_async. This call does not block; it returns an AsyncResult instance that has a blocking get method you can call to obtain the result of the submitted task. But since you will typically be submitting many tasks, we don't know which of those instances to call get on. So the solution is to use the callback argument of apply_async to specify a function that will be called asynchronously with the return value of each task as it becomes available. The remaining problem is then getting that result back to the main code. There are two ways:

Method 1: Using a global variable

from multiprocessing import Pool
import time


def worker1(x):
    time.sleep(3) # emulate working on the problem
    return 9 # the solution

def worker2(x):
    time.sleep(1) # emulate working on the problem
    return 9 # the solution

def callback(answer):
    global solution
    # gets all the returned results from submitted tasks
    # since we are just interested in the first returned result, write it to the queue:
    solution = answer
    pool.terminate() # kill all tasks


if __name__ == '__main__':
    t = time.time()
    pool = Pool(2) # just two processes in the pool for demo purposes
    # submit two tasks:
    pool.apply_async(worker1, args=(1,), callback=callback)
    pool.apply_async(worker2, args=(2,), callback=callback)
    # wait for all tasks to terminate:
    pool.close()
    pool.join()
    print(solution)
    print('Total elapsed time:', time.time() - t)

Prints:

9
Total elapsed time: 1.1378364562988281

Method 2: Using a queue

from multiprocessing import Pool
from queue import Queue
import time


def worker1(x):
    time.sleep(3) # emulate working on the problem
    return 9 # the solution

def worker2(x):
    time.sleep(1) # emulate working on the problem
    return 9 # the solution

def callback(solution):
    # gets all the returned results from submitted tasks
    # since we are just interested in the first returned result, write it to the queue:
    q.put_nowait(solution)


if __name__ == '__main__':
    t = time.time()
    q = Queue()
    pool = Pool(2) # just two processes in the pool for demo purposes
    # submit two tasks:
    pool.apply_async(worker1, args=(1,), callback=callback)
    pool.apply_async(worker2, args=(2,), callback=callback)
    # wait for first returned result from callback:
    solution = q.get()
    print(solution)
    pool.terminate() # kill all tasks in the pool
    print('Total elapsed time:', time.time() - t)

Prints:

9
Total elapsed time: 1.1355643272399902

Update

Even under Windows, the time to create and recreate a pool may be relatively small compared with the time the tasks take to complete, especially for later iterations, i.e. larger values of n. If you are calling the same worker function each time, a third approach is to use the pool method imap_unordered. I have also included some code that measures the overhead of starting a new pool instance on my desktop:

from multiprocessing import Pool
import time


def worker(x):
    time.sleep(x) # emulate working on the problem
    return 9 # the solution


if __name__ == '__main__':
    # POOLSIZE = multiprocessing.cpu_count()
    POOLSIZE = 8 # on my desktop
    # how long does it take to start a pool of size 8?
    t1 = time.time()
    for i in range(16):
        pool = Pool(POOLSIZE)
        pool.terminate()
    t2 = time.time()
    print('Average pool creation time: ', (t2 - t1) / 16)

    # POOLSIZE number of calls:
    arguments = [7, 6, 1, 3, 4, 2, 9, 6]
    pool = Pool(POOLSIZE)
    t1 = time.time()
    results = pool.imap_unordered(worker, arguments)
    it = iter(results)
    first_solution = next(it)
    t2 = time.time()
    pool.terminate()
    print('Total elapsed time:', t2 - t1)
    print(first_solution)

Prints:

Average pool creation time:  0.053139880299568176
Total elapsed time: 1.169790506362915
9

Update 2

Here is the dilemma: you have multiple processes working on pieces of one puzzle. The moment one process discovers, for example, that the number is divisible by some value in the range it was assigned, there is no point in the processes testing other ranges continuing their tests for that number. You can do one of three things. You can do nothing and let the processes finish before starting the next iteration, but that delays the next iteration. I have already suggested terminating the processes, which frees up the processors, but that requires you to create new processes, which you find unsatisfactory.

I can think of only one other possibility, shown below using your approach to multiprocessing. A multiprocessing shared-memory value named stop is made available to every process as a global and is set to 0 before each iteration. When a task determines that it should return 0, and there is therefore no point in the tasks running in the other processes continuing, it sets the value of stop to 1. This means the tasks must periodically check the value of stop and return if it has been set to 1. Of course this adds extra cycles to the processing. In the demo below I actually have 100 tasks queued up for 8 processors, but the last 92 tasks will immediately find that stop has already been set and should return on their first iteration.

By the way: the original code uses a multiprocessing.JoinableQueue instance for queueing the tasks rather than a multiprocessing.Queue instance, and calls task_done on it as messages are removed from the queue. Yet join is never called on this queue (which would tell you when all the messages have been removed), defeating the whole purpose of having such a queue. In fact there is no need for a JoinableQueue, because the main process has submitted num_jobs jobs, expects num_jobs messages on the result queue, and can simply loop and pull the expected number of results from it. I have replaced the JoinableQueue with a simple Queue, keeping the original code but commented out. Also, the Consumer processes can be created as daemon processes (with the argument daemon=True); they will then terminate automatically when all non-daemon processes (i.e. the main process) terminate, which avoids the need for the special "poison pill" None task messages. I have made that change as well, again keeping the original code intact but commented out for comparison.
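
As a side note, a minimal sketch of the pattern JoinableQueue is actually meant for might look like the following (my own illustration, not part of either listing): the producer calls join(), which blocks until every item it has put() has been matched by a task_done() call in a consumer.

import multiprocessing


def consumer(task_queue):
    while True:
        item = task_queue.get()
        try:
            pass  # ... process the item here ...
        finally:
            task_queue.task_done()  # mark this item as fully processed


if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    workers = [multiprocessing.Process(target=consumer, args=(tasks,), daemon=True)
               for _ in range(4)]
    for w in workers:
        w.start()
    for i in range(10):
        tasks.put(i)
    tasks.join()  # returns only once task_done() has been called for all 10 items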

import multiprocessing


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue, stop):
        # make ourself a daemon process:
        multiprocessing.Process.__init__(self, daemon=True)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.stop = stop

    def run(self):
        global stop
        stop = self.stop
        while True:
            next_task = self.task_queue.get()
            """
            if next_task is None:
                # Poison pill shutdown of .get() loop with break
                #self.task_queue.task_done()
                break
            """
            answer = next_task()
            #self.task_queue.task_done()
            self.result_queue.put(answer)
        # return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        global stop
        # start the range from 1 to avoid dividing by 0:
        for i in range(1, self.b):
            # how frequently should this check be made?
            if stop.value == 1:
                return 0
            if self.a % i == 0:
                stop.value = 1
                return 0
        return 1


if __name__ == '__main__':
    # Establish communication queues
    #tasks = multiprocessing.JoinableQueue()
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()

    # Number of consumers equal to system cpu_count
    num_consumers = multiprocessing.cpu_count()

    # Make a list of Consumer object process' ready to be opened.
    stop = multiprocessing.Value('i', 0)
    consumers = [ Consumer(tasks, results, stop) for i in range(num_consumers) ]

    for w in consumers:
        w.start()

    # Enqueue jobs for the Class Consumer process' run() while-loop to .get() a workload:
    # many more jobs than processes, but they will stop immediately once they check the value of stop.value:
    num_jobs = 100
    stop.value = 0 # make sure it is 0 before an iteration
    for i in range(num_jobs):
        tasks.put(Task(i, 100)) # Similar jobs would be reiterated before poison pill.

    # Collect the results in a separate loop:
    results = [results.get() for _ in range(num_jobs)]
    print(results)
    print(0 in results)

    """
    # Add a poison pill for each consumer
    for i in range(num_consumers): # We only do this when all computation is done.
        tasks.put(None) # Here we break all loops of open Consumer enqueue-able process'.
    """

Prints:

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
True

I finally came up with a solution:

import multiprocessing


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue, state):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.state = state


    def run(self):
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                self.task_queue.task_done()
                break
            # answer = next_task() is where the Task object is called.
            # Putting the if-else on the same line means next_task() is never
            # called once state.is_set() is True:
            answer = next_task() if not self.state.is_set() else 0
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        # Start the range at 1 so we never divide by zero:
        for i in range(1, self.b):
            if self.a % i == 0:
                return 0
        return 1


def initialize(n_list, tasks, results, states, number):
    # NOTE: the original snippet referenced an undefined name 'number'; passing
    # it in as a parameter here is an assumption made to keep the example runnable.
    sum_list = []
    for i in range(cpu_cnt):
        tasks.put(Task(n_list[i], number))
    for _ in range(cpu_cnt):
        sum_list.append(int(results.get()))
        if 0 in sum_list:
            # Signal the remaining Consumers to skip their current Task calls:
            states.set()
    if 0 in sum_list:
        states.clear()
        return None
    else:
        states.clear()
        return number


if __name__ == '__main__':
    states = multiprocessing.Event() # ADD THIS BOOLEAN FLAG EVENT!
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    cpu_cnt = multiprocessing.cpu_count() 

    # Add states.Event() to Consumer argument list:
    consumers = [ Consumer(tasks, results, states) for i in range(cpu_cnt) ]

    for w in consumers:
        w.start()

    n_list = [x for x in range(1000)]
    iter_list = []
    # Assumed call pattern: one initialize() round per number under test.
    for number in range(1000):
        iter_list.append(initialize(n_list, tasks, results, states, number))

    # Add a poison pill for each consumer once all computation is done:
    for i in range(cpu_cnt):
        tasks.put(None)

If the open Consumer processes assign the answer from the next_task() call with an if-else statement on the same line, the call is skipped as soon as the state Event() flag is set, because execution is pinned to that line until the queued Task object has assigned the variable "answer". This is a great fix! It makes the Task object interruptible within the Consumer's while loop, where the Consumer runs the Task object through the assignment to "answer". After more than two weeks I found a solution that speeds things up! I tested it on the working version of my code and it is faster. With an approach like this, many processes can be kept open indefinitely, and many different Task objects can be passed through the Consumer processes' joinable-queue loop to churn through large amounts of data in parallel at great speed. This code makes all the cores work like one "super core", with every process kept open on every core, cooperating on whatever iterated I/O stream is needed.

Here is sample output from one of my Python multiprocessing programs implementing this approach on 8 hyperthreaded cores:

Enter prime number FUNCTION:n+n-1
Enter the number for 'n' START:1
Enter the number of ITERATIONS:100000
Progress: ########## 100%
Primes:
ƒ(2) = 3
ƒ(3) = 5
ƒ(4) = 7
ƒ(6) = 11
ƒ(7) = 13
ƒ(9) = 17

etc etc...

ƒ(99966) = 199931
ƒ(99967) = 199933
ƒ(99981) = 199961
ƒ(99984) = 199967
ƒ(100000) = 199999
Primes found: 17983
Prime at end of list has 6 digits.
Overall process took 1 minute and 2.5 seconds.

All 17,983 primes from 1 to 200,000 (apart from #2) found by full modulus testing in about 1 minute.

On a 128-thread AMD Threadripper 3990X it takes about 8 seconds.

Here is another output run on the 8 hyperthreaded cores:

Enter prime number FUNCTION:((n*2)*(n**2)**2)+1
Enter the number for 'n' START:1
Enter the number of ITERATIONS:1000
Progress: ########## 100%
Primes:
ƒ(1) = 3
ƒ(3) = 487
ƒ(8) = 65537
    
    etc... etc...
    
ƒ(800) = 655360000000001
ƒ(839) = 831457011176399
ƒ(840) = 836423884800001
ƒ(858) = 929964638281537
ƒ(861) = 946336852720603
ƒ(884) = 1079670712526849
ƒ(891) = 1123100229130903
ƒ(921) = 1325342566697203
ƒ(953) = 1572151878119987
ƒ(959) = 1622269605897599
ƒ(983) = 1835682572370287
Primes found: 76
Prime at end of list has 16 digits.
Overall process took 1 minute and 10.6 seconds.
