有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java中的多线程非阻塞缓冲区

在大容量多线程java项目中,我需要实现一个非阻塞缓冲区

在我的场景中,我有一个web层,每秒接收约20000个请求。我需要在某些数据结构(也称为所需的缓冲区)中累积一些请求,当它已满(假设它包含1000个对象时已满)时,这些对象应序列化为一个文件,并发送到另一台服务器进行进一步处理

实现应该是非阻塞的。 我检查过ConcurrentLinkedQueue,但我不确定它是否适合这份工作

我认为我需要使用两个队列,一旦第一个队列被填满,它就会被一个新的队列所取代,而完整的队列(“第一个”)将被交付以供进一步处理。这是我目前正在考虑的基本想法,但我仍然不知道这是否可行,因为我不确定我能否在java中切换指针(以便切换整个队列)

有什么建议吗

谢谢


共 (5) 个答案

  1. # 1 楼答案

    我可能弄错了什么,但是您可以使用ArrayList来实现这一点,因为您不需要从队列中轮询每个元素。当阵列的大小达到限制并需要发送时,您只需在同步部分刷新(创建副本并清除)阵列。添加到此列表还应同步到此刷新操作

    交换阵列可能不安全-如果发送速度比生成速度慢,缓冲区可能很快开始相互覆盖。每秒20000个元素的数组分配对于GC来说几乎是零

    Object lock  = new Object();
    
    List list = ...;
    
    synchronized(lock){
        list.add();
    }
    
    ...
    
    // this check outside is a quick dirty check for performance, 
    // it's not valid out of the sync block
    // this first check is less than nano-second and will filter out 99.9%
    // `synchronized(lock)` sections
    if(list.size() > 1000){
      synchronized(lock){  // this should be less than a microsecond
         if(list.size() > 1000){  // this one is valid
           // make sure this is async (i.e. saved in a separate thread) or <1ms
           // new array allocation must be the slowest part here
           sendAsyncInASeparateThread(new ArrayList(list)); 
           list.clear();
         }
      }
    }
    

    更新

    考虑到发送是异步的,这里最慢的部分是new ArrayList(list),1000个元素的发送速度应为1微秒左右,每秒20微秒。我没有测量这个,我是根据在~1毫秒内分配100万个元素的比例来解决这个问题的

    如果您仍然需要一个超快速同步队列,那么您可能需要查看一下MentaQueue

  2. # 2 楼答案

    对于这样的需求,我通常会在应用程序启动时创建一个缓冲池,并将引用存储在BlockingQueue中。生产者线程弹出缓冲区,填充缓冲区,然后将引用推送到消费者正在等待的另一个队列。当消费者完成时(在您的情况下,数据写入到fine),ref会被推回到池队列中以供重用。这提供了大量的缓冲区存储,无需在锁内进行昂贵的大容量复制,消除了GC操作,提供了流控制(如果池清空,生产者必须等待一些缓冲区返回),并防止内存失控,这一切都在一个设计中实现

    更多:我已经在其他各种语言(C++、Delphi)中使用了这样的设计很多年,而且效果很好。我有一个包含BlockingQueue的“ObjectPool”类和一个派生缓冲区的“PooledObject”类。PooledObject有一个对其池的内部私有引用(在创建池时初始化),因此允许使用无参数的release()方法。这意味着,在具有多个池的复杂设计中,缓冲区总是被释放到正确的池中,从而降低了阻塞的可能性

    我的大多数应用程序都有GUI,所以我通常每秒都会将池级别转储到计时器上的状态栏。然后,我可以大致看到有多少负载,如果有缓冲区泄漏,(数量持续下降,然后应用程序最终在空池上死锁),或者我正在双重释放(数量持续上升,应用程序最终崩溃)

    在运行时更改缓冲区的数量也相当容易,方法是创建更多的缓冲区并将其推入池中,或者等待池,移除缓冲区并让GC销毁它们

  3. # 3 楼答案

    你说的“切换指针”是什么意思?Java中没有指针(除非您谈论的是引用)

    无论如何,正如您可能从Javadoc中看到的,ConcurrentLinkedQueue的size()方法有一个“问题”。尽管如此,您仍然可以使用您最初的想法,即使用2个(或更多)缓冲区进行切换。磁盘I/O可能会出现一些瓶颈。可能大小的非恒定时间()在这里也不会成为问题

    当然,如果您希望它是非阻塞的,您最好有大量内存和快速磁盘(以及大/更大的缓冲区)

  4. # 4 楼答案

    不要将每个请求对象放在队列中,而是分配一个大小为1000的数组,当数组被填满时,将该数组放在队列中给发送者线程,发送者线程序列化并发送整个数组。然后分配另一个数组

    当发送方不能足够快地工作并且其队列溢出时,您将如何处理这种情况?要避免内存不足错误,请使用大小有限的队列

  5. # 5 楼答案

    我认为你的解决方案很有道理。您将需要两个队列,processingQueue将是您想要的缓冲区大小(在您的示例中为1000),而waitingQueue将大得多。每当processingQueue满时,它会将其内容放入指定的文件中,然后从waitingQueue中抓取前1000个(如果等待队列少于1000个,则抓取的数量更少)

    我唯一关心的是你提到每秒20000次和1000次缓冲。我知道1000就是一个例子,但是如果你不把它做得更大,那可能只是你把问题转移到了waitingQueue而不是解决它,因为你的waitingQueue会比processingQueue更快地接收到1000个新的问题,给你的waitingQueue带来缓冲区溢出