Spark Streaming: how to get a list the way collect() does

Published 2024-09-29 21:40:03


I am a beginner with Spark Streaming.
I want to load HBase records inside a Spark Streaming application, so I wrote the Python code below.
My load_records function fetches the HBase records and returns them.
The problem is that Spark Streaming cannot use collect(): sc.newAPIHadoopRDD() has to be called on the driver, but Spark Streaming offers no direct way to bring objects from the workers back to the driver.
How can I get the HBase records in Spark Streaming? Or how can I call sc.newAPIHadoopRDD()?

import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def load_records(sc, table, keys):
    # Scan HBase for each row key and return the union of the per-key RDDs.
    # Note: only the "user" table is handled here.
    host = 'localhost'
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
    rdd_list = []
    for key in keys:
        if table == "user":
            # restrict the scan to a single row: [key, key + "\x00")
            conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "user",
                    "hbase.mapreduce.scan.columns": "u:uid",
                    "hbase.mapreduce.scan.row.start": key, "hbase.mapreduce.scan.row.stop": key + "\x00"}

        # newAPIHadoopRDD() can only be called on the driver
        rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                                 "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                                 "org.apache.hadoop.hbase.client.Result",
                                 keyConverter=keyConv, valueConverter=valueConv, conf=conf)
        rdd_list.append(rdd)
    if not rdd_list:
        return sc.emptyRDD()  # guard: no keys were passed in
    first_rdd = rdd_list.pop(0)
    for rdd in rdd_list:
        first_rdd = first_rdd.union(rdd)
    return first_rdd

sc = SparkContext(appName="UserStreaming")
ssc = StreamingContext(sc, 3)  # 3-second batch interval
topics = ["json"]
broker_list = "localhost:9092"
inputs = KafkaUtils.createDirectStream(ssc, topics, {"metadata.broker.list": broker_list})
jsons = inputs.map(lambda message: json.loads(message[1]))  # Kafka messages are (key, value) pairs
user_id_rdd = jsons.map(lambda record: record["user_id"])
# the line below does not work: user_id_rdd is a DStream, and a DStream has no collect()
user_id_list = user_id_rdd.collect()
user_record_rdd = load_records(sc, 'user', user_id_list)
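
A minimal sketch of one possible workaround (the per-record handling is a placeholder): DStream.foreachRDD() runs the supplied function on the driver once per batch interval, so both collect() and load_records(), which calls sc.newAPIHadoopRDD(), are legal inside it.

def process_batch(rdd):
    # this function runs on the driver, once per batch, so collect() is legal here
    user_ids = rdd.collect()
    if not user_ids:
        return  # nothing arrived in this batch
    user_record_rdd = load_records(sc, 'user', user_ids)
    for record in user_record_rdd.collect():
        print(record)  # placeholder: replace with real per-record processing

user_id_rdd.foreachRDD(process_batch)

ssc.start()
ssc.awaitTermination()

If the HBase records need to stay in the stream rather than being consumed on the driver, DStream.transform() is similar: its function also runs on the driver for every batch and returns an RDD that becomes the next DStream.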
