Left join in Apache Beam

Published 2024-10-01 00:31:52


In Apache Beam, what is the best way to left join two PCollections? The input is:

pcoll1 = [('key1', [[('a', 1)],[('b', 2)], [('c', 3)], [('d', 4)],[('e', 5)], [('f', 6)]]), ('key2',[[('a', 12)],[('b', 21)], [('c', 13)]]), ('key3',[[('a', 21)],[('b', 23)], [('c', 31)]])]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]

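The expected output is a left join. Every pcoll1 record is kept. Where the key also exists in pcoll2 (key1 and key2), the matching value is appended. key3 has no match and passes through unchanged.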

I implemented a left joiner with CoGroupByKey() and ParDo(). Is there another way to implement a left join in the Beam Python SDK?

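import apache_beam as beam


class LeftJoinerFn(beam.DoFn):
    # Define the DoFn before the pipeline that uses it.

    def process(self, row):
        # row is the value part of a CoGroupByKey result:
        # {'left': [values from pcoll1], 'right': [values from pcoll2]}
        left = row['left']
        right = row['right']

        if left and right:
            # Key present on both sides: append the first right-hand value.
            for each in left:
                yield each + right[0]

        elif left:
            # Key only on the left: emit the left-hand value unchanged.
            for each in left:
                yield each


left_joined = (
    {'left': pcoll1, 'right': pcoll2}
    | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
    | 'LeftJoiner: ExtractValues' >> beam.Values()
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
)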
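The sketch below checks the join logic without launching a pipeline. It uses plain Python to emulate the `(key, {'left': [...], 'right': [...]})` records that `CoGroupByKey` produces for a dict input, then applies a left join to each one.

It differs from `LeftJoinerFn` in one respect. `LeftJoinerFn` uses only `right[0]`, while this version emits one output per matching right-hand value. That is the usual left-join behavior when a key repeats on the right.

The helpers `co_group_by_key` and `left_join` are hypothetical illustrations and are not part of the Beam API.

```python
from collections import defaultdict


def co_group_by_key(left, right):
    """Hypothetical stand-in for beam.CoGroupByKey on {'left': ..., 'right': ...}.

    Returns (key, {'left': [...], 'right': [...]}) pairs, one per distinct key.
    """
    grouped = defaultdict(lambda: {'left': [], 'right': []})
    for key, value in left:
        grouped[key]['left'].append(value)
    for key, value in right:
        grouped[key]['right'].append(value)
    return list(grouped.items())


def left_join(row):
    """Left join over one grouped row, written like a DoFn.process body."""
    left, right = row['left'], row['right']
    for each in left:
        if right:
            # One output per matching right-hand value.
            for r in right:
                yield each + r
        else:
            # No match on the right: keep the left-hand value as-is.
            yield each


pcoll1 = [('key1', [[('a', 1)], [('b', 2)], [('c', 3)], [('d', 4)], [('e', 5)], [('f', 6)]]),
          ('key2', [[('a', 12)], [('b', 21)], [('c', 13)]]),
          ('key3', [[('a', 21)], [('b', 23)], [('c', 31)]])]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]

# Map each key to the list of joined outputs for that key.
joined = {key: list(left_join(row)) for key, row in co_group_by_key(pcoll1, pcoll2)}
```

A key that appears only in `pcoll2` gets an empty `left` list and produces no output, which is correct for a left join. `LeftJoinerFn` handles this case the same way: neither of its branches fires when `left` is empty.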

2 answers

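You can use the code below, which passes the right side of the join to the ParDo as a side input. This assumes the right side maps exactly one element to each key, so it is always much smaller than the left PCollection.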

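Also, if your PCollection is created by reading from an external source rather than from an in-memory list, you need to pass right_list=beam.pvalue.AsList(pcoll2) to the ParDo instead of right_list=pcoll2. For details, see here.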

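import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger(__name__)


class LeftJoinerFn(beam.DoFn):

    def process(self, row, right_list):
        # right_list holds the whole right-hand side; build a key -> value lookup.
        right_dict = dict(right_list)
        left_key, left_values = row

        if left_key in right_dict:
            # Key matched: append the right-hand value to every left-hand value.
            for each in left_values:
                yield each + right_dict[left_key]

        else:
            # No match: keep the left-hand values unchanged (left-join semantics).
            for each in left_values:
                yield each


class Display(beam.DoFn):
    def process(self, element):
        LOG.info(str(element))
        yield element


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

pcoll1 = [('key1', [[('a', 1)], [('b', 2)], [('c', 3)], [('d', 4)], [('e', 5)], [('f', 6)]]),
          ('key2', [[('a', 12)], [('b', 21)], [('c', 13)]]),
          ('key3', [[('a', 21)], [('b', 23)], [('c', 31)]])]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]

left_joined = (
    p
    | 'Create left' >> beam.Create(pcoll1)
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn(), right_list=pcoll2)
    | 'Display' >> beam.ParDo(Display())
)
p.run().wait_until_finish()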
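In this side-input version, the work done for each element is a dictionary lookup against the whole right-hand side. The plain-Python sketch below needs no Beam installation. It runs the same logic as the answer's `LeftJoinerFn.process` over the sample data, so you can see what the pipeline emits.

Unlike the DoFn, it builds the lookup table once rather than once per element. The function name `side_input_left_join` is hypothetical.

```python
def side_input_left_join(left_rows, right_rows):
    """Emulates LeftJoinerFn with the right side supplied as a side input.

    The lookup table is built once, the way a worker holds a broadcast
    side input, and every left-hand element is then checked against it.
    """
    right_dict = dict(right_rows)  # assumes at most one value per key on the right
    for left_key, left_values in left_rows:
        for each in left_values:
            if left_key in right_dict:
                # Same expression as the answer: list + list.
                yield each + right_dict[left_key]
            else:
                yield each


pcoll1 = [('key1', [[('a', 1)], [('b', 2)], [('c', 3)], [('d', 4)], [('e', 5)], [('f', 6)]]),
          ('key2', [[('a', 12)], [('b', 21)], [('c', 13)]]),
          ('key3', [[('a', 21)], [('b', 23)], [('c', 31)]])]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]

output = list(side_input_left_join(pcoll1, pcoll2))
```

Because `each + right_dict[left_key]` appends the right-hand *list*, each matched output has the form `[('a', 1), [('x', 10)]]` rather than a flat list of tuples. Beam also provides `beam.pvalue.AsDict(pcoll2)`, which delivers the side input to the DoFn as a dict, so the per-element `dict(...)` conversion could be dropped.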

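If the second collection is always smaller, another option is side inputs. Make the right collection a side input, which is broadcast to every worker. Then write a ParDo that processes the elements of the left collection and looks each one up in the right collection.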
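The broadcast-join idea above also works when a key can appear more than once on the right. In that case the in-memory table maps each key to a list of values. Beam's `beam.pvalue.AsMultiMap` side-input view provides that shape in recent SDK versions; check that your SDK includes it.

The plain-Python sketch below emulates what each worker does under that assumption. It uses small hypothetical data, and `build_multimap` and `broadcast_left_join` are hypothetical names. It emits `(key, (left, right))` pairs, with `None` for unmatched rows. That is a common left-join output shape, chosen here for illustration rather than taken from the answers.

```python
from collections import defaultdict


def build_multimap(right_rows):
    """Groups right-hand values by key, like a multimap side input would."""
    table = defaultdict(list)
    for key, value in right_rows:
        table[key].append(value)
    return table


def broadcast_left_join(left_rows, table):
    """Joins each keyed left element against the in-memory right-hand table."""
    for key, left_value in left_rows:
        matches = table.get(key)  # .get does not insert missing keys
        if matches:
            for right_value in matches:
                yield key, (left_value, right_value)
        else:
            # Left join: unmatched keys still produce output, paired with None.
            yield key, (left_value, None)


left = [('k1', 'a'), ('k2', 'b'), ('k3', 'c')]
right = [('k1', 'x'), ('k1', 'y'), ('k2', 'z')]
result = list(broadcast_left_join(left, build_multimap(right)))
```

Here `k1` matches twice and yields two rows, `k2` yields one, and `k3` is kept with `None`. The trade-off is the same as in the answers: the whole right side must fit in each worker's memory. When it does not, use the `CoGroupByKey` approach from the question.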
