以tuple为值的Spark Streaming updateStateByKey

2024-10-01 04:51:16 发布

您现在位置:Python中文网/ 问答频道 /正文

是否可以将updateStateByKey()函数与元组一起用作值?我使用的是PySpark,我的输入是(word, (count, tweet_id)),这意味着word是一个键,一个元组{}是一个值。updateStateByKey的任务是对每个单词求和计数,并创建包含该单词的所有tweet_id的列表。在

我实现了以下更新函数,但是索引为1的new_values的错误列表索引超出范围:

def updateFunc(new_values, last_sum):
  count = 0
  tweets_id = []
  if last_sum:
    count = last_sum[0]
    tweets_id = last_sum[1]
  return sum(new_values[0]) + count, tweets_id.extend(new_values[1])

调用方法:

^{pr2}$

Tags: 函数id列表newcount单词tweetspyspark
1条回答
网友
1楼 · 发布于 2024-10-01 04:51:16

我找到了解决办法。问题出在checkpointing上,这意味着如果发生故障,当前状态将被保存到磁盘上。它引起了一些问题,因为当我改变了状态的定义时,在checkpoint中它处于没有元组的旧状态。因此,我从磁盘上删除了checkpoint,并将最终解决方案实现为:

def updateFunc(new_values, last_sum):
  count = 0
  counts = [field[0] for field in new_values]
  ids = [field[1] for field in new_values]
  if last_sum:
    count = last_sum[0]
    new_ids = last_sum[1] + ids
  else:
    new_ids = ids
  return sum(counts) + count, new_ids

最后,我的问题的答案是:是的,状态可以是元组或任何其他用于存储更多值的数据类型。在

相关问题 更多 >