`OneErrorContinue(…)`的java限制在变化中?
我有一个(可能是无限的)Flux
源,它应该首先存储每条消息(例如,存储到数据库中),然后异步转发消息(例如,使用Spring WebClient
)
发生故障时的转发应该记录错误,而不完成源Flux
然而,我意识到流中的forward(flatMap(...)
)会在导致异常(例如reactor.retry.RetryExhaustedException
)的256条消息之后阻止源Flux
的执行
由于仅处理256条消息,因此断言失败的典型示例:
@Test
@SneakyThrows
public void sourceBlockAfter256Exceptions() {
int numberOfRequests = 500;
Set<Integer> sink = new HashSet<>();
Flux
.fromStream(IntStream.range(0, numberOfRequests).boxed())
.map(sink::add)
.flatMap(i -> Mono
// normally the forwards are contained here e.g. by means of Mono.when(...).thenReturn(...).retryWhen(...):
.error(new Exception("any"))
)
.onErrorContinue((throwable, o) -> log.error("Error", throwable))
.subscribe();
Thread.sleep(3000);
Assertions.assertEquals(numberOfRequests, sink.size());
}
在subscribe(...)
内进行转发不会阻塞源Flux
,但这肯定不是解决方案,因为我不可能希望丢失消息
问题:
- 这里发生了什么事?(可能与存储在一位中的某些状态有关)
- 如何才能正确地执行此操作李>
编辑:
根据下面的讨论,我构建了一个使用FluxMessageChannel
的示例(据我所知,该示例适用于无限流,并且在256个错误后肯定不会阻塞),并且具有完全相同的行为:
@Test
@SneakyThrows
public void maxConnectionWithChannelTest() {
int numberOfRequests = 500;
Set<Integer> sink = new HashSet<>();
FluxMessageChannel fluxMessageChannel = MessageChannels.flux().get();
fluxMessageChannel.subscribeTo(
Flux
.fromStream(IntStream
.range(0, numberOfRequests).boxed()
.map(i -> MessageBuilder.withPayload(i).build())
)
.map(Message::getPayload)
.map(sink::add)
.flatMap(i -> Mono.error(new Exception("whatever")))
);
Flux
.from(fluxMessageChannel)
.subscribe();
Thread.sleep(3000);
Assert.assertEquals(numberOfRequests, sink.size());
}
编辑:
我刚刚在反应堆核心项目中提出了一个问题:https://github.com/reactor/reactor-core/issues/2011
共 (0) 个答案