如何发布芹菜状态与一个可观察的?

2024-10-16 20:51:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在构建一个进度条应用程序,它使用芹菜来执行异步任务。任务以GraphQL突变开始,并每秒更新自己的状态:

@celery.task(bind=True, name='mock_analyzing')
def mock_analyzing(self, up_to=20):
    for i in range(up_to):
        self.update_state(
            state='PROGRESS',
            meta={
                'done': i,
                'total': up_to
            }
        )
        time.sleep(1)
    return 'SUCCESS'

当我想用GraphQL订阅发布该状态时,属性设置不正确。我做错什么了

class Subscription(graphene.ObjectType):
    file_status = graphene.String(file_id=graphene.ID())

    def resolve_file_status(root, info, file_id):
        task = mock_analyzing.AsyncResult(file_id)
        return Observable.interval(1000)\
                         .map(lambda done = task.info.get('done'), total = task.info.get('total'): {id: file_id, 'done': done, 'total': total})\
                         .take_while(lambda i: not task.ready())

我想我没有理解可观察的权利。有人能帮我吗?提前谢谢


Tags: toselfinfoidtask状态defmock