<p>A running coroutine can't be shared between processes automatically because a coroutine runs inside a specific event loop in the process that owns the async class. The coroutine's state cannot be pickled, and even if it could, it wouldn't make sense outside the context of its event loop.</p>
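<p>The pickling limitation is easy to verify: passing a coroutine object to <code>pickle.dumps</code> raises <code>TypeError</code>. A minimal demonstration:</p>

```python
import asyncio
import pickle

async def coro():
    await asyncio.sleep(0)

c = coro()
try:
    # Coroutine objects explicitly refuse to be pickled.
    pickle.dumps(c)
except TypeError as e:
    print('cannot pickle:', e)
c.close()  # avoid a "coroutine was never awaited" warning
```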
<p>What you can do is create a callback-based adapter for your async class, where each coroutine method is represented by a callback-based method with the semantics of "start doing X and call this function when done". Provided the callbacks are multiprocessing-aware, these operations can be invoked from other processes. You can then start an event loop in each <em>process</em> and create a coroutine facade over the proxied callback-based invocations.</p>
<p>For example, consider an ordinary async class:</p>
<pre><code>class Async:
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, os.getpid())
            await asyncio.sleep(.2)
        return s
</code></pre>
<p>A callback-based adapter can use the public <code>asyncio</code> API to convert the <code>repeat</code> coroutine into a classic asynchronous function in the JavaScript "callback hell" style:</p>
<pre><code>class CallbackAdapter:
    def __init__(self, obj):
        self._async = obj

    def repeat_start(self, n, s, on_success):
        # start executing Async.repeat and invoke on_success
        # with its result once the coroutine completes
        fut = asyncio.get_event_loop().create_task(
            self._async.repeat(n, s))
        fut.add_done_callback(lambda _f: on_success(fut.result()))
</code></pre>
<p>(The conversion could be automated; the code above is written by hand to illustrate the concept.)</p>
<p><code>CallbackAdapter</code> can be registered with multiprocessing, so different processes can start the adapter's methods (and hence the original async coroutines) through the proxies multiprocessing provides. The only requirement is that the callback passed as <code>on_success</code> be multiprocessing-friendly.</p>
<p>As the final step, we can come full circle and create an async adapter for the callback-based API, start an event loop in the other process as well, and use asyncio and <code>async def</code>. This adapter of the adapter class will run a fully functional <code>repeat</code> coroutine that effectively proxies the original <code>Async.repeat</code> coroutine, without ever attempting to pickle coroutine state.</p>
<p>Here is a sample implementation of the above approach:</p>
<pre><code>import asyncio, multiprocessing.managers, threading, os


class Async:
    # The async class we are bridging. This class is unaware of multiprocessing
    # or of any of the code that follows.
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, 'pid', os.getpid())
            await asyncio.sleep(.2)
        return s


def start_asyncio_thread():
    # Since the manager controls the main thread, we have to spin up the event
    # loop in a dedicated thread and use asyncio.run_coroutine_threadsafe to
    # submit stuff to the loop.
    setup_done = threading.Event()
    loop = None
    def loop_thread():
        nonlocal loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        setup_done.set()
        loop.run_forever()
    threading.Thread(target=loop_thread).start()
    setup_done.wait()
    return loop


class CallbackAdapter:
    # The callback adapter to the async class, also running in the
    # worker process.
    _loop = None

    def __init__(self, obj):
        self._async = obj
        if CallbackAdapter._loop is None:
            CallbackAdapter._loop = start_asyncio_thread()

    def repeat_start(self, n, s, on_success):
        # Submit a coroutine to the event loop and obtain a Task/Future. This
        # is normally done with loop.create_task, but repeat_start will be
        # called from the main thread, owned by the multiprocessing manager,
        # while the event loop will run in a separate thread.
        future = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        # We could propagate exceptions by accepting an additional on_error
        # callback, and nesting fut.result() in a try/except that decides
        # whether to call on_success or on_error.
        future.add_done_callback(lambda _f: on_success(future.result()))


def remote_event_future(manager):
    # Return a function/future pair that can be used to locally monitor an
    # event in another process.
    #
    # The returned function and future have the following property: when the
    # function is invoked, possibly in another process, the future completes.
    # The function can be passed as a callback argument to a multiprocessing
    # proxy object and therefore invoked by a different process.
    loop = asyncio.get_event_loop()
    result_pipe = manager.Queue()
    future = loop.create_future()
    def _wait_for_remote():
        result = result_pipe.get()
        loop.call_soon_threadsafe(future.set_result, result)
    t = threading.Thread(target=_wait_for_remote)
    t.start()
    return result_pipe.put, future


class AsyncAdapter:
    # The async adapter for a callback-based API, e.g. the CallbackAdapter.
    # Designed to run in a different process and communicate to the callback
    # adapter via a multiprocessing proxy.
    def __init__(self, cb_proxy, manager):
        self._cb = cb_proxy
        self._manager = manager

    async def repeat(self, n, s):
        set_result, future = remote_event_future(self._manager)
        self._cb.repeat_start(n, s, set_result)
        return await future


class CommManager(multiprocessing.managers.SyncManager):
    pass

CommManager.register('Async', Async)
CommManager.register('CallbackAdapter', CallbackAdapter)


def get_manager():
    manager = CommManager()
    manager.start()
    return manager

def other_process(manager, cb_proxy):
    print('other_process (pid %d)' % os.getpid())
    aadapt = AsyncAdapter(cb_proxy, manager)
    loop = asyncio.get_event_loop()
    # Create two coroutines printing different messages, and gather their
    # results.
    results = loop.run_until_complete(asyncio.gather(
        aadapt.repeat(3, 'message A'),
        aadapt.repeat(2, 'message B')))
    print('coroutine results (pid %d): %s' % (os.getpid(), results))
    print('other_process (pid %d) done' % os.getpid())

def start_other_process(loop, manager, async_proxy):
    cb_proxy = manager.CallbackAdapter(async_proxy)
    other = multiprocessing.Process(target=other_process,
                                    args=(manager, cb_proxy,))
    other.start()
    return other

def main():
    loop = asyncio.get_event_loop()
    manager = get_manager()
    async_proxy = manager.Async()
    # Create two external processes that drive coroutines in our event loop.
    # Note that all messages are printed with the same PID.
    start_other_process(loop, manager, async_proxy)
    start_other_process(loop, manager, async_proxy)
    loop.run_forever()

if __name__ == '__main__':
    main()
</code></pre>
<p>The code runs correctly on Python 3.5, but fails on 3.6 and 3.7 due to <a href="https://bugs.python.org/issue30256" rel="nofollow noreferrer">a bug in multiprocessing</a>.</p>