在apachebeam管道中使用MatchFiles()获取文件名并解析python中的json

2024-04-27 17:12:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我在一个bucket中有很多json文件,我想使用python3获取文件名,然后创建一个文件的键值对并读取它们。Match files现在在python上运行,但我想知道如何实现这一点:

files = p | fileio.MatchFiles("gs://mybuckenumerate/*.json") 
    | #Ideally want to create a tuple of filename, json row which I will pass into a ParDo that is a custom class that parses the json

我们的目标是假设我在一个桶里有10个文件:

^{pr2}$

而bucket中的json文件都共享相同的结构

我将它传递到自定义ParseFile类(我想通过ParDo,我的apachebeam知识有限),对于json中的每一行,我都输出一个字典(我将把它保存到一个以换行分隔的json中),其中一个键是文件名。在

编辑9/24上午11:15太平洋标准时间:以下是我尝试的方法

file_content_pairs = (p 
                | fileio.MatchFiles(known_args.input_bucket)
                | fileio.ReadMatches()
                | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8())))
                | beam.ParDo(TestThis())
                )

TestThis()只是用来打印内容:

class TestThis(beam.DoFn):

    def process(self, element):
        print(element)
        print("stop")
        yield element

但我在我的输出中看到的是:信息:根目录:在1.2762866020202637秒内完成了2个文件的清单。在


Tags: 文件jsonthatbucket文件名fileselementclass
1条回答
网友
1楼 · 发布于 2024-04-27 17:12:47

我不明白。您想拥有(filename, json-parsed-contents)的键值对吗?在

如果是这样,您将:

file_content_pairs = (
  p | fileio.MatchFiles("gs://mybucketname/*.json")
    | fileio.ReadMatches()
    | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))
)

所以,如果你的文件是这样的:

^{pr2}$

然后,file_content_pairs集合将包含键值对("myfile.json", {"a":"b", "c": "d", "e": 1})。在


如果您的文件是json行格式,则应执行以下操作:

def consume_file(f):
  other_name = query_bigquery(f.metadata.path)
  return [(other_name, json.loads(line))
          for line in f.read_utf8().strip().split('\n')]

with Pipeline() as p:
  result = (p
            | fileio.MatchFiles("gs://mybucketname/*.json")
            | fileio.ReadMatches()
            | beam.FlatMap(consume_file))

相关问题 更多 >