我将time-series
数据存储在HBase
中。rowkey由user_id
和timestamp
组成,如下所示:
{
"userid1-1428364800" : {
"columnFamily1" : {
"val" : "1"
}
}
}
"userid1-1428364803" : {
"columnFamily1" : {
"val" : "2"
}
}
}
"userid2-1428364812" : {
"columnFamily1" : {
"val" : "abc"
}
}
}
}
现在我需要执行每用户分析。这是hbase_rdd
(来自here)的初始化
类似mapreduce的自然处理方式是:
hbase_rdd
.map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1]))) # shift timestamp from key to value
.groupByKey()
.map(processUserData) # process user's data
在执行第一个映射(将时间戳从键移到值)时,必须知道当前用户的时间序列数据何时完成,因此可以开始groupByKey转换。因此,我们不需要映射所有表并存储所有临时数据。这是可能的,因为hbase按排序顺序存储行键。在
使用hadoop流媒体,可以这样做:
import sys
current_user_data = []
last_userid = None
for line in sys.stdin:
k, v = line.split('\t')
userid, timestamp = k.split('-')
if userid != last_userid and current_user_data:
print processUserData(last_userid, current_user_data)
last_userid = userid
current_user_data = [(timestamp, v)]
else:
current_user_data.append((timestamp, v))
问题是:如何利用Spark中hbase键的排序顺序?在
我不太熟悉从HBase获取数据的保证,但如果我理解正确,我可以用普通的Spark来回答。在
你有一些
RDD[X]
。据Spark所知,其中的X
是完全无序的。但是您有一些外部知识,并且可以保证数据实际上是按X
的某个字段分组的(甚至可能按另一个字段排序)。在在这种情况下,您可以使用
mapPartitions
来做与hadoop流媒体几乎相同的事情。这使您可以迭代一个分区中的所有记录,因此可以查找具有相同键的记录块。在我知道你想用python抱歉,我想如果我用Scala写的话,这个例子更可能是正确的。我认为类型注释使含义更清楚,但这可能是Scala的偏见。。。:). 不管怎样,希望你能理解发生了什么并翻译它。(不要太担心
flatMap
&;Some
&;None
,如果你理解了这个想法,可能就不重要了…)相关问题 更多 >
编程相关推荐