我得到了Google云存储桶的URL。我必须:
使用URL获取该bucket中blob的列表
对于每个blob,我调用一些gcsapi来获取关于blob的信息(水滴大小, blob.名称等等)
对于每个blob,我还必须读取它,在其中找到一些内容并将其添加到从GCS API调用获得的值中
对于每个blob,我必须将步骤2和3中关于blob的值写入BigQuery
我有数千个blob,所以这需要用apachebeam来完成(我被推荐)
我对管道的想法是这样的:
获取UrlOfBucket并进行PCollection
使用该PCollection获取blob列表作为新PCollection
使用这些blob的元数据创建PCollection
执行一个转换,该转换将接受PCollection,该PCollection是元数据值的字典,进入blob,扫描一个值并返回一个新的PCollection,该PCollection是元数据值和这个新值的字典
把这个写到BigQuery。在
我很难再去查字典了
[+]我读到的:
https://beam.apache.org/documentation/programming-guide/#composite-transforms
https://medium.com/@rajeshhegde/data-pipeline-using-apache-beam-python-sdk-on-dataflow-6bb8550bf366
任何建议,特别是关于如何接受bucket名称并返回blob的PCollection的建议,都将受到极大的欢迎。在
我通过阅读更多关于apachebeam的文章来解决这个问题,并发现我必须使用ParDo函数在我的资源之间分配作业,在ParDo中,我调用DoFn函数,该函数接受一个元素,并完成它所需的所有处理并生成dic。 参考这篇文章Apache Beam: How To Simultaneously Create Many PCollections That Undergo Same PTransform?
相关问题 更多 >
编程相关推荐