有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java在使用WebClient进行非阻塞REST调用时的显著故障率

我正在编写一个应用程序,扫描其中一家网店,并在那里寻找新的优惠。我使用的是官方REST API,每秒允许9000个请求(每个clientId),我被要求检查大约350个不同的查询,所以现在我每3秒调用一次这个API。我最初的方法是使用阻塞RestTemplate并生成多达350个线程,每个线程用于一个查询。它起作用了,但是我想尝试使用WebClient的非阻塞方法。在这里,问题开始增多

我的代码非常简单,这是我的非阻塞客户端

@Component
public class OffersListingReactiveClient {
    private final AuthenticationService authenticationService;
    private final WebClient webClient;

    public OffersListingReactiveClient(final AuthenticationService authenticationService, final WebClient webClient) {
        this.authenticationService = authenticationService;
        this.webClient = webClient;
    }

    public Mono<ApiResponse> collectData(final SearchQuery query) {
        final var accessToken = authenticationService.getAccessToken();
        return invoke(query.toUrlParams(), accessToken);
    }

    private Mono<ApiResponse> invoke(final String endpoint, final String accessToken) {
        return webClient.get()
                .uri(endpoint)
                .header("Authorization", accessToken)
                .retrieve()
                .bodyToMono(ApiResponse.class)
                .onErrorReturn(ApiResponse.emptyResponse());
    }
}

使用该客户机的服务的一部分,其余的代码是无关的,我想,我在几个不同的线程中用RestClient初始化initialItems,一旦数据初始化,我发布携带该数据的事件,从那一刻起,我开始用下面的代码收集新的项目

@Scheduled(fixedRate = 3000)
private void collectData() {
    if (!initialItems.isEmpty()) {
        try {
            queryRepository.getSearchQueries()
                    .forEach(query -> reactiveClient.collectData(query)
                            .subscribe(response -> {
                                this.processResponse(response);
                                this.logResults(response);
                            }));
        } catch (Throwable t) {
            logger.severe("Throwable " + t.getMessage());
        }
    }
}

private void processResponse(final ApiResponse response) {
    response.collectItems().forEach(item -> {
        if (!initialItems.containsKey(item.getId())) {
            initialItems.put(item.getId(), item);
            eventPublisher.publishEvent(new NewItemEvent(this, item));
        }
    });
}

private void logResults(final ApiResponse response) {
    if (response.isFailed()) {
        failRate.incrementAndGet();
    } else {
        successRate.incrementAndGet();
    }
    logger.info("Success rate: " + successRate + ", Failure rate: " + failRate);
}

正如您所看到的,我添加了logResults()方法来简单地查看有多少请求失败,这个数字是巨大的

每3秒钟有350个请求,超过70%的请求失败
每3秒有175个请求,成功/失败率约为50%
在每3秒80个请求的情况下,失败率约为10%
每3秒有40个请求,约为1%

为什么?在40/80的请求中,我得到了这个例外

2020-10-12 20:19:08.345 WARN 5676 --- [ctor-http-nio-7] r.netty.http.client.HttpClientConnect : [id: 0xedf175d7, L:/192.168.1.11:62743 ! R:api.pl/xxxxx:443] The connection observed an error

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response

一旦我添加了更多的查询,意味着更多的请求,那么我也会得到这个

io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 10000ms at io.netty.handler.ssl.SslHandler$5.run(SslHandler.java:2062) ~[netty-handler-4.1.52.Final.jar:4.1.52.Final] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): |_ checkpoint ⇢ Request to GET https://api.pl/offers/listing?phrase=ryzen&include=-all&include=items&sort=-startTime&category.id=42540aec-367a-4e5e-b411-17c09b08e41f [DefaultWebClient] Stack trace: at io.netty.handler.ssl.SslHandler$5.run(SslHandler.java:2062) ~[netty-handler-4.1.52.Final.jar:4.1.52.Final] at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) ~[netty-common-4.1.52.Final.jar:4.1.52.Final] at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170) ~[netty-common-4.1.52.Final.jar:4.1.52.Final]

编辑#

我正在添加依赖项

<properties>
    <java.version>14</java.version>
</properties>

<parent>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>   
</dependencies>

共 (0) 个答案