我尝试使用celeri的chunks功能将iterable数据集分成多个部分,然后将其发送到celery任务进行进一步处理。 我有一个查询集,是通过调用以下sqlalchemy得到的
query_set = MyModel.query.join(OtherModel).all())
目前,查询集是元组的列表。查询结果的长度为40000个,并且还在增长。在
我有另一个函数(celery task),它处理查询集中的数据,其定义是
^{pr2}$因为查询集是元组的列表,所以我想可以像这样直接将它传递给crunch_qs
crunched_qs = crunch_qs.chunks(query_set, 5000)()
results = crunched_qs.get()
那没用。这给了我一个意想不到的结果。它正在解包每个查询集元组中的项并将它们发送到crunch_qs
。
因此crunch_qs
将在第一次迭代时收到**查询集[0],这引发了以下错误
TypeError:crunch_qs()只接受1个参数(给定10个)
len(查询集[0])=10
我也试过。。在
crunched_qs = crunch_qs.chunks((row,) for row in query_set, 5000)()
results = crunched_qs.get()
效果好一点。打字错误消失了。但是,我的crunch_qs
函数现在将每一行(tuple)作为参数,而不是长度为5000的元组列表。在
如果有任何关于如何将元组列表传递给芹菜块的帮助/想法,我们将不胜感激。在
提前谢谢
目前没有回答
相关问题 更多 >
编程相关推荐