<p>要使基于异步协同程序的代码库从传统的同步代码库中可用,没有一条万能的道路。你必须根据代码路径做出选择。在</p>
<p>从一系列工具中选择:</p>
<h2>使用<code>async.run()</code>的同步版本</h2>
<p>为协程提供同步包装,在协程完成之前会阻塞。在</p>
<p>即使是异步生成器函数,如<code>ticker()</code>,也可以这样在循环中处理:</p>
<pre><code>class UselessExample:
def __init__(self, delay):
self.delay = delay
async def a_ticker(self, to):
for i in range(to):
yield i
await asyncio.sleep(self.delay)
def ticker(self, to):
agen = self.a_ticker(to)
try:
while True:
yield asyncio.run(agen.__anext__())
except StopAsyncIteration:
return
</code></pre>
<p>可以使用helper函数生成这些同步包装:</p>
^{pr2}$
<p>然后在类定义中使用<code>ticker = sync_agen_method(a_ticker)</code>。在</p>
<p>直接向上的协同程序方法(不是生成器协同程序)可以用以下方式包装:</p>
<pre><code>def sync_method(async_method):
@wraps(async_method)
def wrapper(self, *args, **kwargs):
return async.run(async_method(self, *args, **kwargs))
if wrapper.__name__[:2] == 'a_':
wrapper.__name__ = wrapper.__name__[2:]
return wrapper
</code></pre>
<h2>排除常见成分</h2>
<p>将同步部分重构成生成器、上下文管理器、实用程序函数等</p>
<p>对于您的特定示例,将<code>for</code>循环拉出到单独的生成器中,可以将重复代码减到两个版本休眠的状态:</p>
<pre><code>class UselessExample:
def __init__(self, delay):
self.delay = delay
def _ticker_gen(self, to):
yield from range(to)
async def a_ticker(self, to):
for i in self._ticker_gen(to):
yield i
await asyncio.sleep(self.delay)
def ticker(self, to):
for i in self._ticker_gen(to):
yield i
sleep(self.delay)
</code></pre>
<p>虽然这没什么区别,但它可以在其他环境下工作。在</p>
<h2>抽象语法树转换</h2>
<p>使用AST重写和映射将协程转换为同步代码。如果您不小心识别诸如<code>asyncio.sleep()</code>vs<code>time.sleep()</code>之类的实用函数,这可能会非常脆弱:</p>
<pre><code>import inspect
import ast
import copy
import textwrap
import time
asynciomap = {
# asyncio function to (additional globals, replacement source) tuples
"sleep": ({"time": time}, "time.sleep")
}
class AsyncToSync(ast.NodeTransformer):
def __init__(self):
self.globals = {}
def visit_AsyncFunctionDef(self, node):
return ast.copy_location(
ast.FunctionDef(
node.name,
self.visit(node.args),
[self.visit(stmt) for stmt in node.body],
[self.visit(stmt) for stmt in node.decorator_list],
node.returns and ast.visit(node.returns),
),
node,
)
def visit_Await(self, node):
return self.visit(node.value)
def visit_Attribute(self, node):
if (
isinstance(node.value, ast.Name)
and isinstance(node.value.ctx, ast.Load)
and node.value.id == "asyncio"
and node.attr in asynciomap
):
g, replacement = asynciomap[node.attr]
self.globals.update(g)
return ast.copy_location(
ast.parse(replacement, mode="eval").body,
node
)
return node
def transform_sync(f):
filename = inspect.getfile(f)
lines, lineno = inspect.getsourcelines(f)
ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
ast.increment_lineno(ast_tree, lineno - 1)
transformer = AsyncToSync()
transformer.visit(ast_tree)
tranformed_globals = {**f.__globals__, **transformer.globals}
exec(compile(ast_tree, filename, 'exec'), tranformed_globals)
return tranformed_globals[f.__name__]
</code></pre>
<p>虽然上述内容可能还远远不够完整,无法满足所有需求,而且转换AST树<em>可能会使</em>望而生畏,但是上面的内容可以让您只维护异步版本并将该版本直接映射到同步版本:</p>
<pre><code>>>> import example
>>> del example.UselessExample.ticker
>>> example.main()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../example.py", line 32, in main
func(ue)
File "/.../example.py", line 21, in func
for value in ue.ticker(5):
AttributeError: 'UselessExample' object has no attribute 'ticker'
>>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
>>> example.main()
0
1
2
3
4
0
1
2
3
4
</code></pre>