下面的代码创建了6个输入数据流,这些数据流是从Kafka的6个分区主题中读取的,我发现即使为流指定相同的组ID,数据也会重复6次。如果我只创建3个数据流,我会将数据重复3次,依此类推。。。。在
numStreams = 6
kafkaStreams = [KafkaUtils.createDirectStream(ssc, ["send6partitions"], {
"metadata.broker.list": brokers,
"fetch.message.max.bytes": "20971520",
"spark.streaming.blockInterval" : "2000ms",
"group.id" : "the-same"},
valueDecoder = decodeValue, keyDecoder = decode_key) for _ in range (numStreams)]
kvs = ssc.union(*kafkaStreams)
我做错什么了?在
在直接方法中,你不应该从一个主题创建多个数据流。在
从documentation:
所以只需创建一个DStream,Spark将使用所有Kafka分区:)
我不熟悉Python,但是Spark Scala中的直接流不提交任何偏移量。因此,如果您打开一个流n次而没有提交任何read消息的偏移量,那么您的使用者将从开始处开始。在
如果在python中是相同的,则不需要启动n个流。启动一个流,Spark将处理分区分配给执行器/任务本身。在
基本上,Kafka主题是通过共享装货人默认情况下,当您创建数据流时,一个接收器将运行并通过接收器线程(Java线程)将数据从每个Kafka主题分区并行地接收到数据流分区。如果为一个主题创建6个数据流,则意味着同一个主题有6个接收器,但并不意味着每个部分都有一个数据流。每一个接受者得到每一个饲料一次,所以你得到每饲料6倍。在
相关问题 更多 >
编程相关推荐