我有PySpark作业InitiatorSpark.py
,代码如下:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Test") \
.getOrCreate()
lines = (spark
.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic","my_topic")
.load("tcp://{}".format("127.0.0.1:1883")))
我运行如下:
spark-submit --jars lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar InitiatorSpark.py
Spark启动,但在第.load("tcp://{}".format("127.0.0.1:1883")))
行失败,并显示以下消息:
Caused by: java.lang.ClassNotFoundException: org.eclipse.paho.client.mqttv3.MqttClientPersistence
虽然我提供了一个正确的JAR文件,但似乎找不到类MqttClientPersistence
。在lib
内,我有两个文件:
spark-streaming-mqtt_2.11-2.2.1-sources.jar
spark-streaming-mqtt_2.11-2.2.1.jar
我的设置有什么问题
我可以通过向spark submit命令添加3个JAR文件来运行此代码:
相关问题 更多 >
编程相关推荐