2024-09-27 20:16:41 发布
网友
我正在用ApacheBeam实现一个批处理管道,它解压缩json文件,对它们进行预处理,并将它们存储回文件系统中的给定位置
可以使用ZIP或GZIP算法压缩文件
解压在GZIP文件上运行良好,但在ZIP文件上失败。。。在调查之后,我发现只有GZIP、BZIP2和DEFLATE压缩类型在javasdk中才受支持,但不存在python实现
在不修补apache beam Python SDK的情况下,是否有解决此问题的方法
Beam Python不支持ZIP。有两种解决方法:您可以读取DoFn中的文件,或者通过Cross-Language Transform使用JavaSDK的文件IO
通过dofn读取的方法看起来像
filenames | beam.Map(lambda f: (f, None)) | beam.GroupByKey() # The GroupByKey adds a fusion break so that files can be processed in parallel | beam.Map(lambda f: f[0]) | beam.FlatMap(lambda f: [line for line in read(f)]
Beam Python不支持ZIP。有两种解决方法:您可以读取DoFn中的文件,或者通过Cross-Language Transform使用JavaSDK的文件IO
通过dofn读取的方法看起来像
相关问题 更多 >
编程相关推荐