如何在python中实现自定义的多处理连续(异步)控制?

2024-07-04 13:16:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我有两种浏览器类型:firefox(40个实例)和chrome(40个实例)在我的selenium网格上。另外,我有很多测试,其中一些需要在firefox上执行,有些需要在chrome下执行。有些人不在乎。在

@skrrgwasme建议的第一个解决方案是将不需要特定浏览器的测试分为两组,最后有两个队列(一个在firefox上执行,第二个在chrome上执行):How to implement custom control over python multiprocessing.Pool?

这个解决方案很好,但仍然可以增强:当浏览器以不同的速度处理请求时,chrome将更快地完成队列,而firefox队列的工作时间也会更长。这可以通过自定义连续控制来解决,当我决定使用哪种浏览器时,而不是在创建池之前,而是在池已经启动时。在

所以,我们应该有一个池,其中每个进程的生成都由我们控制。在

@skrrgwasme建议使用apply_async。但我不明白在python中如何实现这一点,因为它不是异步的节点.js. 在

你能分享一些例子吗?我对python有很小的经验,似乎完全被这个问题所困扰:(


Tags: to实例网格类型队列seleniumcustom浏览器
2条回答

我认为这里最简单的事情是让每个工作进程使用两个Queue对象。一个是特定于浏览器的Queue,另一个是共享的“generic”Queue。这样,您可以让40个进程从Chrome Queue中消耗,然后在耗尽后切换到通用Queue,也可以让40个进程从FirefoxQueue消耗,然后在耗尽后切换到通用的Queue。下面是一个使用8个进程而不是80个进程的示例:

from multiprocessing import Pool, Manager
from Queue import Empty
import time

ff_tests = [1,2,3,4,5]
chrome_tests = [10, 11, 12, 13, 14, 15]
general_tests = [20, 21,22, 23,24,25]

def process_func(spec_queue, general_queue, browser):
    while True:
        try:
            test = spec_queue.get_nowait()
            print("Processing {} in {} process".format(test, browser))
            time.sleep(2)
        except Empty:
            break

    while True:
        try:
            test = general_queue.get_nowait()
            print("Processing {} in {} process".format(test, browser))
            time.sleep(2)
        except Empty:
            break


if __name__ == "__main__":
    m = Manager()
    ff_queue = m.Queue()
    chrome_queue = m.Queue()
    general_queue = m.Queue()

    for queue, tests in [(ff_queue, ff_tests), (chrome_queue, chrome_tests),
                         (general_queue, general_tests)]:
        for test in tests:
            queue.put(test)


    pool = Pool(8)
    for _ in range(4):
        pool.apply_async(process_func, args=(ff_queue, general_queue, "firefox"))
        pool.apply_async(process_func, args=(chrome_queue, general_queue, "chrome"))
    pool.close()
    pool.join()

输出:

^{pr2}$

如您所见,特定于浏览器的队列在其特定于浏览器的进程中被耗尽,然后这两种类型的进程协同工作以排出通用队列。在

使用pool.apply_async可以被认为非常类似于在前面的问题中手动设置map在幕后进行的每个调用。您只需将所有任务添加到池中,当新的工作进程可用时,它就会快速处理这些任务。您可以使用与skrrgwasme suggested相同的每个浏览器一个函数的方法。下面的代码大量引用了他对前一个问题的回答:

from multiprocessing import Pool

params = [1,2,3,4,5 ... ]

def ff_func(param):
    # Do FireFox stuff

def ch_func(param):
    # Do Chrome stuff

pool = Pool(80)

# For each parameter, add two tasks to the pool one FF, one Chrome.
for param in params:
    pool.apply_async(ff_func, param)
    pool.apply_async(ch_func, param)

pool.close()
pool.join()

这里的情况是,您构建了一个大的异步任务队列供池处理。然后,池按照它认为合适的顺序处理所有定义的任务。在

请注意,与前面的答案不同,这实际上不能保证每个浏览器的最大池大小为40,因为您要求我们更好地利用我们的资源。使用我们最多80个进程的最佳方法是尽可能让它们一直工作。在

如果不能同时为这两种“类型”中的任何一种使用超过40个进程,那么就无法从以前开始真正改进双池方法。在这种情况下,您的瓶颈仅仅是40个进程完成一个或另一个队列的速度。如果不允许使用更快队列中的空闲进程,则无法使用它们;-)

相关问题 更多 >

    热门问题