有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java从flux的开始捕获元素,并创建一个新的flux,其中包含捕获的元素和剩余的元素

我正在从事一个基于SpringCloudGateway的项目,我的目标是捕获和记录部分传入和传出的消息。请求日志记录必须在请求传递到后端服务之前完成,并且相同的策略应用于响应。实现应该基于过滤器。我无法控制网关何时订阅产生的流量

简而言之,我想做以下几点:

  1. 从flux捕获最多x字节的数据
  2. 记录捕获的数据
  3. 创建包含捕获数据和剩余数据的流量

到目前为止,我得到了这个-它似乎是工作。我只是想知道,我是否错过了一些东西和/或是否有更好的方法来实现这一点。我相信其他人也遇到过类似的问题:

Flux<Integer> body = Flux.range(1, 50).log(); // Simulate flow of data
ConnectableFlux<Integer> sharedBody = body.publish(1); // Content is already buffered - ideal prefetch would be 0
AtomicLong readCount = new AtomicLong(); // Counter
AtomicReference<Flux<Integer>> partiallyCachedFlux = new AtomicReference<>(); // A hack, not to be used in real world

Flux.from(sharedBody)
    .takeUntil(s -> {
        System.out.println("C:" + s);
        return readCount.incrementAndGet() >= 10; // Store up to 10 elements
    })
    .collectList()
    .subscribe(ints -> {
        System.out.println("Collected:" + ints); // Log what we got
        partiallyCachedFlux.set(
                Flux.concat(Flux.fromIterable(ints).log(), sharedBody)
                ); // Create a flux that contains captured data and remaining data
    });
sharedBody.connect();

Thread.sleep(1000); // Because I was lazy

partiallyCachedFlux.get()
    .doOnEach(i -> { if (i.isOnNext()) System.out.println("P:" + i.get());})
    .subscribe(); // Show that we have captured everything

共 (1) 个答案

  1. # 1 楼答案

    takeUntil相反的是skipUntil。你可以share将原始通量分成两个通量,一个是takesUntil,另一个是skipsUntil。您的最终结果将是两种流量的Flux.merge

    注意,当像这样外部化状态(AtomicInteger)时,如果整个Flux被多次订阅,您将遇到问题。解决这个问题的方法是将所有内容包装到Flux.defer中,以便在lambda中创建外部状态,从而特定于给定订阅