你如何对芹菜进行单元测试?

2024-09-27 01:22:29 发布

您现在位置:Python中文网/ 问答频道 /正文

芹菜文档mentions testing Celery within Django但是没有解释如果不使用Django,如何测试芹菜任务。你怎么做到的?


Tags: django文档testingcelery芹菜mentionswithin
3条回答

我用这个:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

文件:http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

芹菜总是让你的任务同步运行,你不需要芹菜服务器。

取决于你到底想测试什么。

  • 直接测试任务代码。不要调用“task.delay(…)”只需从单元测试中调用“task(…)”。
  • 使用CELERY_ALWAYS_EAGER。这将导致在您说“task.delay(…)”时立即调用您的任务,因此您可以测试整个路径(但不能测试任何异步行为)。

可以使用任何unittest库同步测试任务。我通常在处理芹菜任务时做两个不同的测试。第一个(我建议如下)是完全同步的,应该是确保算法完成它应该做的事情的那个。第二个会话使用整个系统(包括代理),并确保我没有序列化问题或任何其他分发、通信问题。

所以:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

你的测试:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

希望能有帮助!

相关问题 更多 >

    热门问题