将异步和多处理结合起来会有什么问题(如果有的话)?

2024-05-18 20:15:03 发布

您现在位置:Python中文网/ 问答频道 /正文

几乎每个人第一次看到Python中的线程时都意识到,GIL让那些真正想并行处理的人感到痛苦,或者至少给了他们一个机会。

我目前正在考虑实现类似于反应堆模式的东西。实际上,我想监听一个类线程上的传入套接字连接,当有人试图连接时,接受该连接并将其传递给另一个类线程进行处理。

我还不确定我会面临什么样的负担。我知道目前有一个2兆的上限,对传入的消息。从理论上讲,我们可以达到每秒数千次(尽管我不知道实际上我们是否见过这样的情况)。处理一条消息所花费的时间并不重要,尽管显然更快会更好。

我正在研究Reactor模式,并使用multiprocessing库开发了一个小示例,这个库(至少在测试中)似乎工作得很好。但是,现在/很快我们将有asyncio库可用,它将为我处理事件循环。

有没有什么东西可以把asynciomultiprocessing结合起来咬我?


Tags: asyncio消息时间模式情况理论线程multiprocessing
3条回答

您应该能够安全地组合asynciomultiprocessing而不会遇到太多麻烦,尽管您不应该直接使用multiprocessingasyncio(以及任何其他基于事件循环的异步框架)的基本sin正在阻塞事件循环。如果您尝试直接使用multiprocessing,则在任何时候阻塞等待子进程时,都将阻塞事件循环。显然,这很糟糕。

避免这种情况的最简单方法是使用^{}^{}中执行函数。ProcessPoolExecutor是一个使用multiprocessing.Process实现的进程池,但是asyncio内置了对在其中执行函数而不阻塞事件循环的支持。下面是一个简单的例子:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

在大多数情况下,这仅仅是功能就足够了。如果您发现自己需要来自multiprocessing的其他构造,如QueueEventManager等,那么有一个名为^{}(完全公开:我编写的)的第三方库,它提供所有multiprocessing数据结构的asyncio兼容版本。下面是一个例子:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [ 
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]   
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

参见PEP 3156,特别是线程交互部分:

http://www.python.org/dev/peps/pep-3156/#thread-interaction

这清楚地记录了您可能使用的新异步方法,包括run_in_executor()。注意,Executor是concurrent.futures中定义的,我建议您也查看一下。

是的,有很多可能(或不可能)咬你。

  • 当您运行类似asyncio的程序时,它希望在一个线程或进程上运行。这本身不适用于并行处理。您必须在将IO操作(特别是套接字上的操作)留在单个线程/进程中的同时,以某种方式分发工作。
  • 虽然将单个连接传递到不同处理程序进程的想法很好,但很难实现。第一个障碍是您需要一种方法,在不关闭连接的情况下将连接从asyncio中拔出。下一个障碍是,除非使用C扩展名中特定于平台(可能是Linux)的代码,否则不能简单地将文件描述符发送到其他进程。
  • 注意,multiprocessing模块已知会创建多个用于通信的线程。大多数情况下,当您使用通信结构(例如Queues)时,会产生一个线程。不幸的是,这些线并不是完全看不见的。例如,当你打算终止你的程序时,它们可能无法彻底地被拆掉,但是根据它们的数量,资源的使用可能是显而易见的。

如果您真的打算在各个过程中处理各个连接,我建议您检查不同的方法。例如,您可以将套接字置于侦听模式,然后同时接受来自多个并行工作进程的连接。一旦一个worker处理完一个请求,它就可以接受下一个连接,因此您仍然比为每个连接派生一个进程所用的资源少。例如,Spamassassin和Apache(mpm prefork)可以使用这个工作机模型。根据您的用例,它可能会变得更容易和更健壮。具体地说,您可以让您的工作人员在服务配置数量的请求后死亡,并由主进程重新启动,从而消除内存泄漏的许多负面影响。

相关问题 更多 >

    热门问题