I want to read messages from a Kafka queue in Python. For example, in Scala this is easy to do:
val ssc = new StreamingContext(conf, Seconds(20))
// Map each topic name to the number of consumer threads
val topicMessages = "myKafkaTopic"
val topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)
messages.foreachRDD { rdd =>
//...
}
I want to do the same thing in Python, but my Python port fails on the line
topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap
with the error:
AttributeError: 'list' object has no attribute 'map'
How can I make this code work?
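The AttributeError comes from the fact that Python's str.split() returns a plain list, which has no .map method, so the Scala-style chained call fails. A minimal sketch of the equivalent mapping in Python (the value of kafkaNumThreads here is illustrative, not taken from the question):

```python
# str.split(",") returns a list; Python lists have no .map method.
# Build the topic -> thread-count dict with a dict comprehension instead.
topicMessages = "myKafkaTopic"
kafkaNumThreads = 1  # illustrative value; use whatever your job needs

topicMessagesMap = {topic: kafkaNumThreads for topic in topicMessages.split(",")}
print(topicMessagesMap)  # {'myKafkaTopic': 1}
```

This dict is the shape KafkaUtils.createStream expects for its topics argument (topic name mapped to number of consumer threads), so it can be passed in directly in place of the Scala topicMessagesMap.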
Update:
If I run
messages = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {inputKafkaTopic: list})
in a Jupyter notebook, I get an error like this:
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
Include the Kafka library and its dependencies with in the spark-submit command as
$ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.0.0 ...
Download the JAR of the artifact from Maven Central http://search.maven.org/, Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.0.0. Then, include the jar in the spark-submit command as
$ bin/spark-submit --jars ...
Do I understand correctly that the only way to make this work is to use spark-submit, and that it is impossible to run this code from Jupyter/IPython?
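It does not have to be spark-submit only. One commonly used workaround (an assumption about the setup, not something stated in the error message) is to set PYSPARK_SUBMIT_ARGS before any SparkContext is created, so the notebook session fetches the same Kafka package that spark-submit would. Note that the Maven artifact usually needs the Scala-version suffix (e.g. _2.11), which the error message omits:

```python
import os

# Must run before a SparkContext exists in the notebook session;
# packages listed here are resolved exactly as with spark-submit.
# The _2.11 suffix and 2.0.0 version are assumptions -- match your Spark build.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 pyspark-shell"
)
```

After setting this, create the SparkContext and StreamingContext as usual in the same notebook, and KafkaUtils.createStream should find the Kafka classes on the classpath.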