有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用BlockingQueue的

我试图在代码中实现队列的使用。关键是,我希望它打印出文件中的总字数,这意味着我需要它在完成所有结果时将它们相加

目前,我的程序所做的是,我有一个读卡器,它运行文件,并返回一个字符串,其中包含文件名和其中的字数。然后,我使用main方法为args数组中给出的每个参数运行for循环。每次我们通过一个新的文档来检查有多少单词,我们就把它变成一个新的线程

public static void main(final String[] args) {
    Thread t = null;
    if (args.length >= 1) {
        String destinationFileName = args[(args.length-1)];
            for (int l = 0; l < (args.length); l++) {
                final int q = l;
                final Thread y = t;
                Runnable r = new Runnable() {
                    public void run() {
                        String res = readTextFile(args[q]);
                        System.out.println(res);
                    }
                };
                t = new Thread(r);
                t.start();
            }
    } else {
        System.err.println("Not enough input files");
    }
}

那么,我该如何创建一个队列,让它们以某种方式等待彼此,这样就不会犯在同一时间添加结果的错误呢


共 (2) 个答案

  1. # 1 楼答案

    这里似乎没有必要排队。只需让每个线程将其结果添加到线程安全列表中,该列表可以如下构造:

    final List<String> results = 
            Collections.synchronizedList(new ArrayList<String>());
    

    接下来,在聚合结果之前,需要等待所有线程完成。可以通过在每个线程上调用join来实现这一点。将每个线程添加到名为threads的列表中,然后在所有线程启动后,调用以下命令:

    for(Thread t : threads) {
        t.join();
    }
    

    这段代码将有效地等待每个线程完成后再继续

  2. # 2 楼答案

    这是一个非常常见的例子,当需要多个线程来处理IO操作,比如从磁盘读取文件时,我想这是为了指导,对于现实生活的例子,考虑查看Hadoop

    的Map Reduce框架。

    查看如何使用hadoop here完成类似任务

    然而,下面是一个伪例子:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    class ConsumerProducer implements Runnable {
        private final BlockingQueue<String> map;
        private final BlockingQueue<Map<String, Integer>> reduce;
    
        ConsumerProducer(BlockingQueue<String> map,
                BlockingQueue<Map<String, Integer>> reduce) {
            this.map = map;
            this.reduce = reduce;
        }
    
        public void run() {
            try {
                while (true) {
                    Map<String, Integer> wordToOccurrences = this.consume(map
                            .take());
                    this.produce(wordToOccurrences);
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        private void produce(Map<String, Integer> wordToOccurrences)
                throws InterruptedException {
            reduce.put(wordToOccurrences);
        }
    
        public Map<String, Integer> consume(String fileName) {
            // read the file and return 'word' -> number of occurrences
            return new HashMap<String, Integer>();
        }
    }
    
    class Setup {
    
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> map = new LinkedBlockingQueue<String>();
            BlockingQueue<Map<String, Integer>> reduce = new LinkedBlockingQueue<Map<String, Integer>>();
    
            for (String fileName : args) {
                map.put(fileName);
                // assuming every thread process single file, for other options see
                // http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
                ConsumerProducer c = new ConsumerProducer(map, reduce);
                new Thread(c).start();
            }
    
            for (int i = 0; i < args.length; i++) {
                Map<String, Integer> wordToOccurrences = reduce.take();
                // start consuming results
            }
    
            // print merged map of total results
        }
    }