有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

通过SSE订阅的java Flux引发取消()事件

我有一个弹簧靴2.0.0。M7+Spring Webflux应用程序,其中我使用的是Thymeleaf Reactive

我注意到,在我的微服务上,当我在SSE模式(文本/事件流)下调用返回数据流的端点时,即使已正确处理该数据流,也会在该数据流上发生cancel()

例如,这里有一个简单的控制器端点:

@GetMapping(value = "/posts")
public Flux<String> getCommunityPosts() {
    return Flux.just("A", "B", "C").log("POSTS");
}

以下是我在SSE模式下请求时获得的订阅流量日志:

2018-02-13 17:04:09.841  INFO 4281 --- [nio-9090-exec-4] POSTS : | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
2018-02-13 17:04:09.841  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.842  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(A)
2018-02-13 17:04:09.847  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.847  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(B)
2018-02-13 17:04:09.848  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.848  INFO 4281 --- [nio-9090-exec-4] POSTS : | onNext(C)
2018-02-13 17:04:09.849  INFO 4281 --- [nio-9090-exec-4] POSTS : | request(1)
2018-02-13 17:04:09.849  INFO 4281 --- [nio-9090-exec-4] POSTS : | onComplete()
2018-02-13 17:04:09.852  INFO 4281 --- [nio-9090-exec-4] POSTS : | cancel()

我们可以在onComplete之后注意到cancel事件。当我通过经典的GET请求调用同一个端点时,我没有这种行为。我怀疑这个取消事件会使客户端事件源(javascript)抛出OneError事件

这是SSE特有的已知/通缉行为吗

问题更新

实际上,我在一些流上使用SSE,因为有时我需要事件源来获取JSON数据,而不是Thymeleaf已经处理过的HTML。我应该换一种方式吗

我的实现基于这个示例的最后一个方法:https://github.com/danielfernandez/reactive-matchday/blob/master/src/main/java/com/github/danielfernandez/matchday/web/controller/MatchController.java

然而,我可能没有在之前的帖子中提供一些信息。我使用Tomcat服务器(带有M7的8.5.23),而不是Netty服务器。我强制使用Tomcat,包括以下Maven依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

在一个示例项目中使用您的代码,这似乎会导致问题

当我在Netty服务器上运行代码时,得到的结果与您相同:

2018-02-14 12:30:48.713  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2018-02-14 12:30:48.714  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(1)
2018-02-14 12:30:49.717  INFO 3060 --- [     parallel-2] reactor.Flux.ConcatMap.1 : onNext(a)
2018-02-14 12:30:49.739  INFO 3060 --- [ctor-http-nio-2] reactor.Flux.ConcatMap.1 : request(31)
2018-02-14 12:30:50.731  INFO 3060 --- [     parallel-3] reactor.Flux.ConcatMap.1 : onNext(b)
2018-02-14 12:30:51.733  INFO 3060 --- [     parallel-4] reactor.Flux.ConcatMap.1 : onNext(c)
2018-02-14 12:30:51.735  INFO 3060 --- [     parallel-4] reactor.Flux.ConcatMap.1 : onComplete()

当我在Tomcat服务器上运行相同的代码时,我遇到了取消问题:

2018-02-14 12:33:18.294  INFO 3088 --- [nio-8080-exec-3] reactor.Flux.ConcatMap.2 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
2018-02-14 12:33:18.295  INFO 3088 --- [nio-8080-exec-3] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:19.295  INFO 3088 --- [     parallel-4] reactor.Flux.ConcatMap.2 : onNext(a)
2018-02-14 12:33:19.297  INFO 3088 --- [     parallel-4] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:20.302  INFO 3088 --- [     parallel-5] reactor.Flux.ConcatMap.2 : onNext(b)
2018-02-14 12:33:20.302  INFO 3088 --- [     parallel-5] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:21.306  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : onNext(c)
2018-02-14 12:33:21.306  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : request(1)
2018-02-14 12:33:21.307  INFO 3088 --- [     parallel-6] reactor.Flux.ConcatMap.2 : onComplete()
2018-02-14 12:33:21.307  INFO 3088 --- [nio-8080-exec-4] reactor.Flux.ConcatMap.2 : cancel()

是Tomcat的问题还是我做错了什么


共 (1) 个答案

  1. # 1 楼答案

    首先,我认为不应该对有限流使用SSE

    当我创建一个控制器方法时,比如:

    @GetMapping(path = "/test", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<String> test() {
        return Flux.just("a", "b", "c").delayElements(Duration.ofSeconds(1)).log();
    }
    

    并通过以下浏览器(Chrome或Firefox)请求:

    <script type="text/javascript">
        var testEventSource = new EventSource("/test");
        testEventSource.onmessage = function (e) {
            console.log(e);
        };
    </script>
    

    我在服务器上获得以下日志:

    | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
    | request(1)
    | onNext(a)
    | request(31)
    | onNext(b)
    | onNext(c)
    | onComplete()
    | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
    | request(1)
    | onNext(a)
    | request(31)
    | onNext(b)
    | onNext(c)
    | onComplete()
    

    一旦Flux完成,服务器就会关闭连接,浏览器会自动重新连接。这将一次又一次地重播相同的序列

    在服务器上获取cancel()事件的唯一方法是在流期间关闭浏览器选项卡