我正在使用数据流管道读取一个Json文件beam.io.ReadFromText,当我将其输出传递给任何类(ParDo)时,它将成为元素。我想在我的类中使用这个json文件内容,我该怎么做?在
Json文件中的内容:
{"query": "select * from tablename", "Unit": "XX", "outputFileLocation": "gs://test-bucket/data.csv", "location": "US"}
这里我想在类query()中使用它的每个值,比如query、Unit、location和outputFileLocation:
^{pr2}$我的班级:
class Query(beam.DoFn):
def process(self, element):
# do something using content available in element
.........
我不认为在当前的IOs系统中是不可能的。 原因是多行json需要解析完整的文件来标识单个json块。如果我们在阅读时没有并行,这是可能的。但是,由于基于文件的IOs使用特定的分区逻辑和行分隔符在多个worker上并行运行,因此无法解析多行json。在
如果您有多个较小的文件,那么您可能可以分别读取这些文件并发出解析后的json。您可以进一步使用重组来均匀地为下游操作分配数据。在
管道看起来像这样。在
相关问题 更多 >
编程相关推荐