<p>The cleanest approach I've found is to wrap the <code>async</code> function in <code>asgiref.sync.async_to_sync</code> (from <a href="https://github.com/django/asgiref/" rel="nofollow noreferrer"><code>asgiref</code></a>):</p>
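<pre><code>from asyncio import sleep

from asgiref.sync import async_to_sync
from celery.task import periodic_task  # celery.task is the pre-5.0 Celery API


async def return_hello():
    await sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,  # seconds
    name='return_hello',
)
def task_return_hello():
    # async_to_sync returns a synchronous callable that runs the coroutine to completion
    async_to_sync(return_hello)()
</code></pre>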
<p>I took this example from a <a href="https://johnfraney.ca/2018/12/20/writing-unit-tests-celery-tasks-async-functions/" rel="nofollow noreferrer">blog post</a> I wrote.</p>
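<p>To try the sync-wrapper pattern without Celery or <code>asgiref</code> installed, here is a minimal sketch that swaps in the standard library's <code>asyncio.run</code> for <code>async_to_sync</code>. That substitution is my assumption for illustration, not what the answer above uses: <code>asyncio.run</code> starts a fresh event loop on each call and fails if one is already running in the same thread, so it only approximates what <code>async_to_sync</code> does. The short sleep is likewise just a stand-in for real async work.</p>

```python
import asyncio


async def return_hello():
    # Stand-in for real asynchronous work (short delay, assumed for illustration).
    await asyncio.sleep(0.01)
    return 'hello'


def task_return_hello():
    # Plain synchronous function: asyncio.run creates an event loop,
    # runs the coroutine to completion, and returns its result.
    return asyncio.run(return_hello())


if __name__ == '__main__':
    print(task_return_hello())
```

<p>Returning the coroutine's result from the wrapper makes the function easy to assert on in a unit test.</p>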