有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

JavaSpringFramework。数据存储库。反应性。反应性积存。save()在从CompletableFuture调用时不持久化数据。运行异步

上下文:我想将数据异步保存到MongoDb。当Front(例如Angular/Mobile)调用控制器端点时,它将调用一个服务方法,目的是首先将数据保存到MongoDb,如果成功,将相同的数据发布到Kafka主题,并调用传递相同数据的另一个端点。为了实现这种行为,我希望使用CompletableFuture并执行“两个步骤”:首先在MongoDb上保存数据,然后(然后是AcceptsSync)将数据发布到Kafka Topic,然后将AcceptsSync发布到另一个rest服务

问题:我完全无法完成第一步:将数据保存到MongoDb。我可以同时运行一个简单的系统。出来打印,但永远不要反应。save()保存数据。请参阅下面的代码

我很高兴,其余的都在工作,因为如果我尝试保存而不使用CompletableFuture,那么使用此方法一切都会很好:

//properly working without CompletableFuture
public void SimpleSaveMehtod(Extrato e) {
    extratoRepository.save(e); // .subscribe();
}

下面是服务方法代码“runPrintAsync.get();”工作正常(打印消息),但“runSaveAsync.get();”一点也没存。我通过“ForkJoinPool.commonPool().awaitquisition(3,TimeUnit.SECONDS)”进行了一次尝试,但它也不起作用(老实说,如果我用get-btw阻止了它,那么等待是没有意义的,我尝试了它)

@Async("threadPoolTaskExecutor")
public void transferirVoidReturned(Extrato e) {

    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);

    CompletableFuture<Void> runSaveAsync = CompletableFuture.runAsync(() -> extratoRepository.save(e),newFixedThreadPool);

    CompletableFuture<Void> runPrintAsync = CompletableFuture.runAsync(() -> System.out.print("I am printed"), newFixedThreadPool);

    try {
        // ForkJoinPool.commonPool().awaitQuiescence(3, TimeUnit.SECONDS);

        runSaveAsync.get();
        runPrintAsync.get();

    } catch (InterruptedException | ExecutionException e1) {
        e1.printStackTrace();
    }
}

如果相关,则存储库如下:

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.domain.Pageable;

public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
    @Query("{ id: { $exists: true }}")
    Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}

共 (1) 个答案

  1. # 1 楼答案

    tl;dr:您需要订阅Mono

    public void SimpleSaveMehtod(Extrato e) {
        extratoRepository.save(e).subscribe();
    }
    

    背景:在Future上使用反应类型背后的许多原理之一,类似的原理是它们允许惰性地执行计算。这意味着,当您创建一个MonoFlux或任何其他结构并添加一些计算时,它将在需要时才会执行

    按照现在的编写方式,只要创建了Mono,线程就会返回到池中,因为没有更多的工作要做。因为没有订阅,所以Mono中的计算永远不会执行