重写结果序列化程序芹菜弦

2024-09-29 06:22:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用芹菜和弦来构造大型文档页面内容的并行AI处理。因为这是一个没有公共签名的一次性函数,所以我正在清理对象以进行分发和重新聚合。处理单个页面的任务是成功读取参数并执行所需的函数。但是,它尝试将结果返回到队列以进行后续聚合时失败

有人知道为一个名为via Chord的任务指定结果序列化程序的方法吗

和弦生成--

callback = processPageResults.subtask(kwargs={'cdd_id' : cdoc.cdd_id,'user_id':user.id},options={'serializer':'pickle'})

res = chord([processPage.s(useBold,
       docPages[i]).set(serializer='pickle') for i in range(0, len(docPages))], callback)()

调用任务--- @共享任务(序列化程序='pickle',结果序列化程序='pickle',bind=True,最大重试次数=20次) def processPage(自身,*参数): useBold=args[0] page=args[1] page.processWords(使用粗体) 返回页

错误--

kombu.exceptions.EncodeError: Object of type DocumentPage is not JSON serializable

Tags: 函数程序id参数序列化callbackargs页面
2条回答

在重温这一点之后,我为我的用例找到了一个解决方案。对我来说,序列化程序的结果是错误的概念。因为芹菜会自动序列化传递给task的参数,所以我只需将回调的task_序列化程序设置为“pickle”,就解决了我的问题。似乎对于一组链接的任务,result_序列化程序并没有真正的用处

理想情况下,您可以在签名中设置result_序列化程序,该签名适用于正常任务

就和弦而言,它对我来说不太好。但另一方面,您可以更新芹菜全局配置,它将起作用

我试图报告这是一个错误。让我们看看

celery = Celery("app_name", backend=result_backend, broker=broker_url)

celery.conf.update(
    result_serializer='pickle',        
)

相关问题 更多 >