有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Spring WebFlux中的java背压机制

我是SpringWebFlux的新手。我编写了一个控制器,如下所示:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono")
    }
}

我知道被动的好处之一是背压,它可以平衡请求或响应速度。我想了解如何在弹簧卷筒纸流量中使用背压机制


共 (1) 个答案

  1. # 1 楼答案

    WebFlux中的背压

    为了理解背压在WebFlux框架的当前实现中是如何工作的,我们必须在这里回顾一下默认情况下使用的传输层。我们可能还记得,浏览器和服务器之间的正常通信(服务器到服务器的通信通常也一样)是通过TCP连接完成的。WebFlux还使用该传输在客户端和服务器之间进行通信。 然后,为了得到背压控制术语的含义,我们必须从反应流规范的角度回顾背压的含义

    The basic semantics define how the transmission of stream elements is regulated through back-pressure.

    因此,从该陈述中,我们可以得出结论,在反应流中,背压是一种机制,通过传输(通知)接收者可以消费多少元素来调节需求;这里我们有一个棘手的问题。TCP具有字节抽象,而不是逻辑元素抽象。我们通常所说的背压控制是指控制发送/接收到/来自网络的逻辑元件的数量。即使TCP有它自己的流控制(请参见含义here和动画there),此流控制仍然针对字节,而不是逻辑元素

    在WebFlux模块的当前实现中,背压由传输流控制进行调节,但它不暴露接收者的真实需求。为了最终看到交互流程,请参见下图:

    enter image description here

    为简单起见,上图显示了两个微服务之间的通信,其中左微服务发送数据流,右微服务使用该数据流。以下编号列表提供了该图的简要说明:

    1. 这是一个WebFlux框架,它适当地处理逻辑元素到字节的转换,以及向TCP(网络)传输/接收逻辑元素和从TCP(网络)接收逻辑元素
    2. 这是元素长时间运行处理的开始,一旦作业完成,该元素将请求下一个元素
    3. 在这里,虽然业务逻辑没有需求,但WebFlux会将来自网络的字节排队,而不进行确认(业务逻辑没有需求)
    4. 由于TCP流控制的性质,服务A仍然可以向网络发送数据

    正如我们从上图中注意到的,接收方公开的需求与发送方的需求不同(这里的需求是逻辑元素)。这意味着两者的需求是孤立的,仅适用于WebFlux<-&燃气轮机;业务逻辑(服务)交互,减少了服务A的背压<-&燃气轮机;服务B交互。所有这一切都意味着背压控制在WebFlux中并不像我们预期的那样公平

    所有这一切意味着,在WebFlux中,背压控制并不像我们预期的那样公平

    但我还是想知道如何控制背压

    如果我们仍然希望对WebFlux中的背压进行不公平的控制,我们可以在^{}等项目反应堆运营商的支持下这样做。以下示例显示了如何使用该运算符:

    @PostMapping("/tweets")
    public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
        
        return tweetService.process(tweetsFlux.limitRate(10))
                           .then();
    }
    

    正如我们从示例中看到的,limitRate()运算符允许定义一次预取的元素数。这意味着,即使最终订户请求Long.MAX_VALUE元素,limitRate操作符也会将该需求分割成块,并且不允许一次消耗超过该块的内容。我们可以对元素发送过程执行相同的操作:

    @GetMapping("/tweets")
    public Flux<Tweet> getAllTweets() {
        
        return tweetService.retreiveAll()
                           .limitRate(10);
    }
    

    上面的示例显示,即使WebFlux一次请求超过10个元素,limitRate()也会将请求限制为预取大小,并防止一次消耗超过指定数量的元素

    另一种选择是实现自己的Subscriber或从projectreactor扩展BaseSubscriber。例如,下面是一个简单的例子,说明了我们如何做到这一点:

    class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
    
        int consumed;
        final int limit = 5;
    
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(limit);
        }
        
        @Override
        protected void hookOnNext(T value) {
            // do business logic there 
    
            consumed++;
            
            if (consumed == limit) {
                consumed = 0;
                
                request(limit);
            }
        }
    }
    

    采用RSocket协议的公平背压

    为了通过网络边界实现逻辑元素的背压,我们需要一个合适的协议。幸运的是,有一个叫做RScoket protocol。RSocket是一种应用程序级协议,允许通过网络边界传输实际需求。 该协议有一个rsocketjava实现,允许设置RSocket服务器。在服务器到服务器通信的情况下,同样的rsocketjava库也提供了客户机实现。要了解如何使用rsocketjava的更多信息,请参见以下示例here。 对于浏览器-服务器通信,有一个RSocket-JS实现,它允许通过WebSocket连接浏览器和服务器之间的流式通信

    RSocket之上的已知框架

    现在有一些框架是建立在RSocket协议之上的

    变形杆菌

    其中一个框架是一个Proteus项目,它提供构建在RSocket之上的成熟的微服务。此外,Proteus与Spring框架集成良好,因此现在我们可以实现公平的背压控制(参见示例there

    进一步阅读