我一直在编写一个数据流管道,并且正在使用flex模板
我的代码从avro读取并处理它没有问题。但是当涉及到WriteToAvro或WriteToText时,数据流作业失败,并且看起来在模板验证时失败了。我完全没有理由这样做
我试过很多东西。删除输出文件的参数并对其进行硬编码。将WriteToAvro切换为WriteToText,但同样失败
with beam.Pipeline(options=options) as p:
read_from_avro = p \
| 'ReadFromAvro' >> ReadFromAvro(input_file)
redact_data = read_from_avro | "RedactData" >> IdentifyRedactData(project, redact_fields)
redact_data | 'WriteToAvro' >> WriteToAvro(
file_path_prefix=output_file,
schema=s,
codec='deflate',
file_name_suffix='.avro')
join_pcollections
的输出是一个pcollection,每个元素都是一个dictionary
数据流日志提供以下信息:
2021-06-27 09:04:46.728 BST Workflow failed.
2021-06-27 09:04:46.763 BST Cleaning up.
2021-06-27 09:04:46.817 BST Worker pool stopped.
有人知道发生了什么吗。仅供参考当我删除最后一个步骤并运行“ProcessData”步骤时,一切都会顺利运行。这是刚刚中断的最后一个写入步骤
编辑以添加需求文件
apache-beam==2.29.0
google-cloud==0.34.0
google-cloud-dlp==3.1.0
google-cloud-storage==1.35.0
google-cloud-core==1.4.1
google-cloud-datastore==1.15.0
如果我尝试使用apachebeam[gcp]==2.29.0,构建就会失败,因此我想知道这是否与此有关
apache-beam[gcp] 2.29.0 depends on google-cloud-dlp<2 and >=0.12.0; extra == "gcp"
固定的。我认为问题源于管道选项配置不当。我还根据FlexWordCount示例更改了管道的运行方式
从作业详细信息中,您可以导航到云日志记录。显示的默认日志集可能不包含错误,因此我建议更改过滤器以显示所有日志
相关问题 更多 >
编程相关推荐