scala Spark kafka流媒体java。lang.NoClassDefFoundError:akka/util/Helpers$ConfigOps$
我正在编写一个用Scala编写的spark应用程序,它可以收听卡夫卡主题。应用程序只是打印出收到的消息,仅此而已。我正在我的机器上本地运行这个
......
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
.....
val topics = "topicName";//args(2)
val numThreads = 1;
val zkQuorum = "zookeeper quorum"
val group = "groupName"
//val inputTable = args(5)
// Set application name (when we run this on the cluster this is what will appear in the spark UI)
val sparkConf = new SparkConf().setAppName("appName").setMaster("local")
// On what topics shall I listen? Create an array of topics to pass to the stream configurations...
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// Will poll every X seconds
val ssc = new StreamingContext(sparkConf, Seconds(15))
// Create stream (has not been triggered yet)
val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// For each stream!
stream.foreachRDD { rdd =>
// Check if empty...
if (rdd.toLocalIterator.nonEmpty)
{
// Create SQL Context out of the streaming
val sqlContext = new SQLContext(ssc.sparkContext)
// Register temporary table out of the recieved
sqlContext.read.json(rdd).registerTempTable("resultRecieved")
// Create SQL Context, and you can filter
val result = sqlContext.sql(
"select * FROM resultRecieved")
if(result.count() == 0 )
println("No results!")
else{
println(result.count() + " Results!")
}
}
}
// Print stream
stream.print
// Trigger!
ssc.start()
// Await stopping of the service...
ssc.awaitTermination()
我使用的是Scala 2.10.4,下面是我的maven配置
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>0.9.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-swing</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
<version>2.2.1</version>
</dependency>
</dependencies>
和斯卡拉在一起。版本为2.10.1和spark。版本为1.5.1
我越来越
Exception in thread "main" java.lang.NoClassDefFoundError: akka/util/Helpers$ConfigOps$
at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:49)
at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:114)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:161)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:200)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:550)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:52)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:55)
at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:253)
at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:53)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:252)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:450)
at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:549)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
at SIVConsumer$.main(SIVConsumer.scala:33)
at SIVConsumer.main(SIVConsumer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.ClassNotFoundException: akka.util.Helpers$ConfigOps$
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 38 more
Process finished with exit code 1
知道哪些依赖关系不能很好地相处吗?我尝试过使用最新的Scala二进制文件,但仍然失败
# 1 楼答案
我想问题在于spark流媒体版本。其应与火花芯(1.5.2)相同。同样适用于spark-streaming-kafka_2.10
你可以参考一个正在工作的pom。来自示例项目https://github.com/atulsm/Test_Projects/blob/master/pom.xml的xml