Java: how to run a Mono on the same thread inside a parallel Flux

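I'm trying to populate the objects in a Flux with a value that comes from a Mono. When I do this, my "set" call seems to be ignored. I assume this is because the Flux runs in parallel and the Mono does not. How can I fix this?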

Flux.fromIterable(proxyParserService.getProxyList())
            .parallel()
            .runOn(Schedulers.parallel())
            .filter(proxy -> proxy.getCorrupted() == null || !proxy.getCorrupted())
            .subscribe(proxy -> {
                        try {
                            RestTemplate restTemplate = getProxiedTemplate(proxy.getHost(), proxy.getPort());
                            restTemplate.exchange(URI, HttpMethod.GET, HttpEntity.EMPTY, String.class);
                            geoDataService.getData(proxy.getHost()) // Here comes the Mono object, that contains needed value to set into "proxy"
                                    .subscribe(geoData ->
                                    {
                                        log.info("GEODATA: {} ", geoData);
                                        proxy.setCountryCode(geoData.getCountryCode()); // ignored somehow
                                    });
                            proxy.setCorrupted(false);
                            addresses.add(proxy);
                            log.info("IP {}:{} is OK", proxy.getHost(), proxy.getPort());
                            log.info("Final result: {}", proxy.toString());
                        } catch (ResourceAccessException e) {
                            log.info("IP {}:{} is corrupted!", proxy.getHost(), proxy.getPort());
                            proxy.setCorrupted(true);
                            addresses.add(proxy);
                        }
                    },
                    throwable -> log.error(String.format("Exception caught while trying to fill map: %s", throwable.getCause())));

}

Here are some logs.

As you can see, I'm trying to set the country code on the proxy.


1 answer

  1. # Answer 1

    Solved. I put the Mono into the "flatMap" operator. For example:

    Flux.fromIterable(proxyParserService.getProxyList())
                .parallel()
                .runOn(Schedulers.parallel())
                .filter(proxy -> !valueExist(addresses.values(), proxy))
                .flatMap(geoDataService::getData) // Now it runs in parallel threads
                .subscribe(proxy -> {
                            try {
                                RestTemplate restTemplate = getProxiedTemplate(proxy.getHost(), proxy.getPort());
                                restTemplate.exchange(URI, HttpMethod.GET, HttpEntity.EMPTY, String.class);
                                proxy.setCorrupted(false);
                                addresses.put(proxy.getCountryCode(), proxy);
                                log.info("IP {}:{} is OK", proxy.getHost(), proxy.getPort());
                                log.info("Final result: {}", proxy.toString());
                            } catch (ResourceAccessException e) {
                                log.info("IP {}:{} is corrupted!", proxy.getHost(), proxy.getPort());
                                proxy.setCorrupted(true);
                                addresses.put(proxy.getCountryCode(), proxy);
                            }
                        },
                        throwable -> log.error(String.format("Exception caught while trying to fill map: %s", throwable.getCause())));
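
A likely reason the original "set" looked ignored: the nested subscribe only registers a callback, so the "Final result" log can run before the Mono has delivered its value. Chaining the lookup makes the proxy continue down the pipeline only once the value is set. The sketch below illustrates that ordering with the JDK's CompletableFuture instead of Reactor, so it is an analogy and not the Reactor API. The names ChainedLookup, Proxy, lookupCountry and the host-to-country rule are all made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ChainedLookup {

    // Hypothetical stand-in for the Proxy class from the question.
    static class Proxy {
        final String host;
        volatile String countryCode;
        Proxy(String host) { this.host = host; }
    }

    // Hypothetical stand-in for geoDataService.getData: resolves on another thread.
    // The host-to-country rule is invented for the demo.
    static CompletableFuture<String> lookupCountry(String host) {
        return CompletableFuture.supplyAsync(() -> host.startsWith("10.") ? "DE" : "US");
    }

    // Pattern from the question: register a callback, then read right away.
    // The future is completed by hand so the ordering is deterministic.
    static String readBeforeCompletion() {
        Proxy proxy = new Proxy("10.0.0.1");
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.thenAccept(code -> proxy.countryCode = code); // like the nested subscribe
        String seen = proxy.countryCode;                      // value not delivered yet
        pending.complete("DE");                               // the callback only runs now
        return seen;
    }

    // Pattern from the answer: chain the lookup so the proxy is handed on
    // only after its country code has been set.
    static CompletableFuture<Proxy> enrich(Proxy proxy) {
        return lookupCountry(proxy.host).thenApply(code -> {
            proxy.countryCode = code;
            return proxy;
        });
    }

    static List<Proxy> enrichAll(List<String> hosts) {
        // Start all lookups first, then wait for each enriched proxy.
        List<CompletableFuture<Proxy>> futures = hosts.stream()
                .map(Proxy::new)
                .map(ChainedLookup::enrich)
                .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("Read too early: " + readBeforeCompletion());
        for (Proxy p : enrichAll(Arrays.asList("10.0.0.1", "8.8.8.8"))) {
            System.out.println(p.host + " -> " + p.countryCode);
        }
    }
}
```

In the Reactor version above, flatMap plays the role of enrich: the subscriber receives each proxy only after the inner Mono has emitted.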