芹菜块大数据

2024-09-20 22:58:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我尝试使用celeri的chunks功能将iterable数据集分成多个部分,然后将其发送到celery任务进行进一步处理。 我有一个查询集,是通过调用以下sqlalchemy得到的

query_set = MyModel.query.join(OtherModel).all())

目前,查询集是元组的列表。查询结果的长度为40000个,并且还在增长。在

我有另一个函数(celery task),它处理查询集中的数据,其定义是

^{pr2}$

因为查询集是元组的列表,所以我想可以像这样直接将它传递给crunch_qs

crunched_qs = crunch_qs.chunks(query_set, 5000)()
results = crunched_qs.get()

那没用。这给了我一个意想不到的结果。它正在解包每个查询集元组中的项并将它们发送到crunch_qs。 因此crunch_qs将在第一次迭代时收到**查询集[0],这引发了以下错误

TypeError:crunch_qs()只接受1个参数(给定10个)

len(查询集[0])=10

我也试过。。在

crunched_qs = crunch_qs.chunks((row,) for row in query_set, 5000)()
results = crunched_qs.get()

效果好一点。打字错误消失了。但是,我的crunch_qs函数现在将每一行(tuple)作为参数,而不是长度为5000的元组列表。在

如果有任何关于如何将元组列表传递给芹菜块的帮助/想法,我们将不胜感激。在

提前谢谢


Tags: 数据函数列表get错误queryresultschunks

热门问题