java Spark流式卡夫卡消费者
我正在尝试设置一个Spark Streaming简单应用程序,它可以读取卡夫卡主题的消息
经过大量的工作,我在这个阶段,但得到的例外如下所示
代码:
public static void main(String[] args) throws Exception {
String brokers = "my.kafka.broker" + ":" + "6667";
String topics = "MyKafkaTopic";
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
.setMaster("local[1]")
;
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
System.out.println("Brokers: " + brokers);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
System.out.println("Message received: " + messages);
// Start the computation
jssc.start();
jssc.awaitTermination();
}
它抛出:
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:66)
出于绝望,我试着联系动物园管理员:
String brokers = "my.kafka.zookeeper" + ":" + "2181";
String topics = "MyKafkaTopic";
但这意味着:
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:53)
相关依赖项包括:
<properties>
<spark.version>1.6.2</spark.version>
<kafka.version>0.8.2.1</kafka.version>
</properties>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
我想问:
我应该连接到卡夫卡代理还是zookeeper服务器
我的代码中有什么错误导致无法连接/侦听传入消息
# 1 楼答案
属性:
呼叫:
# 2 楼答案
Spark的工作方式是,它的大多数转换都是惰性的。要执行图形时,需要注册一个Output Transformation。输出转换的形式有
foreachRDD
、print
、collect
或count
(以及更多)不要使用
println
,而是调用DStream.print()
:关于Kafka,
metadata.broker.list
需要提供Kafka代理节点的地址。有一个名为zookeeper.connect
的单独键提供ZooKeepers地址