我是阿帕奇·比姆的新手。我的场景如下所示
我有多个json格式的事件。在每个事件中,event_time列指示该事件的创建时间,我使用event_time计算它们的创建日期。 我想把这些事件分别写在它们的日期分区下。我的代码是
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import TaggedOutput
import json
import time
class EventFormatter(beam.DoFn):
def process(self, element, *args, **kwargs):
tmp_dict = {}
for i in range(len(element['properties'])):
tmp_dict['messageid'] = element['messageid']
tmp_dict['userid'] = element['userid']
tmp_dict['event_time'] = element['event_time']
tmp_dict['productid'] = element['properties'][i]['productid']
yield tmp_dict
class DateParser(beam.DoFn):
def process(self, element, *args, **kwargs):
key = time.strftime('%Y-%m-%d', time.localtime(element.get('event_time')))
print(key, element)
yield TaggedOutput(time.strftime('%Y-%m-%d', time.localtime(element.get('event_time'))), element)
with beam.Pipeline() as pipeline:
events = (
pipeline
| 'Sample Events' >> beam.Create([
{"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
{"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": [{"productid": "product-143"},{"productid": "product-144"}]}
])
| beam.ParDo(EventFormatter())
| beam.ParDo(DateParser())
)
output = events | "Parse Date" >> WriteToText('/Users/oguz.aydin/Desktop/event_folder/date={}/'.format(....))
我找不到应该如何完成格式块。当我运行代码打印结果时,它给出
('2020-08-27', {'productid': 'product-173', 'userid': 'user-78', 'event_time': 1598516997, 'messageid': '6b1291ea-e50d-425b-9940-44c2aff089c1'})
('2020-08-25', {'productid': 'product-143', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
('2020-08-25', {'productid': 'product-144', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
例如。我想在date=2020-08-25文件夹下写两个事件,另一个是date=2020-08-27
在一天结束时,我想把每个事件写在他们的创建日期文件夹下
我该怎么做
谢谢你的帮助
奥古斯
具体地说,要为每个键编写几个元素,可以执行以下操作
请注意,运行程序可能需要在失败时重试元素,因此更安全的方法不是直接写入输出,而是写入临时文件,然后在成功时自动重命名它,例如
在代码中,您使用了多个输出。 这意味着将一个DoFn(ParDo)的输出连接到另一个DoFn,这对于整个管道来说是静态的
如果您希望根据所拥有的内容将数据转储到不同的文件中,则必须实现一个DoFn,以便进行写入
大概是这样的:
您应该更改DataParser DoFn以生成一个元组(日期、值)而不是TaggedOut,并将管道更改为如下内容:
相关问题 更多 >
编程相关推荐