What is the fastest way to programmatically bulk load data into HBase in Java?

I have a plain text file, possibly with millions of lines, that needs custom parsing, and I want to load it into an HBase table as fast as possible (using the Hadoop or HBase Java client).

My current solution is based on a MapReduce job without the reduce part. I use FileInputFormat to read the text file so that each line is passed to the map method of my Mapper class. At that point the line is parsed into a Put object, which is written to the context. TableOutputFormat then takes the Put object and inserts it into the table.

This solution yields an average insertion rate of 1,000 rows per second, which is less than I expected. My HBase setup runs in pseudo-distributed mode on a single server.

One interesting thing is that during the insertion of 1,000,000 rows, 25 mappers (tasks) are spawned, but they run serially (one after another); is this normal?

Here is the code of my current solution:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());

        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey), Bytes.toBytes(currentKey), Bytes.toBytes(parsedLine.get(currentKey)));
        }

        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);

    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime-startTime));

    System.exit(errCode);
}

2 Answers

  1. Answer #1

    I went through a process that is probably very similar to yours, trying to find an efficient way to load data from an MR job into HBase. What I found to work was using HFileOutputFormat as the OutputFormat class of the MR job.

    Below is the basis of my code that I have to generate the job, and the Mapper map function that writes out the data. It was fast. We don't use it anymore, so I don't have the numbers on hand, but it was something like 2.5 million records in under a minute.

    Here is the (stripped down) function I wrote to generate the job for my MapReduce process that puts the data into HBase:

    private Job createCubeJob(...) {
        //Build and Configure Job
        Job job = new Job(conf);
        job.setJobName(jobName);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
        job.setJarByClass(CubeBuilderDriver.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
    
        TextInputFormat.setInputPaths(job, hiveOutputDir);
        HFileOutputFormat.setOutputPath(job, cubeOutputPath);
    
        Configuration hConf = HBaseConfiguration.create(conf);
        hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);
    
        HTable hTable = new HTable(hConf, tableName);
    
        HFileOutputFormat.configureIncrementalLoad(job, hTable);
        return job;
    }
    

    This is the map function I used in my HiveToHBaseMapper class (slightly edited):

    public void map(WritableComparable key, Writable val, Context context)
            throws IOException, InterruptedException {
        try{
            Configuration config = context.getConfiguration();
            String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
            String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
            String column = strs[COLUMN_INDEX];
            double value = Double.parseDouble(strs[VALUE_INDEX]); // numeric cell value; non-positive values are replaced below
            String sKey = generateKey(strs, config);
            byte[] bKey = Bytes.toBytes(sKey);
            Put put = new Put(bKey);
            put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
                            ? Bytes.toBytes(Double.MIN_VALUE)
                            : Bytes.toBytes(value));
    
            ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
            context.write(ibKey, put);
    
            context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
        }
        catch(Exception e){
            context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);    
        }
    
    }
    

    I'm pretty sure this isn't going to be a copy & paste solution for you. Obviously the data I was dealing with here didn't need any custom processing (that was done in an earlier MR job). The main thing I want to provide from this is the HFileOutputFormat; the rest is just an example of how I used it. :)
    I hope it gets you on your way to a good solution.
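
    One step the snippets above do not show is handing the generated HFiles to HBase once the job has finished. The sketch below is not part of the original answer; it assumes the same tableName and cubeOutputPath used in createCubeJob, and the classic HTable / LoadIncrementalHFiles API of that HBase generation (exact signatures vary between versions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    // Hypothetical helper performing the "completebulkload" step: it moves the HFiles
    // written by HFileOutputFormat into the regions of the target table.
    private void completeBulkLoad(Path cubeOutputPath, String tableName) throws Exception {
        Configuration hConf = HBaseConfiguration.create();
        HTable hTable = new HTable(hConf, tableName);
        try {
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hConf);
            loader.doBulkLoad(cubeOutputPath, hTable); // moves the files into place; no data is rewritten
        } finally {
            hTable.close();
        }
    }

    The same step can also be run from the command line with the completebulkload tool (hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile dir> <table name>).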

  2. Answer #2

    One interesting thing is that during insertion of 1,000,000 rows, 25 Mappers (tasks) are spawned but they run serially (one after another); is this normal?

    The mapreduce.tasktracker.map.tasks.maximum parameter, which defaults to 2, determines the maximum number of map tasks that can run in parallel on a node. Unless it is changed, you should see at most 2 map tasks running simultaneously on each node.
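
    On a single-node, pseudo-distributed setup this is a per-TaskTracker setting, so it goes into mapred-site.xml on that node (under Hadoop 1.x the key is named mapred.tasktracker.map.tasks.maximum) and the TaskTracker has to be restarted afterwards; it cannot be changed from the job's Configuration. As an illustration, allowing four concurrent map tasks would look something like this:

    <!-- mapred-site.xml: maximum number of map task slots per TaskTracker (example value) -->
    <property>
      <name>mapreduce.tasktracker.map.tasks.maximum</name>
      <value>4</value>
    </property>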