有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java中使用嵌入式Cassandra服务器测试CassandraSpark作业的示例

我是卡桑德拉和斯帕克的新手。我正在尝试为我的Spark工作设置一个测试,它执行以下操作:

  1. 将表A中的数据加载到数据帧中
  2. 对这些数据帧进行过滤、分组和聚合
  3. 将结果加载到表B中

我想使用嵌入式Cassandra服务器来运行测试,而不是让它连接到Cassandra数据库的本地实例。以前有人这样做过吗?如果是这样,有人能给我举个好例子吗?提前谢谢你的帮助


共 (1) 个答案

  1. # 1 楼答案

    this code does
    
    package cassspark.clt;
    
    import java.io.*;
    import javafx.application.Application;
    import java.util.concurrent.Executors ;
    import java.util.concurrent.ExecutorService;
    import org.apache.cassandra.service.CassandraDaemon;
    import com.datastax.driver.core.exceptions.ConnectionException;
    import java.util.Properties;
    import org.apache.log4j.PropertyConfigurator;
    import org.apache.spark.sql.SparkSession;
    
    public class EmbeddedCassandraDemo extends Application {
    
        private ExecutorService executor = Executors.newSingleThreadExecutor();
        private CassandraDaemon cassandraDaemon;
    
        public EmbeddedCassandraDemo() {
        }
    
        public static void main(String[] args) {
            try {
                new EmbeddedCassandraDemo().run();
            }
            catch(java.lang.InterruptedException e)
            {
                ;
            }
        }
    
        @Override public void start(javafx.stage.Stage stage) throws Exception
        {
            stage.show();
        }
    
        private void run() throws InterruptedException, ConnectionException {
            setProperties();
            activateDeamon();
        }
    
        private void activateDeamon() {
            executor.execute( new Runnable() {
    
                @Override
                public void run() {
                    cassandraDaemon = new CassandraDaemon();
                    cassandraDaemon.activate();
                    SparkSession spark = SparkSession .builder().master("local").appName("ASH").getOrCreate();
                }
            });
        }
    
        private void setProperties() {
    
            final String yaml = System.getProperty("user.dir") + File.separator +"conf"+File.separator+"cassandra.yaml";
            final String storage = System.getProperty("user.dir") + File.separator +"storage" + File.separator +"data";
    
            System.setProperty("cassandra.config", "file:"+ yaml );
            System.setProperty("cassandra.storagedir", storage );
            System.setProperty("cassandra-foreground", "true");
    
            String log4JPropertyFile = "./conf/log4j.properties";
            Properties p = new Properties();
            try {
                p.load(new FileInputStream(log4JPropertyFile));
                PropertyConfigurator.configure(p);
            } catch (IOException e) {
                System.err.println("./conf/log4j.properties not found ");
                System.exit(1);
                ;
            }
        }
    }