Java Spark Streaming Kafka consumer

I'm trying to set up a simple Spark Streaming application that reads messages from a Kafka topic.

After a lot of work I've got to this stage, but I get the exceptions shown below.

Code:

public static void main(String[] args) throws Exception {

    String brokers = "my.kafka.broker" + ":" + "6667";
    String topics = "MyKafkaTopic";

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
            .setMaster("local[1]")
            ;
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", brokers);
    System.out.println("Brokers: " + brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );

    System.out.println("Message received: " + messages);

    // Start the computation
    jssc.start();
    jssc.awaitTermination();

}

It throws:

[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
    at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:66)

Out of desperation, I tried connecting to ZooKeeper instead:

String brokers = "my.kafka.zookeeper" + ":" + "2181";
String topics = "MyKafkaTopic";

But that results in:

[WARNING] 
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:53)

The relevant dependencies are:

<properties>
  <spark.version>1.6.2</spark.version>
  <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>${kafka.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

My questions are:

Should I connect to the Kafka brokers or to the ZooKeeper servers?

What is wrong in my code that it fails to connect / listen for incoming messages?


2 Answers

  1. Answer #1

    import static org.apache.spark.streaming.kafka.KafkaUtils.createStream;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.google.common.collect.ImmutableMap;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Properties;
    
    import kafka.serializer.StringDecoder;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import scala.Tuple2;
    
    public class KafkaKerberosReader {
    
        // Spark information
        private static SparkConf conf;
        private static String appName = "KafkaKerberosReader";
        private static JavaStreamingContext context;
        private static final Logger logger = LoggerFactory.getLogger(KafkaKerberosReader.class.getSimpleName());
    
        // Kafka information
        private static String zkQuorum = "";
        private static String kfkQuorum = "";
        private static String group = "";
        private static Integer threads = 1;
        private static Map<String, String> kafkaParams = new HashMap<String, String>();

        // Settings loaded from config.properties
        private static String autoOffsetReset, secProtocol, bServers, zServers,
                kGroupId, kKeyTabFile, kJaas, kTopic, kPrincipal;
    
        public static void loadProps() {
            Properties prop = new Properties();
            try {
                logger.info("               loadProps");
                InputStream input = new FileInputStream("config.properties");
                prop.load(input);
                System.out.println("loadProps loaded:" + prop);
    
                appName = prop.getProperty("app.name");
                autoOffsetReset = prop.getProperty("auto.offset.reset");
                secProtocol = prop.getProperty("security.protocol");
                kfkQuorum = bServers = prop.getProperty("bootstrap.servers");
                zkQuorum = zServers = prop.getProperty("zookeeper.connect");
                group = kGroupId = prop.getProperty("group.id");
                kKeyTabFile = prop.getProperty("kerberos.keytabfile");
                kJaas = prop.getProperty("kerberos.jaas");
                kTopic = prop.getProperty("kafka.topic");
                kPrincipal = prop.getProperty("kerberos.principal");
                logger.info("loadProps:Props:zk:" + zServers + ",issecure:" + secProtocol + ",autoOffsetReset:"
                        + autoOffsetReset + ",bServers:" + bServers + ",kJaas:" + kJaas + ",keytab:" + kKeyTabFile
                        + ", kTopic:" + kTopic + ", kPrincipal" + kPrincipal);
    
                if (kPrincipal != null && kKeyTabFile != null) {
                    logger.info("          -Logging into Kerberos");
                    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
                    conf.set("hadoop.security.authentication", "Kerberos");
                    UserGroupInformation.setConfiguration(conf);
                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(kPrincipal, kKeyTabFile);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            logger.info("               main:START");
            loadProps();
            // Configure the application
            configureSpark();
    
            // Create the context
            context = createContext(kTopic);
    
            // Start the streaming application and wait for termination
            context.start();
            context.awaitTermination();
            logger.info("main:END");
        }
    
        /**
         * The kernel of the Spark application: creates the streaming context
         * and wires up the Kafka receiver stream.
         */
        private static JavaStreamingContext createContext(String topic) {
    
            logger.info("                           -");
            logger.info("|            Starting: {}             |", appName);
            logger.info("|            kafkaParams:              |", kafkaParams);
            logger.info("                           -");
    
            // Create the spark streaming context
            context = new JavaStreamingContext(conf, Seconds.apply(5));
    
            // Read from a Kerberized Kafka
            JavaPairReceiverInputDStream<String, String> kafkaStream = createStream(context, zkQuorum, "Default",
                    ImmutableMap.of(topic, threads), StorageLevel.MEMORY_AND_DISK_SER());
    
            kafkaStream.print();
            JavaDStream<String> lines = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
                private static final long serialVersionUID = 1L;
    
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });
            lines.print();
    
            // kafkaStream.map(message -> message._2.toLowerCase()).print();
            logger.info("                           -");
            logger.info("|            Finished: {}             |", appName);
            logger.info("                           -");
    
            return context;
        }
    
        /**
         * Create a SparkConf and configure it.
         *
         */
        private static void configureSpark() {
            logger.info("               Initializing '%s'.", appName);
            conf = new SparkConf().setAppName(appName);
    
            if (group != null && group.trim().length() != 0) {
                kafkaParams.put("group.id", group);
            }
            kafkaParams.put("auto.offset.reset", autoOffsetReset);
            kafkaParams.put("security.protocol", secProtocol);
            kafkaParams.put("bootstrap.servers", kfkQuorum);
            kafkaParams.put("zookeeper.connect", zkQuorum);
    
            logger.info(">- Configuration done with the follow properties:");
            logger.info(conf.toDebugString());
        }
    
    }
    

    Properties:

    app.name=KafkaKerberosReader
    auto.offset.reset=smallest
    security.protocol=PLAINTEXTSASL
    bootstrap.servers=sandbox.hortonworks.com:6667
    zookeeper.connect=sandbox.hortonworks.com:2181
    group.id=Default
    kafka.topic=ifinboundprecint
    #kerberos.keytabfile=/etc/hello.keytab
    #kerberos.jaas=/etc/kafka/conf/kafka_client_jaas.conf
    #kerberos.principal=hello@EXAMPLE.COM
    

    Invocation:

    spark-submit --master yarn --deploy-mode client --num-executors 3 --executor-memory 500M --executor-cores 3 --class com.my.spark.KafkaKerberosReader ~/SparkStreamKafkaTest-1.0-SNAPSHOT.jar

  2. Answer #2

    Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

    Spark works in such a way that most of its transformations are lazy. To have the graph executed, you need to register an Output Transformation. Output transformations come in the form of foreachRDD, print, collect, count (and more).

    Instead of using println, call DStream.print():

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );
    
    messages.print();
    
    // Start the computation
    jssc.start();
    jssc.awaitTermination();
    
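    Any other output operation serves the same purpose; foreachRDD is the most flexible one. A minimal sketch (an illustration, not part of the original answer), reusing the messages stream from above:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    // foreachRDD is also an output operation, so registering it satisfies
    // the "No output operations registered" check.
    messages.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
        @Override
        public void call(JavaPairRDD<String, String> rdd) {
            // Runs on the driver once per batch; count() is an action and
            // forces the batch to be computed on the executors.
            System.out.println("Messages in this batch: " + rdd.count());
        }
    });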

    Regarding Kafka, metadata.broker.list needs to be given the addresses of the Kafka broker nodes. There is a separate key named zookeeper.connect for supplying the ZooKeeper address.
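
    To make the distinction concrete, here is a minimal sketch of the parameter map for the direct stream (host names are the placeholders from the question):

    Map<String, String> kafkaParams = new HashMap<>();
    // Direct stream: metadata.broker.list must point at the Kafka brokers.
    kafkaParams.put("metadata.broker.list", "my.kafka.broker:6667");
    // zookeeper.connect is a different key and points at ZooKeeper; the
    // direct stream does not need it, but the receiver-based createStream does.
    // kafkaParams.put("zookeeper.connect", "my.kafka.zookeeper:2181");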