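<p>Concretely, to write several elements per key into a single file for that key, you could group by key and write each group from a <code>DoFn</code>, for example:</p>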
<pre><code>import apache_beam as beam
from apache_beam.io.gcp import gcsio


class WriteByKey(beam.DoFn):
    def process(self, kvs):
        # All values with the same key will come in at once.
        key, values = kvs
        # GcsIO files are binary, so each value (assumed to be a str) is encoded.
        with gcsio.GcsIO().open(f'gs://bucket/path/{key}.extension', 'w') as fp:
            for value in values:
                fp.write(value.encode('utf-8'))
                fp.write(b'\n')


with beam.Pipeline() as pipeline:
    events = (
        pipeline
        | ...
        | beam.ParDo(EventFormatter())
        | beam.ParDo(DateParser())  # must emit (key, value) pairs for GroupByKey
    )
    output = events | beam.GroupByKey() | beam.ParDo(WriteByKey())
</code></pre>
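<p>To see what <code>GroupByKey</code> followed by <code>WriteByKey</code> amounts to without a Beam runner or a GCS bucket, here is a rough local analogue using only the Python standard library. The helper names <code>group_by_key</code> and <code>write_by_key</code>, the <code>.txt</code> suffix and the local output directory are illustrative assumptions, not Beam APIs; Beam performs the grouping as a distributed shuffle rather than in an in-memory dict.</p>
<pre><code>import os
import tempfile
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def group_by_key(pairs: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Hypothetical local stand-in for beam.GroupByKey: collect values per key."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def write_by_key(grouped: Dict[str, List[str]], out_dir: str) -> List[str]:
    """Write one newline-delimited file per key and return the paths written."""
    paths = []
    for key, values in grouped.items():
        path = os.path.join(out_dir, f"{key}.txt")
        with open(path, "w", encoding="utf-8") as fp:
            for value in values:
                fp.write(value)
                fp.write("\n")
        paths.append(path)
    return paths


if __name__ == "__main__":
    events = [("2024-01-01", "a"), ("2024-01-02", "b"), ("2024-01-01", "c")]
    with tempfile.TemporaryDirectory() as out_dir:
        for path in write_by_key(group_by_key(events), out_dir):
            with open(path, encoding="utf-8") as fp:
                # 2024-01-01.txt ['a', 'c'], then 2024-01-02.txt ['b']
                print(os.path.basename(path), fp.read().splitlines())
</code></pre>
<p>As in the Beam version, every value for a key ends up in the same file, so the number of output files equals the number of distinct keys.</p>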
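<p>Note that the runner may need to retry elements on failure, so rather than writing directly to the final output, a safer approach is to write to a temporary file and then rename it to the final path once the write has succeeded, for example:</p>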
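<pre><code>import random

import apache_beam as beam
from apache_beam.io.gcp import gcsio


class WriteByKey(beam.DoFn):
    def process(self, kvs):
        # All values with the same key will come in at once.
        key, values = kvs
        # A random suffix keeps concurrent or retried attempts from sharing a temp file.
        nonce = random.randint(1, 10**9)
        path = f'gs://bucket/path/{key}.extension'
        temp_path = f'{path}-{nonce}'
        gcs = gcsio.GcsIO()
        with gcs.open(temp_path, 'w') as fp:
            for value in values:
                fp.write(value.encode('utf-8'))
                fp.write(b'\n')
        # Only move the file to its final name after the write has completed.
        gcs.rename(temp_path, path)
</code></pre>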
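<p>The same write-then-rename idea can be sketched on a local filesystem with only the standard library. This is an analogue under stated assumptions, not the GCS code above: the hypothetical helper <code>write_key_atomically</code> uses <code>tempfile.mkstemp</code> in place of the random nonce and <code>os.replace</code> in place of <code>GcsIO.rename</code>. Python documents <code>os.replace</code> as atomic on POSIX when it succeeds, and it may fail across filesystems, which is why the temp file is created inside the output directory.</p>
<pre><code>import os
import tempfile
from typing import Iterable


def write_key_atomically(out_dir: str, key: str, values: Iterable[str]) -> str:
    """Hypothetical helper: write values to out_dir/key.txt via a temp file.

    If the write fails part-way, the final path is never half-written and
    the partial temp file is removed, so a retry starts from a clean state.
    """
    path = os.path.join(out_dir, f"{key}.txt")
    # mkstemp picks a unique name (the role of the random nonce); creating it
    # in out_dir keeps it on the same filesystem as the final path.
    fd, temp_path = tempfile.mkstemp(dir=out_dir, prefix=f"{key}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fp:
            for value in values:
                fp.write(value)
                fp.write("\n")
        # Atomic on POSIX; overwrites any output left by an earlier attempt.
        os.replace(temp_path, path)
    finally:
        # After a successful replace temp_path is gone; after a failure,
        # delete the partial file and let the exception propagate.
        if os.path.exists(temp_path):
            os.remove(temp_path)
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        def failing_values():
            yield "a"
            raise RuntimeError("simulated worker failure")

        try:
            write_key_atomically(out_dir, "k1", failing_values())
        except RuntimeError:
            pass
        print(os.listdir(out_dir))  # []
        write_key_atomically(out_dir, "k1", ["a", "c"])  # the retry
        print(os.listdir(out_dir))  # ['k1.txt']
</code></pre>
<p>The simulated failure leaves neither a partial <code>k1.txt</code> nor a stray temp file, so the retried attempt produces the same result as a first attempt that succeeded.</p>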