Kafka: Converting Scala to Python

Posted 2024-10-02 06:33:42


Reference: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

Is there a Python version of this example? The reference only has a Java equivalent. I found some similarities at https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html.

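I can match bootstrap.servers to bootstrap_servers, key.deserializer to key_serializer, and value.deserializer to value_serializer, but I cannot match the last three: "group.id", "auto.offset.reset", and "enable.auto.commit".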

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

2 answers

I found some similarities at https://kafka-python.readthedocs.io

That isn't Spark code, though. Spark expects the properties of the Java Kafka API.

The Kafka 0.10 integration linked in the question is not available in Python, but if you want to use Spark, there is the 0.8 API, which has Python examples:

from pyspark.streaming.kafka import KafkaUtils

# ssc = <get a StreamingContext>
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
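
As a rough sketch of how the Scala example from the question might look with this 0.8 API: it assumes Spark 2.x with the spark-streaming-kafka-0-8 package available (pyspark.streaming.kafka was removed in Spark 3.0). The helper name build_direct_stream is made up for illustration; the broker and topic values are copied from the question. Two differences are worth noting. The 0.8 consumer uses "smallest"/"largest" for auto.offset.reset rather than "earliest"/"latest", and the Python direct stream already yields (key, value) tuples decoded as UTF-8 by default, so the Scala map step has no separate counterpart here.

```python
# Sketch only: needs Spark 2.x plus the spark-streaming-kafka-0-8 package.
# Broker and topic names are copied from the question; the helper name is hypothetical.

topics = ["topicA", "topicB"]

# 0.8-style consumer properties: keys stay dotted, values are plain strings.
kafka_params = {
    "metadata.broker.list": "localhost:9092,anotherhost:9092",
    # The 0.8 consumer spells "latest" as "largest" (and "earliest" as "smallest").
    "auto.offset.reset": "largest",
}


def build_direct_stream(ssc, topics, kafka_params):
    """Create a direct stream of (key, value) tuples from an existing StreamingContext."""
    # Imported here so this module still loads where pyspark is not installed.
    from pyspark.streaming.kafka import KafkaUtils

    return KafkaUtils.createDirectStream(ssc, topics, kafka_params)
```

Given a StreamingContext ssc, build_direct_stream(ssc, topics, kafka_params).pprint() would print the (key, value) pairs of each batch.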

The Scala code you are looking at is for a consumer, so you need to check the consumer settings, not the producer's.

If you look at https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html, you can find their equivalents:

  • group.id -> group_id
  • auto.offset.reset -> auto_offset_reset
  • enable.auto.commit -> enable_auto_commit

Also note that a consumer has deserializers rather than serializers, so:

  • key.deserializer -> key_deserializer
  • value.deserializer -> value_deserializer
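
Putting the mappings above together, a minimal kafka-python sketch of the consumer settings from the question could look like the following. It assumes the kafka-python package is installed and that the brokers are reachable before build_consumer is called. The helper names decode_utf8 and build_consumer are illustrative, not part of the library. kafka-python takes deserializers as callables rather than class names, so a small UTF-8 decoding function stands in for StringDeserializer.

```python
def decode_utf8(raw):
    """Stand-in for StringDeserializer: bytes -> str, passing None through."""
    return raw.decode("utf-8") if raw is not None else None


# Java-style property names from the question, rewritten as kafka-python keyword arguments.
consumer_kwargs = {
    "bootstrap_servers": ["localhost:9092", "anotherhost:9092"],
    "key_deserializer": decode_utf8,
    "value_deserializer": decode_utf8,
    "group_id": "use_a_separate_group_id_for_each_stream",
    "auto_offset_reset": "latest",
    "enable_auto_commit": False,
}

topics = ["topicA", "topicB"]


def build_consumer(topics, kwargs):
    """Create the consumer; this connects to the brokers, so call it only when they are up."""
    from kafka import KafkaConsumer  # provided by the kafka-python package

    return KafkaConsumer(*topics, **kwargs)
```

A loop such as "for record in build_consumer(topics, consumer_kwargs): print(record.key, record.value)" then plays the role of the Scala map step. Note that this is a plain Kafka client, not a Spark stream.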
