有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java LinkedBlockingQueue。take似乎一直很忙

我正在尝试在OSGi中实现一个服务,OSGi应该等待来自另一个bundle的传入数据,并在接收数据时处理数据。 我使用的是LinkedBlockingQueue,因为我不知道将接收多少数据包。 我的代码如下所示:

public class MyClass {

protected static LinkedBlockingQueue<JSONObject> inputQueue = new LinkedBlockingQueue<JSONObject>();
private ExecutorService workerpool = Executors.newFixedThreadPool(4);

public void startMyBundle() {
    start();
}

protected void start() {
    new Thread(new Runnable() {
        public void run() {
            while(true){
                workerpool.execute(new Runnable() {
                    public void run() {
                        try {
                            process(inputQueue.take());
                        } catch (InterruptedException e) {
                            System.out.println("thread was interrupted.");
                        }
                    }
                });
            }
        }
    }).start();
}

public void transmitIn(JSONObject packet) {
    try {
        inputQueue.put(packet);
    } catch (InterruptedException e) {

    }
}

protected  void process(JSONObject packet) {
    //Some processing
}

当我运行这个程序,并且只向服务发送一个数据包时,数据包首先被处理,但是我的处理器使用了所有的容量,大多数时候我得到一个OutOfMemoryError如下所示:

java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "[Timer] - Periodical Task (Bundle 46) (Bundle 46)"

这可能是什么原因


共 (3) 个答案

  1. # 1 楼答案

    由于以下代码行,您将出现内存不足异常:

    while(true){
       workerpool.execute(new Runnable() {
       ...
    

    这将永远旋转创建新的Runnable实例,并将它们添加到线程池的任务队列中。它们进入一个无限的队列并迅速填满内存

    我想你需要一个在while (true)循环中调用inputQueue.take()的4个线程

    for (int i = 0; i < 4; i++) {
        workerpool.execute(new Runnable() {
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    process(inputQueue.take());
                }
            }
        });
    }
    // remember to shut the queue down after you've submitted the last job
    workerpool.shutdown();
    

    此外,将任务提交到线程池中不需要Thread。这是一个非阻塞操作,因此可以由调用方直接完成

  2. # 2 楼答案

    这段代码就是罪魁祸首:

    protected void start() {
        new Thread(new Runnable() {
            public void run() {
                while(true){
                    workerpool.execute(new Runnable() {
                        public void run() {
                            try {
                                process(inputQueue.take());
                            } catch (InterruptedException e) {
                                System.out.println("thread was interrupted.");
                            }
                        }
                    });
                }
            }
        }).start();
    }
    

    它所做的是创建一个后台任务,将无限多的Runnable添加到ExecutorService工作队列中。这最终导致了一个OOME

    我想你想做的是:

    protected void start() {
        for (int i = 0; i < 4; ++i) {
            workerpool.execute(new Runnable() {
                public void run() {
                    while (true) {
                        try {
                            process(inputQueue.take());
                        } catch (InterruptedException e) {
                            //killed, exit.
                            return;
                        }
                    }
                }
            });
        }
    }
    

    例如,在ExecutorService上运行4个等待输入的工作进程

  3. # 3 楼答案

    好吧,有点迂腐,但既然这是一个OSGi标记的问题

    1. 清理——你正在创建一个线程和执行器服务,但从未清理过。通常,您需要一对激活/停用方法,并且在停用后不会留下任何剩余。从内聚性的角度来看,您希望在一个对象中看到这一点,而不需要一个中心点来管理这一点。声明式服务非常适合这种模式
    2. 分享——一般来说,你想和其他人分享一个执行者,it is best to get an Executor from the service registry。这将允许部署人员根据系统中所有捆绑包的使用情况调整线程数

    还有一件事,Boris给出了一个正确的解决方案,但它不是很有效,因为它总是占用4个线程和一个无限的LinkedQueue。更糟糕的是,代码像服务一样运行,像服务一样说话,但似乎没有被用作服务。我认为我们可以做得更好,因为队列+执行器是双重的,在OSGi中,这应该是一种服务

    @Component
    public class JSONPackageProcessor implement TransmitIn {
      Executor executor;
    
      @Reference void setExecutor(Executor e) { this.executor = e; }
    
      public void transmitIn( final JSONPacket packet ) {
        executor.execute(new Runnable() {
           public void run() { 
             try { process(packet); } 
             catch(Throwable t) { log(packet, t); }
           }
        }
      }
    
      void process( JSONPacket packet ) { ... }
    }
    

    假设process(...)总是“很快”结束,则无需清理。在这个模型中,流量不会像您使用(任意?)时那样被限制池中有4个工作线程。执行器的内部队列用于缓冲。您可以将其限制如下:

      Semaphore throttle= new Semaphore(4)
    
      public void transmitIn( final JSONPacket packet ) throws Exception {
        throttle.acquire();
        executor.execute(new Runnable() {
           public void run() { 
             try { process(packet); } 
             catch(Throwable t) { log(packet, t); }
             finally { throttle.release(); }
        }
      }
    

    您甚至可以通过Configuration Admin轻松配置:

     @Activate void configure( Map<String,Object> map) throws Exception {
       if ( map.containsKey("throttle") )
         throttle = new Semaphore( map.get("throttle"));
     }
    

    这段代码的美妙之处在于,大多数错误情况都会被记录下来,前后并发关系是正确的,因为在OSGi中得到的保证是正确的。这段代码实际上可以按原样工作(不保证某些拼写错误,实际上还没有运行)