有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

查询参数递增的java重复WebClient调用

我试图构建一个方法,当一个查询参数小于45000时,该方法应该对一个外部端点执行多个HTTP请求

我需要这样做,因为外部端点允许我获取100个项目,但要获取的项目超过44000个

private int offset = 0;

public Flux<List<Model>> getItems() {
    return Flux.from(
            webClientBuilder
                    .build()
                    .get()
                    .uri(uriBuilder -> uriBuilder
                            .path("/getItems")
                            .queryParam("limit", 100)
                            .queryParam("offset", getOffset())
                            .build())
                    .retrieve()
                    .bodyToMono(Model.class)
                    .doOnSuccess(System.out::println)
                    .flatMap(model -> {
                        setOffset(getOffset() + 100);
                        log.info("Offset: " + getOffset());
                        return repository.saveAll(model.getData().getResults()).collectList();
                    }).delayElement(Duration.ofSeconds(15)))
                    .repeat(() -> getOffset() <= 45000);
}

public int getOffset() {
    return offset;
}

public void setOffset(int offset) {
    this.offset = offset;
}

这似乎有效,因为日志偏移量参数增加了,但HTTP请求的偏移量等于0。该方法返回前100项,而不是44566项


共 (1) 个答案

  1. # 1 楼答案

    问题是,实际上,webclient是在订阅之前急切地构建的,并且使用初始offset值进行“缓存”。在每次调用之后,Flux都会被重新订阅,但准备好的带有偏移量的webservice调用仍会被“缓存”。 您必须以惰性方式提供weblient(例如,将其包装为lambda),这将强制其所有参数为每个调用重新计算。有一个特殊的操作符-defer()

    解决方案

    Mono<Model> response = Mono.defer(() -> webClientBuilder
            .build()
            .get()
            .uri(uriBuilder -> uriBuilder
                    .path("/getItems")
                    .queryParam("limit", 100)
                    .queryParam("offset", getOffset())
                    .build())
            .retrieve()
            .bodyToMono(Model.class)
    );
    
    
    Flux.from(response
            .doOnEach(System.out::println)
            .flatMap(model -> {
                setOffset(getOffset() + 100);
                log.info("Offset: " + getOffset());
                return repository.saveAll(model.getData().getResults()).collectList();
            }).delayElement(Duration.ofSeconds(15))
    ).repeat(() -> getOffset() <= 45000).subscribe();
    

    另一个问题表明了渴望执行的同样问题: Mono switchIfEmpty() is always called