Kafka consumer: AttributeError: 'list' object has no attribute 'map'

Published 2024-10-03 19:31:39


I want to read messages from a Kafka queue in Python. In Scala, for example, this is easy to do:

    val ssc = new StreamingContext(conf, Seconds(20))

    // Divide the topic into partitions
    val topicMessages = "myKafkaTopic"
    val topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap

    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)

    messages.foreachRDD { rdd =>
      //...
    }

I want to do the same thing in Python. This is my current Python code:

    topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap

But on the line topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap I get the error:

AttributeError: 'list' object has no attribute 'map'
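The failing line is Scala syntax: in Python, str.split returns a plain list, which has no .map method. A minimal sketch of the equivalent topic-to-thread-count mapping in Python, using a dict comprehension (kafkaNumThreads is assumed to be defined elsewhere; 1 is used here purely for illustration):

```python
# Scala's .split(",").map((_, kafkaNumThreads)).toMap becomes a
# dict comprehension over the list returned by str.split.
topicMessages = "myKafkaTopic"
kafkaNumThreads = 1  # assumed value for illustration

topicMessagesMap = {topic: kafkaNumThreads
                    for topic in topicMessages.split(",")}

print(topicMessagesMap)  # {'myKafkaTopic': 1}
```

The resulting dict has the same shape ({topic: numThreads}) that KafkaUtils.createStream expects for its topics argument.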

How can I make this code work?

Update:

If I run this code in a Jupyter notebook:

    messages = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {inputKafkaTopic: list})

then I get an error like this:

Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the spark-submit command as

    $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.0.0 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/, Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.0.0. Then, include the jar in the spark-submit command as

    $ bin/spark-submit --jars ...

Do I understand correctly that the only way to make this work is to use spark-submit, and that it is impossible to run this code from Jupyter/IPython?
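A commonly suggested workaround (not from the original post) is to set the PYSPARK_SUBMIT_ARGS environment variable before any SparkContext is created in the notebook, so the Jupyter kernel pulls in the same --packages that spark-submit would. A sketch, reusing the package coordinates shown in the error message (Spark 2.0.0, Kafka 0.8 connector):

```python
import os

# Must run before a SparkContext is created in this kernel;
# restart the kernel first if one already exists.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka-0-8:2.0.0 "
    "pyspark-shell"
)

print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

After this, creating the StreamingContext and calling KafkaUtils.createStream in the notebook should find the Kafka classes, because the driver JVM is launched with the extra package on its classpath.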
