有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在执行Flux时处理错误。地图()

我试图找出在通量中映射元素时如何处理错误

例如,我正在将CSV字符串解析为我的一个业务POJO:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));

其中一些行可能包含错误,因此我在日志中得到的是:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo

我在API中读到了一些错误处理方法,但大多数是指返回“错误值”或使用回退流量,如以下方法:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);

但是,将其与mymyflux一起使用意味着整个通量将被再次处理

那么,在处理特定元素(即忽略它们/记录它们)的同时,是否有一种方法来处理错误,并继续处理剩余的流量

使用@akarnokd解决方案更新

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list of raw quotes string in a new Flux<String>
    .flatMap(list -> Flux.fromIterable(list))
    // Convert the string to POJOs
    .flatMap(x -> {
            try {
                return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));    
            }
            catch (IllegalArgumentException ex){
                System.out.println("Error decoding stock quotation: " + x);
                return Flux.empty();
            }
    });

    return processingFlux;
}

然而,这是一种魅力,因为您可以看到代码没有以前那么优雅。Flux API没有任何方法来完成这段代码所做的事情吗

retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)

共 (5) 个答案

  1. # 1 楼答案

    ...
           // Convert the string to POJOs
        .flatMap(x ->
            Flux.just(converter.convertHistoricalCSVToStockQuotation(x))
                .doOnError(IllegalArgumentException.class,
                    e -> System.out.println("Error decoding stock quotation: " + x))
                //.onErrorStop()
                .onErrorResume(IllegalArgumentException.class, e -> Flux.empty())
        )
    ...
    
  2. # 2 楼答案

    您需要flatMap,如果处理失败,则返回一个空序列:

    myflux.flatMap(v -> {
        try {
            return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
        } catch (IllegalArgumentException ex) {
            return Flux.empty();
        }
    });
    
  3. # 3 楼答案

    如果想使用Reactor 3的方法处理异常,可以使用Mono.fromCallable

    flatMap(x -> 
        Mono.fromCallable(() -> converter.convertHistoricalCSVToStockQuotation(x))
            .flux()
            .flatMap(Flux::fromIterable)
            .onErrorResume(Flux::empty)
    )
    

    不幸的是,没有Flux.fromCallable,所以假设callable返回一个列表,您必须手动将其转换为通量

  4. # 4 楼答案

    在当前版本的反应堆3中,添加了很多方法。所以我们可以这样做:

    Flux.onErrorResume(error -> { 
            System.out.println("Error decoding stock quotation: " + e);
            return Flux.empty();
        });
    

    有关如何处理错误的更多信息here

  5. # 5 楼答案

    你可以继续使用。 它允许通过删除故障元素并继续后续元素从错误中恢复