Apache Beam peruser会话窗口未合并

2024-09-28 01:23:34 发布

您现在位置:Python中文网/ 问答频道 /正文

我们有一个有用户的应用程序;每个用户每次使用我们的应用程序大约10-40分钟,我想根据已经发生的特定事件(例如“此用户已转换”、“此用户上次会话有问题”、“此用户上一次会话成功”)计算每个会话中发生的事件的分布/发生次数。在

(在这之后,我想每天计算这些更高级别的事件,但这是一个单独的问题)

为此,我一直在研究会话窗口;但是所有的docs似乎都是面向全局会话窗口的,但是我希望为每个用户创建它们(这也是一种自然分区)。在

我很难找到关于如何做到这一点的文档(python首选)。你能给我指一下正确的方向吗?在

或者换言之:如何为每个用户创建每个会话窗口,以输出更结构化(丰富)的事件?

我有什么

class DebugPrinter(beam.DoFn):
  """Just prints the element with logging"""
  def process(self, element, window=beam.DoFn.WindowParam):
    _, x = element
    logging.info(">>> Received %s %s with window=%s", x['jsonPayload']['value'], x['timestamp'], window)
    yield element

def sum_by_event_type(user_session_events):
  logging.debug("Received %i events: %s", len(user_session_events), user_session_events)
  d = {}
  for key, group in groupby(user_session_events, lambda e: e['jsonPayload']['value']):
    d[key] = len(list(group))
  logging.info("After counting: %s", d)
  return d

# ...

by_user = valid \
  | 'keyed_on_user_id'      >> beam.Map(lambda x: (x['jsonPayload']['userId'], x))

session_gap = 5 * 60 # [s]; 5 minutes

user_sessions = by_user \
  | 'user_session_window'   >> beam.WindowInto(beam.window.Sessions(session_gap),
                                               timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW) \
  | 'debug_printer'         >> beam.ParDo(DebugPrinter()) \
  | beam.CombinePerKey(sum_by_event_type)

它输出的内容

^{pr2}$

如您所见,Session()窗口不扩展窗口,但只将非常接近的事件分组在一起。。。做错什么了?在


Tags: 用户应用程序bysessionloggingwith事件element
1条回答
网友
1楼 · 发布于 2024-09-28 01:23:34

你可以通过在窗口后添加一个分组键转换来让它工作。您已经为记录分配了键,但实际上没有按键将它们组合在一起,会话窗口(按键工作)不知道这些事件需要合并在一起。在

为了证实这一点,我用一些内存中的虚拟数据做了一个可复制的示例(将Pub/Sub与问题隔离开来,并能够更快地进行测试)。所有五个事件都有相同的键或user_id,但它们将依次“到达”彼此相隔1、2、4和8秒。当我使用5秒的session_gap时,我希望前4个元素合并到同一个会话中。第五个项目将在第四个项目之后8秒,因此它必须降级到下一个环节(间隔超过5秒)。数据是这样创建的:

data = [{'user_id': 'Thanos', 'value': 'event_{}'.format(event), 'timestamp': time.time() + 2**event} for event in range(5)]

我们使用beam.Create(data)初始化管道,beam.window.TimestampedValue来分配“假”时间戳。同样,我们只是用这个来模拟流行为。之后,我们通过user_id字段创建键值对,我们打开window.Sessions并添加缺少的beam.GroupByKey()步骤。最后,我们使用一个稍作修改的DebugPrinter记录结果:。管道现在看起来像这样:

^{pr2}$

其中DebugPrinter是:

class DebugPrinter(beam.DoFn):
  """Just prints the element with logging"""
  def process(self, element, window=beam.DoFn.WindowParam):
    for x in element[1]:
      logging.info(">>> Received %s %s with window=%s", x['value'], x['timestamp'], window)

    yield element

如果不按键分组进行测试,则会得到相同的行为:

INFO:root:>>> Received event_0 1554117323.0 with window=[1554117323.0, 1554117328.0)
INFO:root:>>> Received event_1 1554117324.0 with window=[1554117324.0, 1554117329.0)
INFO:root:>>> Received event_2 1554117326.0 with window=[1554117326.0, 1554117331.0)
INFO:root:>>> Received event_3 1554117330.0 with window=[1554117330.0, 1554117335.0)
INFO:root:>>> Received event_4 1554117338.0 with window=[1554117338.0, 1554117343.0)

但是在添加了它之后,窗口现在可以正常工作了。事件0到3在扩展的12s会话窗口中合并在一起。事件4属于单独的5s课程。在

INFO:root:>>> Received event_0 1554118377.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_1 1554118378.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_3 1554118384.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_2 1554118380.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_4 1554118392.37 with window=[1554118392.37, 1554118397.37)

完整代码here

还有两件事值得一提。第一个问题是,即使在一台使用DirectRunner的机器上本地运行这个程序,记录也可能无序(在我的例子中,事件_3在事件_2之前被处理)。这样做是为了模拟文档中的分布式处理here。在

最后一个是,如果您得到这样的堆栈跟踪:

TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'Write Results/Write/WriteImpl/WriteBundles']

从2.10.0/2.11.0 SDK降级到2.9.0。例如,请参见answer。在

相关问题 更多 >

    热门问题