Cassandra DSE 4.6 Spark SQL Java 程序运行时错误
我正在尝试使用dse 4.6,并用java编写一个基本的spark sql程序 将对在cassandra上以编程方式创建的我的表运行spark sql select查询
虽然没有编译错误，但是在运行时，我得到了很多类未找到（ClassNotFoundException）异常。无法理解问题在哪里
为了安全起见,我也手动下载了所需的JAR,但仍然显示运行时错误。请帮忙
这是我的程序
import com.datastax.bdp.spark.DseSparkConfHelper;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
//import com.sun.rowset.internal.Row;
//import org.apache.spark.sql.api.java;
/**
 * Demo: creates a Cassandra keyspace/table from Spark, writes a small set of
 * Employee rows through the DataStax Spark-Cassandra connector, then queries
 * the table back via Spark SQL.
 *
 * NOTE(review): the reported NoClassDefFoundError (org/apache/hadoop/mapred/JobConf)
 * is a classpath problem, not a code problem — DseSparkConfHelper needs the full
 * DSE/Hadoop classpath. Run the job with `dse spark-submit` (or include the
 * Hadoop client jars on the runtime classpath) instead of launching main() directly
 * from the IDE.
 */
public class Example {

    /**
     * Bean mapped onto the {@code java_api.employees} table.
     *
     * Bug fix: the table columns are (id, name, role), but the original bean
     * exposed getAge()/setAge() for the {@code id} field. The connector's
     * {@code mapToRow(Employee.class)} maps JavaBean properties to column names,
     * so the {@code id} column had no matching property. The accessors are now
     * getId()/setId(); the old names are kept as deprecated delegates so any
     * existing callers still compile.
     */
    @SuppressWarnings("serial")
    public static class Employee implements Serializable {
        private String name;
        private int id;
        private String role;

        public Employee(String name, int id, String role) {
            this.name = name;
            this.id = id;
            this.role = role;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        /** Property name must match the Cassandra column {@code id}. */
        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        /** @deprecated misnamed accessor kept for compatibility; use {@link #getId()}. */
        @Deprecated
        public int getAge() {
            return getId();
        }

        /** @deprecated misnamed accessor kept for compatibility; use {@link #setId(int)}. */
        @Deprecated
        public void setAge(int age) {
            setId(age);
        }

        public String getRole() {
            return role;
        }

        public void setRole(String role) {
            this.role = role;
        }
    }

    /**
     * (Re)creates the {@code java_api.employees} table and saves a fixed set of
     * sample rows to Cassandra through the connector.
     *
     * @param sc Spark context whose configuration carries the Cassandra
     *           connection settings.
     */
    private static void generateData(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema. try-with-resources ensures the driver session
        // is closed even if a statement fails (the original leaked it).
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS java_api");
            session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE java_api.employees (id INT PRIMARY KEY, name TEXT, role TEXT)");
        }

        List<Employee> employees = Arrays.asList(
                new Employee("John Adam", 31222, "Manager"),
                new Employee("Henry Floyd", 31223, "Architect"),
                new Employee("Diana Wen", 31224, "Developer"),
                new Employee("Tina Turner", 31225, "Developer"),
                new Employee("Smith Jones", 31226, "Manager"),
                new Employee("Jhonty Rhodes", 31227, "Project Manager"));

        JavaRDD<Employee> employeesJavaRDD = sc.parallelize(employees);
        CassandraJavaUtil.javaFunctions(employeesJavaRDD)
                .writerBuilder("java_api", "employees", CassandraJavaUtil.mapToRow(Employee.class))
                .saveToCassandra();
    }

    public static void main(String[] args) {
        // Enrich the Spark configuration with DSE connection defaults.
        // NOTE(review): this call is what throws NoClassDefFoundError when the
        // Hadoop jars are missing — submit via `dse spark-submit` to get the
        // correct classpath.
        SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf()).setAppName("My application");

        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        generateData(sc);

        // Bug fix: the original queried "employees" BEFORE any table with that
        // name was registered with the SQLContext, which fails at runtime.
        // Load the Cassandra table through the connector's data source, then
        // register it as a temp table so plain SQL can see it.
        DataFrame employees = sqlContext.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "java_api")
                .option("table", "employees")
                .load();
        employees.registerTempTable("employees");

        // Standard SQL equality is '=' (the original used '==').
        DataFrame managers = sqlContext.sql("SELECT name FROM employees WHERE role = 'Manager'");
        managers.show();

        sc.stop();
    }
}
错误如下所示:
exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at com.datastax.bdp.spark.DseSparkConfHelper$.enrichSparkConf(DseSparkConfHelper.scala:25)
at com.datastax.bdp.spark.DseSparkConfHelper.enrichSparkConf(DseSparkConfHelper.scala)
at Example.main(Example.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 20 more
Process finished with exit code 1
请帮忙
共 (0) 个答案