我需要从批处理中积累一段时间的数据,以便进行后期处理。我使用的是Spark 1.6.3。
我需要以(tag, [[time, value],..]
的形式累积它们。
到目前为止,我已经尝试了updateStateByKey
:
time = [0]
def updateFunc(new_values, last_sum,time):
time[0] += 5
if time == 10:
time = 0
return None
return (last_sum or []) + new_values
data = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, ['t','t1'])) \
.updateStateByKey(lambda x,y :updateFunc(x,y,time))
data.pprint()
正在添加数据。但是,尝试在10秒后刷新数据是行不通的。(我做错了)
我也尝试过使用window
:
data= lines.flatMap(lambda lime: line.split(' ')\
.map(lambda tag: (tag: ['time', 'value']))\
.window(10, 2)\
.reduceByKey(lambda x,y : y + x)`
但是,这会产生一个一维的长列表。这是没有用的。
有线索吗?非常感谢。你知道吗
试试这个
相关问题 更多 >
编程相关推荐