是否可以将updateStateByKey()
函数与元组一起用作值?我使用的是PySpark,我的输入是(word, (count, tweet_id))
,这意味着word
是一个键,一个元组{updateStateByKey
的任务是对每个单词求和计数,并创建包含该单词的所有tweet_id的列表。在
我实现了以下更新函数,但是索引为1的new_values
的错误列表索引超出范围:
def updateFunc(new_values, last_sum):
count = 0
tweets_id = []
if last_sum:
count = last_sum[0]
tweets_id = last_sum[1]
return sum(new_values[0]) + count, tweets_id.extend(new_values[1])
调用方法:
^{pr2}$
我找到了解决办法。问题出在checkpointing上,这意味着如果发生故障,当前状态将被保存到磁盘上。它引起了一些问题,因为当我改变了状态的定义时,在checkpoint中它处于没有元组的旧状态。因此,我从磁盘上删除了checkpoint,并将最终解决方案实现为:
最后,我的问题的答案是:是的,状态可以是元组或任何其他用于存储更多值的数据类型。在
相关问题 更多 >
编程相关推荐