我对asyncio非常陌生,需要一些关于如何构建以下场景的建议。我有一个接受回调的Cython扩展名。前者在每次新事件到达时执行。但是,启动收集这些事件的机制是一个阻塞操作,即它阻塞了主线程。
Cython扩展还接受asyncio.Queue
,并从回调调用put_nowait
方法。现在我想为队列设置消费者来处理事件。这可能是场景背后的伪代码:
aioq = asyncio.Queue(1000)
cext = CythonExtension(aioq)
def c(aioq):
while not aioq.empty():
e = yield from aioq.get()
loop.create_task(c(aioq))
# i'm not sure how to run the event loop
# and keep on initializing the cython extension
# because this call also blocks...
#loop.run_forever()
# so i tried this.
loop.run_in_executor(None, cext.start) <- this is a blocking operation
# start the event loop
loop.run_forever()
当我运行这个示例时,asyncio
队列中充满了事件,但是c
任务从不执行—我无法从队列中获取任何事件。如果您有任何关于如何解决这个问题的反馈或建议,我们将不胜感激。你知道吗
在执行了一定数量的字节码操作之后,常规的Python代码将允许另一个线程运行。例如,参见this set of slides,它讨论了对机制的改进,但也解释了机制。你知道吗
Cython不执行字节码,因此从不触发这种交换机制,因此Cython线程将无限期地阻塞(如您所发现的)。一个简单的方法是将以下行添加到
cext.start
中的主Cython循环中(因此它会定期执行):这将释放,然后立即尝试回收全局解释器锁(GIL),它允许其他线程执行(如果另一个线程执行,那么Cython将不得不等待)。你知道吗
更好的选择是识别不需要GIL的Cython代码位(主要是使用C数据类型的位,而不是Python数据类型的位),并将这些位包装在
with nogil:
块中。这将允许你的Cython代码继续做一些有用的事情,而另一个线程也在运行。你知道吗相关问题 更多 >
编程相关推荐