为apache_beam Python SDK版本>2.24实现自定义编码器

2024-09-29 23:30:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在使用apache_beam sdk for python编写数据工程。我用的是2.24版本。我在将apache_beam版本升级到2.31时创建了一个自定义编码器类,但它有一些问题。自定义编码器类名为IgnoreUnicode。 这是我的管道代码:

branchessap_data = (p | 'ReadData branchessap' >> beam.io.ReadFromText(branchessap, skip_header_lines =1, coder=IgnoreUnicode())
                            | 'SplitData branchessap' >> beam.Map(lambda x: x.split('|'))
                            | 'FormatToDict branchessap' >> beam.Map(lambda x: {"branch_id": x[0], "branch_sap": x[1], "branch_name": x[2], "branch_profile": x[3]})
                            | 'ChangeDataType branchessap' >> beam.Map(convert_types_branchessap)
                            | 'DELETE UNWANTED DATA BRANCHESSAP' >> beam.Map(del_unwanted_cols_branchessap)
    )

这是IgnoreUnicode类,我用来覆盖apache_beam中的默认编码器:

# CLASS CHANGE FRENCH CHARACTERS
class IgnoreUnicode(Coder):
    def encode(self, value):
        return value.encode('utf-8','ignore')

    def decode(self, value):
        return value.decode('utf-8','ignore')

    def is_deterministic(self):
        return True

这些代码适用于apache_beam 2.24版。但是,如果我将其升级到2.24以上的版本,则会出现如下错误(在本例中,我使用的是2.31版本):

enter image description here

在2.24以上版本中,是否有其他解决方案可以实现自定义编码器


Tags: lambda代码self版本branchmapreturnvalue
1条回答
网友
1楼 · 发布于 2024-09-29 23:30:58

看起来这是一种不幸的组合,它将源的重组方式与在__main__中定义PCoder结合在一起。我建议两种解决方法中的一种:

(1)将IgnoreUnicode的定义移动到导入的适当模块,而不是__main__,或者

(2)使用ByteCoder读取文件,然后使用

`beam.Map(lambda line: line.decode('utf-8','ignore'))`.

(就个人而言,我更喜欢后者,因为最好不要让编码人员对数据进行变异。)

相关问题 更多 >

    热门问题