当使用hbase作为数据源时,spark是否使用hbase键的排序顺序

2024-09-27 07:34:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我将time-series数据存储在HBase中。rowkey由user_idtimestamp组成,如下所示:

{
    "userid1-1428364800" : {
        "columnFamily1" : {
            "val" : "1"
            }
        }
    }
    "userid1-1428364803" : {
        "columnFamily1" : {
            "val" : "2"
            }
        }
    }

    "userid2-1428364812" : {
        "columnFamily1" : {
            "val" : "abc"
            }
        }
    }

}

现在我需要执行每用户分析。这是hbase_rdd(来自here)的初始化

^{pr2}$

类似mapreduce的自然处理方式是:

hbase_rdd
   .map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1])))  # shift timestamp from key to value
   .groupByKey()
   .map(processUserData)  # process user's data

在执行第一个映射(将时间戳从键移到值)时,必须知道当前用户的时间序列数据何时完成,因此可以开始groupByKey转换。因此,我们不需要映射所有表并存储所有临时数据。这是可能的,因为hbase按排序顺序存储行键。在

使用hadoop流媒体,可以这样做:

import sys

current_user_data = []
last_userid = None
for line in sys.stdin:
    k, v = line.split('\t')
    userid, timestamp = k.split('-')
    if userid != last_userid and current_user_data:
        print processUserData(last_userid, current_user_data)
        last_userid = userid
        current_user_data = [(timestamp, v)]
    else:
        current_user_data.append((timestamp, v))

问题是:如何利用Spark中hbase键的排序顺序?在


Tags: 数据用户datavalcurrenttimestamprowsplit
1条回答
网友
1楼 · 发布于 2024-09-27 07:34:47

我不太熟悉从HBase获取数据的保证,但如果我理解正确,我可以用普通的Spark来回答。在

你有一些RDD[X]。据Spark所知,其中的X是完全无序的。但是您有一些外部知识,并且可以保证数据实际上是按X的某个字段分组的(甚至可能按另一个字段排序)。在

在这种情况下,您可以使用mapPartitions来做与hadoop流媒体几乎相同的事情。这使您可以迭代一个分区中的所有记录,因此可以查找具有相同键的记录块。在

val myRDD: RDD[X] = ...
val groupedData: RDD[Seq[X]] = myRdd.mapPartitions { itr =>
  var currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
  var currentUser: X = null
  //itr is an iterator over *all* the records in one partition
  itr.flatMap { x => 
    if (currentUser != null && x.userId == currentUser.userId) {
      // same user as before   add the data to our list
      currentUserData += x
      None
    } else {
      // its a new user   return all the data for the old user, and make
      // another buffer for the new user
      val userDataGrouped = currentUserData
      currentUserData = new scala.collection.mutable.ArrayBuffer[X]()
      currentUserData += x
      currentUser = x
      Some(userDataGrouped)
    }
  }
}
// now groupedRDD has all the data for one user grouped together, and we didn't
// need to do an expensive shuffle.  Also, the above transformation is lazy, so
// we don't necessarily even store all that data in memory   we could still
// do more filtering on the fly, eg:
val usersWithLotsOfData = groupedRDD.filter{ userData => userData.size > 10 }

我知道你想用python抱歉,我想如果我用Scala写的话,这个例子更可能是正确的。我认为类型注释使含义更清楚,但这可能是Scala的偏见。。。:). 不管怎样,希望你能理解发生了什么并翻译它。(不要太担心flatMap&;Some&;None,如果你理解了这个想法,可能就不重要了…)

相关问题 更多 >

    热门问题