java Cassandra hector loader应用程序内存不足
这个简单的应用程序将一个带有标题的逗号delim文件放入Cassandra。 它适用于小文件,但是内存会增加,直到内存不足异常将其杀死
我错过了什么
package com.company;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
public class QuickLoad {
public static Keyspace keyspace = null;
public static void main(String[] args) {
File file = new File(args[0]);
String keyspaceName = args[1];
String columnFamilyName = args[2];
BufferedReader reader = null;
try {
keyspace = GetKeyspace(keyspaceName);
reader = new BufferedReader(new FileReader(file));
String fileLine = null;
String[] headers = null;
String[] fields = null;
boolean headerLine = true;
while ((fileLine = reader.readLine()) != null) {
if (headerLine){
headerLine = false;
headers = fileLine.substring(1, fileLine.length()-1).split("\",\"");
} else {
fields = fileLine.substring(1, fileLine.length()-1).split("\",\"");
CassandraSave(keyspace, columnFamilyName, headers, fields);
}
}
}
catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
System.exit(0);
}
public static void CassandraSave(Keyspace keyspace, String columnFamily, String[] headers, String[] columns)
{
try
{
Mutator mutator = HFactory.createMutator(keyspace, StringSerializer.get());
for (int i = 1; i < headers.length-1; i++)
{
if ((columns[i] != null) || (!columns[i].equals("null"))) {
if (columns[i].length() > 0) {
HColumn<String, String> col = HFactory.createStringColumn(headers[i], columns[i]);
mutator.insert(columns[1], columnFamily, col);
}
}
}
mutator.execute();
} catch (Exception e){
e.printStackTrace();
}
}
public static Keyspace GetKeyspace(String keyspaceName)
{
String serverAddress = "localhost:9160";
Cluster cluster = HFactory.getOrCreateCluster("My Cluster", serverAddress);
Keyspace keyspace = HFactory.createKeyspace(keyspaceName, cluster);
return keyspace;
}
}
# 1 楼答案
我看到两件事——它是单线程的,而且批量非常小
添加一个外部循环以批量收集mutator中的插入 大约500行的大小开始,看看如何进行。 下面是我用于压力测试的performant mutator insert示例: https://github.com/zznate/cassandra-stress/blob/master/src/main/java/com/riptano/cassandra/stress/InsertCommand.java
此外,它有点老,但这里是一个解决问题的方法要点 并行加载程序的工作原理与您描述的类似: https://gist.github.com/397574