我被一个相对复杂的芹菜链配置所困扰,试图实现以下目标。假设有一系列任务,如下所示:
chain1 = chain(
DownloadFile.s("http://someserver/file.gz"), # downloads file, returns temp file name
UnpackFile.s(), # unpacks the gzip comp'd file, returns temp file name
ParseFile.s(), # parses file, returns list URLs to download
)
现在我想并行下载每个URL,所以我所做的是:
^{pr2}$最后,我想获取从ParseFile
返回的每个下载文件(从DownloadFile
返回一个临时文件名),并并行地在另一个任务链中运行它(例如,它将是chain
的group
):
chains = []
for tmpfile in res:
chains.append(celery.chain(
foo.s(tmpfile),
bar.s(),
baz.s()
))
res2 = celery.group(*chains)()
res2_data = res2.get()
如果我在一个普通的Python进程(而不是另一个celry任务)中运行它,该方法可以正常工作,因为我可以等待来自chain1
的结果,然后为每个下载的文件构造下载任务组和新链。在
然而,现在我想把所有这些东西打包到另一个Cerry任务中,方法是将它包装在另一个@app.task
修饰函数中,结果发现您不能(或者确实不应该从任务内部调用.get()
)等待另一个任务的完成,我没能找到一个解决方案“移植”这个工作流程在一个任务中运行。我试图将res1
添加到chain1
链中,但是芹菜抱怨{
有谁能给我个建议吗?谢谢!在
实际上,在任务中调用
.get()
是不好的。目标Celery
是并行执行异步任务,因此您不应该等待结果。在解决问题的一种方法是将第一次处理的url结果存储在文件中或数据库中。在
我写了一个简短的例子来说明如何通过将结果写入文件。我选择了
json
转储。在假设您的
urls
中有一个urls
列表。首先,启动异步处理所有那些group
为chain
的url。 所有这些任务都将处理url并将要下载的url列表存储在指定tmp目录中的文件中。在然后还启动
check_dir
任务,该任务将检查目录中是否已写入文件,在这种情况下,处理每个文件并删除tmp目录中的相应文件。在使用我选择的参数,这个任务每隔30秒自动回复一次,而且永远不会结束(我想你有一个当前的任务要执行),所以你可以改变它,但它是给你一个如何管理的想法。在
我将其作为
main
运行,但如果您愿意,也可以将其打包到另一个芹菜任务中。在应用程序_模块.py在
在任务.py在
^{pr2}$在主.py在
控制台输出:
相关问题 更多 >
编程相关推荐