java如何在执行Flux时处理错误。地图()
我试图找出在通量中映射元素时如何处理错误
例如,我正在将CSV字符串解析为我的一个业务POJO:
myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));
其中一些行可能包含错误,因此我在日志中得到的是:
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
reactor.core.publisher.FluxLog: onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
reactor.core.publisher.FluxLog: onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
reactor.core.publisher.FluxLog: java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo
我在API中读到了一些错误处理方法,但大多数是指返回“错误值”或使用回退流量,如以下方法:
Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);
但是,将其与mymyflux
一起使用意味着整个通量将被再次处理
那么,在处理特定元素(即忽略它们/记录它们)的同时,是否有一种方法来处理错误,并继续处理剩余的流量
使用@akarnokd解决方案更新
public Flux<StockQuotation> getQuotes(List<String> tickers)
{
Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
// Get each set of quotes in a separate thread
.flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
// Convert each list of raw quotes string in a new Flux<String>
.flatMap(list -> Flux.fromIterable(list))
// Convert the string to POJOs
.flatMap(x -> {
try {
return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
}
catch (IllegalArgumentException ex){
System.out.println("Error decoding stock quotation: " + x);
return Flux.empty();
}
});
return processingFlux;
}
然而,这是一种魅力,因为您可以看到代码没有以前那么优雅。Flux API没有任何方法来完成这段代码所做的事情吗
retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)
# 1 楼答案
# 2 楼答案
您需要
flatMap
,如果处理失败,则返回一个空序列:# 3 楼答案
如果想使用Reactor 3的方法处理异常,可以使用
Mono.fromCallable
不幸的是,没有
Flux.fromCallable
,所以假设callable返回一个列表,您必须手动将其转换为通量# 4 楼答案
在当前版本的反应堆3中,添加了很多方法。所以我们可以这样做:
有关如何处理错误的更多信息here
# 5 楼答案
你可以继续使用。 它允许通过删除故障元素并继续后续元素从错误中恢复