Python Luigi:如何创建外部依赖/目标?

2024-10-01 09:22:15 发布

您现在位置:Python中文网/ 问答频道 /正文

在Luigi中,如何检查外部资源(在我的示例中是elasticsearch索引,但它可以是任何东西)是否存在,如果不存在,如何创建它?我的问题是,任务总是必须生成一个输出文件,否则它们将被视为已完成,不会运行。你知道吗

这里的https://github.com/spotify/luigi/issues/595看起来像是黑客攻击,我的类中没有run()-方法(它是由父类CopyToIndex实现的):

class UpdateIndex(CopyToIndex):
    source: str = Parameter(default='')

    iteration = luigi.IntParameter()

    def requires(self):
        return FilterDataset(self.source)

    def docs(self):
        file_path: str = os.path.join('tmp', '{0}_filtered_output.ldj'.format(self.source))
        file_contents = open(file_path, 'r').read()
        return [json.loads(str(item)) for item in file_contents.strip().split('\n')]

    # properties of parent class CopyToIndex:
    host = 'localhost'
    port = 9200
    index = 'example'
    doc_type = '_doc'
    purge_existing_index = True
    marker_index_hist_size = 1
    mapping = {
        'properties': {
            'name': {'type': 'keyword'},
            'name_suggest': {
                'type': 'completion',
            }
        }
    }
    settings = {
        'number_of_shards': 1,
        'number_of_replicas': 0
    }

Tags: ofpathselfsourceindexreturndeftype