JavaSpringFramework。数据存储库。反应性。反应性积存。save()在从CompletableFuture调用时不持久化数据。运行异步
上下文:我想将数据异步保存到MongoDb。当Front(例如Angular/Mobile)调用控制器端点时,它将调用一个服务方法,目的是首先将数据保存到MongoDb,如果成功,将相同的数据发布到Kafka主题,并调用传递相同数据的另一个端点。为了实现这种行为,我希望使用CompletableFuture并执行“两个步骤”:首先在MongoDb上保存数据,然后(然后是AcceptsSync)将数据发布到Kafka Topic,然后将AcceptsSync发布到另一个rest服务
问题:我完全无法完成第一步:将数据保存到MongoDb。我可以同时运行一个简单的系统。出来打印,但永远不要反应。save()保存数据。请参阅下面的代码
我很高兴,其余的都在工作,因为如果我尝试保存而不使用CompletableFuture,那么使用此方法一切都会很好:
//properly working without CompletableFuture
public void SimpleSaveMehtod(Extrato e) {
extratoRepository.save(e); // .subscribe();
}
下面是服务方法代码“runPrintAsync.get();”工作正常(打印消息),但“runSaveAsync.get();”一点也没存。我通过“ForkJoinPool.commonPool().awaitquisition(3,TimeUnit.SECONDS)”进行了一次尝试,但它也不起作用(老实说,如果我用get-btw阻止了它,那么等待是没有意义的,我尝试了它)
@Async("threadPoolTaskExecutor")
public void transferirVoidReturned(Extrato e) {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
CompletableFuture<Void> runSaveAsync = CompletableFuture.runAsync(() -> extratoRepository.save(e),newFixedThreadPool);
CompletableFuture<Void> runPrintAsync = CompletableFuture.runAsync(() -> System.out.print("I am printed"), newFixedThreadPool);
try {
// ForkJoinPool.commonPool().awaitQuiescence(3, TimeUnit.SECONDS);
runSaveAsync.get();
runPrintAsync.get();
} catch (InterruptedException | ExecutionException e1) {
e1.printStackTrace();
}
}
如果相关,则存储库如下:
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.domain.Pageable;
public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
@Query("{ id: { $exists: true }}")
Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}
# 1 楼答案
tl;dr:您需要订阅
Mono
:背景:在
Future
上使用反应类型背后的许多原理之一,类似的原理是它们允许惰性地执行计算。这意味着,当您创建一个Mono
、Flux
或任何其他结构并添加一些计算时,它将在需要时才会执行按照现在的编写方式,只要创建了
Mono
,线程就会返回到池中,因为没有更多的工作要做。因为没有订阅,所以Mono
中的计算永远不会执行