我们有一个有用户的应用程序;每个用户每次使用我们的应用程序大约10-40分钟,我想根据已经发生的特定事件(例如“此用户已转换”、“此用户上次会话有问题”、“此用户上一次会话成功”)计算每个会话中发生的事件的分布/发生次数。在
(在这之后,我想每天计算这些更高级别的事件,但这是一个单独的问题)
为此,我一直在研究会话窗口;但是所有的docs似乎都是面向全局会话窗口的,但是我希望为每个用户创建它们(这也是一种自然分区)。在
我很难找到关于如何做到这一点的文档(python首选)。你能给我指一下正确的方向吗?在
或者换言之:如何为每个用户创建每个会话窗口,以输出更结构化(丰富)的事件?
class DebugPrinter(beam.DoFn):
"""Just prints the element with logging"""
def process(self, element, window=beam.DoFn.WindowParam):
_, x = element
logging.info(">>> Received %s %s with window=%s", x['jsonPayload']['value'], x['timestamp'], window)
yield element
def sum_by_event_type(user_session_events):
logging.debug("Received %i events: %s", len(user_session_events), user_session_events)
d = {}
for key, group in groupby(user_session_events, lambda e: e['jsonPayload']['value']):
d[key] = len(list(group))
logging.info("After counting: %s", d)
return d
# ...
by_user = valid \
| 'keyed_on_user_id' >> beam.Map(lambda x: (x['jsonPayload']['userId'], x))
session_gap = 5 * 60 # [s]; 5 minutes
user_sessions = by_user \
| 'user_session_window' >> beam.WindowInto(beam.window.Sessions(session_gap),
timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW) \
| 'debug_printer' >> beam.ParDo(DebugPrinter()) \
| beam.CombinePerKey(sum_by_event_type)
如您所见,Session()窗口不扩展窗口,但只将非常接近的事件分组在一起。。。做错什么了?在
你可以通过在窗口后添加一个分组键转换来让它工作。您已经为记录分配了键,但实际上没有按键将它们组合在一起,会话窗口(按键工作)不知道这些事件需要合并在一起。在
为了证实这一点,我用一些内存中的虚拟数据做了一个可复制的示例(将Pub/Sub与问题隔离开来,并能够更快地进行测试)。所有五个事件都有相同的键或
user_id
,但它们将依次“到达”彼此相隔1、2、4和8秒。当我使用5秒的session_gap
时,我希望前4个元素合并到同一个会话中。第五个项目将在第四个项目之后8秒,因此它必须降级到下一个环节(间隔超过5秒)。数据是这样创建的:我们使用
^{pr2}$beam.Create(data)
初始化管道,beam.window.TimestampedValue
来分配“假”时间戳。同样,我们只是用这个来模拟流行为。之后,我们通过user_id
字段创建键值对,我们打开window.Sessions
并添加缺少的beam.GroupByKey()
步骤。最后,我们使用一个稍作修改的DebugPrinter
记录结果:。管道现在看起来像这样:其中
DebugPrinter
是:如果不按键分组进行测试,则会得到相同的行为:
但是在添加了它之后,窗口现在可以正常工作了。事件0到3在扩展的12s会话窗口中合并在一起。事件4属于单独的5s课程。在
完整代码here
还有两件事值得一提。第一个问题是,即使在一台使用DirectRunner的机器上本地运行这个程序,记录也可能无序(在我的例子中,事件_3在事件_2之前被处理)。这样做是为了模拟文档中的分布式处理here。在
最后一个是,如果您得到这样的堆栈跟踪:
从2.10.0/2.11.0 SDK降级到2.9.0。例如,请参见answer。在
相关问题 更多 >
编程相关推荐