引用:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies
这个例子有python版本吗?引用只有java等价物。我在https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html发现了一些相似之处。你知道吗
我可以匹配bootstrap.servers服务器引导服务器,密钥序列化程序到密钥序列化程序,值序列化程序要计算序列化程序的值,但我无法匹配最后3个组id", "自动偏移复位“还有 "启用.auto.commit". 你知道吗
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
不过,那不是火花代码。Spark需要javakafkaapi的属性。你知道吗
^{} is not available in Python ,但是如果您想使用Spark,这是0.8api,which has Python examples
您看到的Scala代码是针对消费者的。所以你需要检查消费者的设置,而不是生产者的
如果你看https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html,你可以找到它们的等价物:
group.id
:group_id
auto.offset.reset
:auto_offset_reset
enable.auto.commit
:enable_auto_commit
还要注意,使用者具有反序列化程序而不是序列化程序,因此:
key.deserializer
:key_deserializer
value.deserializer
:value_deserializer
相关问题 更多 >
编程相关推荐