擅长:python、mysql、java
<p>由Marvin Killing建议的<code>async_test</code>绝对可以帮助——以及直接调用<code>loop.run_until_complete()</code></p>
<p>但我也强烈建议为每个测试重新创建新的事件循环,并直接将循环传递给API调用(至少<code>asyncio</code>本身对每个需要它的调用都只接受<code>loop</code>关键字参数)。</p>
<p>就像</p>
<pre><code>class Test(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
def test_xxx(self):
@asyncio.coroutine
def go():
reader, writer = yield from asyncio.open_connection(
'127.0.0.1', 8888, loop=self.loop)
yield from asyncio.sleep(0.01, loop=self.loop)
self.loop.run_until_complete(go())
</code></pre>
<p>它隔离测试用例中的测试,并防止出现奇怪的错误,如在<code>test_a</code>中创建但仅在<code>test_b</code>执行时完成的长期协同路由。</p>