如何使芹菜任务调用异步任务?

2024-10-01 11:27:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个django应用程序,需要运行一个优化算法。该算法由两部分组成。第一部分是进化算法,该算法调用一定数量的任务,第二部分是模拟退火算法。 问题是芹菜不允许任务调用异步任务。 我尝试了以下代码:

            sa_list = []
            for cromossomo in self._populacao:
                sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))

            job = group(sa_list)

            result = job.apply_async()
            resultados = result.get()

这段代码是进化算法的一部分,这是一个芹菜任务。 当我试着运行它时,芹菜会显示以下信息:

[2015-12-02 16:20:15,970: WARNING/Worker-1] /home/arthur/django-user/local/lib/python2.7/site-packages/celery/result.py:45: RuntimeWarning: Never call result.get() within a task! See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

In Celery 3.2 this will result in an exception being raised instead of just being a warning.

尽管只是一个警告,芹菜似乎充满了任务和锁。在

我寻找了很多解决办法,但没有一个奏效。在


Tags: django代码inself算法taskgetsa
2条回答

解决这一问题的一种方法是采用两级管道:

def first_task():
    sa_list = []
    for cromossomo in self._populacao:
        sa_list.append(simulated_annealing_t.s(cromossomo.to_JSON(), self._NR, self._T, get_mutacao_str(self._mutacao_SA), self._argumentos))

    job = group(sa_list)

    result = job.apply_async()
    result.save()
    return result.id

那就这样说吧:

^{pr2}$

还有其他方法需要更多的工作——你可以用和弦来收集小组的结果。在

问题不在于celery不允许在您的示例中执行异步任务,而是您将遇到死锁,因此警告:

假设您有一个任务a,它通过apply_async()生成许多子任务B。这些任务中的每一个都是由一个工人执行的。问题是,如果任务B的数量大于可用工作线程的数量,任务A仍然在等待它们的结果(至少在您的示例中,它不是默认的)。当任务A仍在运行时,已执行任务B的工人将而不是执行另一个任务,它们被阻塞,直到任务A完成。(我不知道确切原因,但就在几周前我遇到了这个问题。)

这意味着在手动关闭工人之前,芹菜不能执行任何操作。在

解决方案

这完全取决于您将如何处理任务结果。如果您需要它们来执行下一个子任务,您可以通过Linking with callbacks或将其硬编码到相应的任务中(这样您就可以调用第一个子任务,而调用第二个子任务,依此类推)。在

如果只需要查看它们是否被执行以及是否成功,可以使用flower监视任务。在

如果您需要进一步处理所有子任务的输出,我建议将结果写入一个xml文件:让任务A调用所有任务B,完成后执行任务C来处理结果。也许还有更优雅的解决方案,但这肯定避免了僵局。在

相关问题 更多 >