通过python实现Spark-kafka集成的问题

2024-07-08 12:08:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我在通过python集成spark-Kafka时遇到了这个问题。我已经更改了所有的spark库,但仍然面临相同的问题。在

以下是错误消息:

Traceback (most recent call last): File "D:/Work/kafka_wordcount.py", line 18, in kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}) File "D:\softwares\ApacheSpark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\kafka.py", line 70, in createStream File "D:\softwares\ApacheSpark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py", line 1133, in call File "D:\softwares\ApacheSpark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py", line 319, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.createStream. : java.lang.NoClassDefFoundError: org/apache/spark/Logging at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.spark.streaming.kafka.Kenter code hereafkaUtils$.createStream(KafkaUtils.scala:81) at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:151) at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createStream(KafkaUtils.scala:555) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 26 more

请提出任何解决方案。源代码如下:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic],{"bootstrap.servers": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

我使用python集成spark kafka,其中包含以下详细信息:

^{pr2}$

我已经更改了spark库和kafka库,但是没有得到输出。在


Tags: kafkalangnetlinejavaatsparkinvoke

热门问题