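<p>You can use the code below to pass the right side of the join as a side input, assuming the right side always maps exactly one element to each key, which means it is always much smaller than the left PCollection.</p>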
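<p>Also, if your PCollection is created by reading from an external source rather than from an in-memory array, you need to pass <code>right_list=beam.pvalue.AsList(pcoll2)</code> instead of <code>right_list=pcoll2</code> to ParDo. For details, see <a href="https://stackoverflow.com/a/48278102/3648318">here</a>.</p>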
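<pre><code>import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

LOG = logging.getLogger(__name__)


class LeftJoinerFn(beam.DoFn):
    """Left-joins each (key, values) row against the small right-hand side."""

    def process(self, row, right_list):
        # right_list holds (key, value) pairs; a dict gives constant-time lookups.
        right_dict = dict(right_list)
        left_key, left_values = row
        for each in left_values:
            if left_key in right_dict:
                # Matching key: append the right-hand value to the left element.
                yield each + right_dict[left_key]
            else:
                # No match: emit the left element unchanged (left join).
                yield each


class Display(beam.DoFn):
    def process(self, element):
        LOG.info(str(element))
        yield element


# Default options; replace with your own runner settings as needed.
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

# In-memory sample data: (key, list of values) pairs.
pcoll1 = [
    ('key1', [[('a', 1)], [('b', 2)], [('c', 3)], [('d', 4)], [('e', 5)], [('f', 6)]]),
    ('key2', [[('a', 12)], [('b', 21)], [('c', 13)]]),
    ('key3', [[('a', 21)], [('b', 23)], [('c', 31)]]),
]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]

left_joined = (
    p
    | 'CreateLeft' >> beam.Create(pcoll1)
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn(), right_list=pcoll2)
    | 'Display' >> beam.ParDo(Display())
)
p.run().wait_until_finish()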
</code></pre>
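<p>To see what the join emits for a single row without running a pipeline, the per-element logic can be sketched in plain Python. This is only an illustration: <code>left_join_row</code> is a hypothetical helper name that is not part of Beam, and it assumes the same (key, list of values) row shape as the sample data above.</p>

```python
def left_join_row(row, right_list):
    """Plain-Python mirror of LeftJoinerFn.process for one (key, values) row.

    left_join_row is a hypothetical helper for illustration, not a Beam API.
    """
    right_dict = dict(right_list)  # key -> right-hand value
    key, values = row
    for value in values:
        if key in right_dict:
            # Key found on the right: concatenate the two lists.
            yield value + right_dict[key]
        else:
            # Key missing on the right: keep the left value as-is.
            yield value


right = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]

matched = list(left_join_row(('key1', [[('a', 1)], [('b', 2)]]), right))
unmatched = list(left_join_row(('key3', [[('a', 21)]]), right))

print(matched)
print(unmatched)
```

<p>Note that the right-hand value is concatenated as a nested list, so a matched element looks like <code>[('a', 1), [('x', 10)]]</code>. Flatten the right-hand value before concatenating if you want a flat list of pairs.</p>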