多进程Luigi tas中的请求

2024-10-01 09:18:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个简单的Luigi Elasticsearch indexing task,它使用Requests生成GET,并将响应推送到本地ElasticSearch。另外,我还做了第二个任务,它会调用前几次,就像这样:

import luigi
import requests
from luigi.contrib.esindex import CopyToIndex


class RequestTask(CopyToIndex):
    TEST_URL = 'http://www.this-page-intentionally-left-blank.org'
    index = 'example_index'
    iteration = luigi.IntParameter()

    def docs(self):
            res = requests.get(self.TEST_URL).content.decode('utf-8')
            return [{'response': res, 'iteration': self.iteration}]


class ManyRequests(luigi.Task):
    def requires(self):
        return [RequestTask(iteration) for iteration in range(0, 4)]

if __name__ == '__main__':
    luigi.run()

如果我在一个单线程中运行ManyRequests任务,工作正常。但是,如果我指定多个worker(例如--workers 4),进程将从Elasticsearch中引发TransportError(index_already_exists_exception),它们将无法正确完成。完成进程的数量是随机的,所以我假设这是由于在Elasticsearch数据库中写入的一些冲突造成的。我是否必须以不同的方式实现多个请求?在

任何帮助都将非常感激:)

这是我执行ManyRequests——workers 4时的控制台:

^{pr2}$

Tags: testimportselfurlindexdefreselasticsearch