有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

scala Spark kafka流媒体java。lang.NoClassDefFoundError:akka/util/Helpers$ConfigOps$

我正在编写一个用Scala编写的spark应用程序,它可以收听卡夫卡主题。应用程序只是打印出收到的消息,仅此而已。我正在我的机器上本地运行这个

......
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
.....
    val topics = "topicName";//args(2)
    val numThreads = 1;
    val zkQuorum = "zookeeper quorum"
    val group = "groupName"
    //val inputTable = args(5)

// Set application name (when we run this on the cluster this is what will appear in the spark UI)
val sparkConf = new SparkConf().setAppName("appName").setMaster("local")

// On what topics shall I listen? Create an array of topics to pass to the stream configurations...
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

// Will poll every X seconds
val ssc = new StreamingContext(sparkConf, Seconds(15))

// Create stream (has not been triggered yet)
val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

// For each stream!
stream.foreachRDD { rdd =>

  // Check if empty...
  if (rdd.toLocalIterator.nonEmpty)
  {
    // Create SQL Context out of the streaming
    val sqlContext = new SQLContext(ssc.sparkContext)

    // Register temporary table out of the recieved
    sqlContext.read.json(rdd).registerTempTable("resultRecieved")

    // Create SQL Context, and you can filter
    val result = sqlContext.sql(
      "select * FROM resultRecieved")

    if(result.count() == 0 )
      println("No results!")
    else{
      println(result.count() + " Results!")

    }
  }
}

// Print stream
stream.print

// Trigger!
ssc.start()

// Await stopping of the service...
ssc.awaitTermination()

我使用的是Scala 2.10.4,下面是我的maven配置

    <dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.5.1</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>0.9.0-incubating</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-swing</artifactId>
        <version>2.10.4</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.1</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.10</artifactId>
        <version>2.2.1</version>
    </dependency>
</dependencies>

和斯卡拉在一起。版本为2.10.1和spark。版本为1.5.1

我越来越

Exception in thread "main" java.lang.NoClassDefFoundError: akka/util/Helpers$ConfigOps$
at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:49)
at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:161)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:200)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:550)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:253)
at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:53)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:252)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:450)
at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:549)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
at SIVConsumer$.main(SIVConsumer.scala:33)
at SIVConsumer.main(SIVConsumer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: akka.util.Helpers$ConfigOps$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 38 more

Process finished with exit code 1

知道哪些依赖关系不能很好地相处吗?我尝试过使用最新的Scala二进制文件,但仍然失败


共 (1) 个答案