有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java是编写大量文件的最佳方式

我写了很多像贝娄这样的文件

public void call(Iterator<Tuple2<Text, BytesWritable>> arg0)
        throws Exception {
    // TODO Auto-generated method stub

    while (arg0.hasNext()) {
        Tuple2<Text, BytesWritable> tuple2 = arg0.next();
        System.out.println(tuple2._1().toString());
        PrintWriter writer = new PrintWriter("/home/suv/junk/sparkOutPut/"+tuple2._1().toString(), "UTF-8");
        writer.println(new String(tuple2._2().getBytes()));
        writer.close();
    }
}

有没有更好的方法来写这些文件。。每次都不关闭或创建printwriter


共 (1) 个答案

  1. # 1 楼答案

    我想你是在追求最快的方式。因为每个人都知道最快就是最好的;)

    一个简单的方法是使用一堆线程来为你写作。 然而,除非文件系统具有良好的可扩展性,否则这样做不会带来太多好处。(我在基于Luster的集群系统上使用了这种技术,如果“很多文件”可能意味着10k,那么在这种情况下,许多写入操作将进入不同的服务器/磁盘)

    代码看起来是这样的:(注意,我认为这个版本不适合少量的文件,因为这会填满工作队列——但请参阅下一个版本以获得更好的版本……)

    public void call(Iterator<Tuple2<Text, BytesWritable>> arg0) throws Exception {
        int nThreads=5;
        ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
        ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
    
        int nJobs = 0;
    
        while (arg0.hasNext()) {
            ++nJobs;
            final Tuple2<Text, BytesWritable> tuple2 = arg0.next();
            ecs.submit(new Callable<Void>() {
              @Override Void call() {
                 System.out.println(tuple2._1().toString());
                 String path = "/home/suv/junk/sparkOutPut/"+tuple2._1().toString();
                 try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
                   writer.println(new String(tuple2._2().getBytes()))
                 }
                 return null;
              }
           });
        }
        for(int i=0; i<nJobs; ++i) {
           ecs.take().get();
        }
    }
    

    更好的方法是,在第一个文件有数据时就开始编写文件,而不是在所有文件都有数据时,并且这样编写不会阻塞计算线程

    要做到这一点,您需要将应用程序分成几个部分,通过(线程安全的)队列进行通信

    代码最终看起来更像这样:

    public void main() {
      SomeMultithreadedQueue<Data> queue = ...;
    
      int nGeneratorThreads=1;
      int nWriterThreads=5;
      int nThreads = nGeneratorThreads + nWriterThreads;
    
      ExecutorService threadPool = Executors.newFixedThreadPool(nThreads);
      ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);
    
      AtomicInteger completedGenerators = new AtomicInteger(0);
    
      // Start some generator threads.
      for(int i=0; ++i; i<nGeneratorThreads) {
        ecs.submit( () -> { 
          while(...) { 
            Data d = ... ;
            queue.push(d);
          }
          if(completedGenerators.incrementAndGet()==nGeneratorThreads) {
            queue.push(null);
          }
          return null;
       });
      }
    
      // Start some writer threads
      for(int i=0; i<nWriterThreads; ++i) {
        ecs.submit( () -> { 
          Data d
          while((d = queue.take())!=null) {
            String path = data.path();
            try(PrintWriter writer = new PrintWriter(path, "UTF-8") ) {
               writer.println(new String(data.getBytes()));
            }
            return null;
          }
        });
      }
    
      for(int i=0; i<nThreads; ++i) {
        ecs.take().get();
      }
    }
    

    注意,我没有提供队列类的实现,您可以轻松地包装标准java线程安全类,以获得所需的内容

    还有很多事情可以做,以减少延迟,等等-以下是我用来降低时间的一些进一步的事情

    1. 甚至不要等待为给定文件生成所有数据。传递另一个包含要写入的字节包的队列

    2. 注意分配——你可以重用一些缓冲区

    3. nio中有一些延迟——通过使用C写操作、JNI和直接缓冲区,可以获得一些性能改进

    4. 线程切换可能会造成伤害,队列中的延迟可能会造成伤害,因此您可能需要稍微批量处理数据。用1来平衡这一点可能很棘手