有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Cassandra DSE 4.6 Spark SQL Java 程序运行时错误

我正在尝试使用 DSE 4.6,并用 Java 编写一个基本的 Spark SQL 程序,它将对我在 Cassandra 上以编程方式创建的表运行 Spark SQL 的 select 查询。

虽然没有编译错误,但是在运行时,我得到了很多未发现的类异常。无法理解问题在哪里

为了安全起见,我也手动下载了所需的JAR,但仍然显示运行时错误。请帮忙

这是我的程序:

    import com.datastax.bdp.spark.DseSparkConfHelper;
    import com.datastax.driver.core.Session;
    import com.datastax.spark.connector.cql.CassandraConnector;
    import com.datastax.spark.connector.japi.CassandraJavaUtil;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;

//import com.sun.rowset.internal.Row;
//import org.apache.spark.sql.api.java;

public class Example {

    /**
     * JavaBean written to the {@code java_api.employees} table.
     *
     * <p>Bean property names must match the Cassandra column names
     * ({@code id}, {@code name}, {@code role}) because
     * {@link CassandraJavaUtil#mapToRow} maps columns by property name.
     * The original code exposed {@code getAge}/{@code setAge} for the
     * {@code id} field, which makes the mapper look for a nonexistent
     * {@code age} column — fixed to {@code getId}/{@code setId}.
     */
    @SuppressWarnings("serial")
    public static class Employee implements Serializable {
        private String name;
        private int id;
        private String role;

        public Employee(String name, int id, String role) {
            this.name = name;
            this.id = id;
            this.role = role;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getRole() {
            return role;
        }

        public void setRole(String role) {
            this.role = role;
        }
    }

    /**
     * (Re)creates the {@code java_api.employees} table and saves a batch of
     * sample rows to Cassandra via the Spark connector.
     *
     * @param sc active Spark context whose configuration carries the
     *           Cassandra connection settings
     */
    private static void generateData(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema. The session must be closed explicitly —
        // the original version leaked it.
        Session session = connector.openSession();
        try {
            session.execute("DROP KEYSPACE IF EXISTS java_api");
            session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            session.execute("CREATE TABLE java_api.employees (id INT PRIMARY KEY, name TEXT, role TEXT)");
        } finally {
            session.close();
        }

        List<Employee> employees = Arrays.asList(
                new Employee("John Adam", 31222, "Manager"),
                new Employee("Henry Floyd", 31223, "Architect"),
                new Employee("Diana Wen", 31224, "Developer"),
                new Employee("Tina Turner", 31225, "Developer"),
                new Employee("Smith Jones", 31226, "Manager"),
                new Employee("Jhonty Rhodes", 31227, "Project Manager"));

        // Distribute the rows and write them to java_api.employees;
        // mapToRow relies on the bean property names matching the columns.
        JavaRDD<Employee> employeesJavaRDD = sc.parallelize(employees);
        CassandraJavaUtil.javaFunctions(employeesJavaRDD)
                .writerBuilder("java_api", "employees", CassandraJavaUtil.mapToRow(Employee.class))
                .saveToCassandra();
    }

    /**
     * Entry point: builds a DSE-enriched Spark configuration, writes the
     * sample data, then runs Spark SQL queries against it.
     *
     * <p>The reported NoClassDefFoundError (org/apache/hadoop/mapred/JobConf)
     * is a classpath problem, not a code problem: the Hadoop client jars that
     * ship with DSE must be on the runtime classpath (e.g. run via
     * {@code dse spark-submit} rather than a plain IDE launch).
     */
    public static void main(String[] args) {
        // Run against a local master; unused here because
        // DseSparkConfHelper picks the master up from the DSE environment,
        // but kept for parity with the original example.
        String master = "local[4]";
        String host = "localhost";

        // Create a new configuration enriched with DSE connection settings.
        SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf()).setAppName("My application");

        // Create a Spark context.
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create a Spark SQL context.
        SQLContext sqlContext = new SQLContext(sc);
        generateData(sc);

        // NOTE(review): a plain SQLContext has no table named "employees"
        // before registerTempTable is called, so this first query will fail
        // with "Table not found" once the classpath issue is resolved.
        // With the DSE/Cassandra connector, query the table through a
        // Cassandra-aware SQL context (or load it and register it first) —
        // confirm against the DSE 4.6 docs for the exact API.
        DataFrame employees = sqlContext.sql("SELECT * FROM employees");
        employees.registerTempTable("employees");
        // Standard SQL equality is '=', not '=='.
        DataFrame managers = sqlContext.sql("SELECT name FROM employees WHERE role = 'Manager' ");

        sc.stop();
    }
}

错误如下所示:

exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at com.datastax.bdp.spark.DseSparkConfHelper$.enrichSparkConf(DseSparkConfHelper.scala:25)
    at com.datastax.bdp.spark.DseSparkConfHelper.enrichSparkConf(DseSparkConfHelper.scala)
    at Example.main(Example.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 20 more

Process finished with exit code 1

请帮忙


共 (0) 个答案