擅长:python、mysql、java
<p>在代码中,您使用了多个输出。
这意味着将一个DoFn(ParDo)的输出连接到另一个DoFn,这对于整个管道来说是静态的</p>
<p>如果您希望根据所拥有的内容将数据转储到不同的文件中,则必须实现一个DoFn,以便进行写入</p>
<p>大概是这样的:</p>
<pre><code>class WriteByKey(apache_beam.DoFn):
def process(self, kv):
key, value = kv
with beam.io.gcp.gcsio.GcsIO().open(f'gs://bucket/path/{key}.extension', 'a') as fp:
fp.write(value)
</code></pre>
<p>您应该更改DataParser DoFn以生成一个元组(日期、值)而不是TaggedOut,并将管道更改为如下内容:</p>
<pre><code>with beam.Pipeline() as pipeline:
events = (
pipeline
| 'Sample Events' >> beam.Create([
{"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
{"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": [{"productid": "product-143"},{"productid": "product-144"}]}
])
| beam.ParDo(EventFormatter())
| beam.ParDo(DateParser()) | beam.ParDo(WriteByKey())
)
</code></pre>