有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java在RXJava中实现异步API调用序列

我有一个从本地数据库检索到的对象数组,需要按照API调用的顺序上传到服务器。 对于每个本地DB对象,我必须随后调用两个异步API调用(methodA(),methodB())。在完成整个循环之后,我需要调用另一个API调用,如下所示

for(Object object: localDBObjects){
    methodA() -> methodB()
}
methodC()

我的问题是如何阻止methodC()调用,直到完成循环


共 (2) 个答案

  1. # 1 楼答案

    这个怎么样:

    public class Main {
    
    public static CountDownLatch finishLatch = new CountDownLatch(1); 
    
    public static Integer methodA(Integer obj) {
        try {
            Thread.sleep((int) (Math.random() * 1000)); //Simulate asynchronous call
        } catch (InterruptedException e) {
        }
        System.out.println("methodA for " + obj + " executed by " + Thread.currentThread().getName());
        return obj;
    }
    
    public static Integer methodB(Integer obj) {
        try {
            Thread.sleep((int) (Math.random() * 1000)); //Simulate asynchronous call
        } catch (InterruptedException e) {
        }
        System.out.println("methodB for " + obj + " executed by " + Thread.currentThread().getName());
        return obj;
    }
    
    public static void methodC() {
        System.out.println("methodC executed by " + Thread.currentThread().getName());
        finishLatch.countDown(); //Allow main to finish
    }
    
    public static void main(String[] args) throws IOException, InterruptedException {
        List<Integer> objectsFromDb = Arrays.asList(1, 2, 3, 4, 5); //List of objects from the DB
    
        Observable.from(objectsFromDb) 
                .flatMap(obj -> Observable.fromCallable(() -> methodA(obj)).subscribeOn(Schedulers.io())) //Asynchronously call method A
                .flatMap(obj -> Observable.fromCallable(() -> methodB(obj)).subscribeOn(Schedulers.io())) //Asynchronously call method B
                .doOnCompleted(() -> methodC()) //When finished, call methodC
                .subscribe();
    
        finishLatch.await(); //Wait for everything to finish
    }
    

    }

    样本输出:

    methodA for 5 executed by RxCachedThreadScheduler-5
    methodA for 2 executed by RxCachedThreadScheduler-2
    methodA for 1 executed by RxCachedThreadScheduler-1
    methodB for 1 executed by RxCachedThreadScheduler-2
    methodB for 2 executed by RxCachedThreadScheduler-5
    methodB for 5 executed by RxCachedThreadScheduler-6
    methodA for 3 executed by RxCachedThreadScheduler-3
    methodA for 4 executed by RxCachedThreadScheduler-4
    methodB for 3 executed by RxCachedThreadScheduler-1
    methodB for 4 executed by RxCachedThreadScheduler-2
    methodC executed by RxCachedThreadScheduler-2
    
  2. # 2 楼答案

    由于我没有太多关于您的项目和方法的具体实现的信息,即它们的参数和返回类型,我有两个假设

    注意:我希望您不介意我使用lambda表达式

    1)。方法返回类似Observable<Object>Retrofit
    在本例中,它们看起来是这样的:

    public Observable<Object> methodA(Object o){
        return null;
    }
    
    public Observable<Object> methodB(Object o){
        return null;
    }
    
    public Observable<Object> methodC(Object[] objects){
        return null;
    }
    

    在这种情况下,您可以使用以下内容:

    Object[] localDBObjects = new Object[10];
    Observable.just(localDBObjects)
            .flatMap(objects -> Observable.from(objects)
                                    .flatMap(object -> methodA(object))
                                    .flatMap(resultFromMethodA -> methodB(resultFromMethodA))
                                    .toList())
            .flatMap(listOfResultsFromMethodB -> methodC(listOfResultsFromMethodB.toArray(new Object[listOfResultsFromMethodB.size()])))
            .subscribe(resultFromMethodC -> {
                //do something
            }, t -> t.printStackTrace());
    

    2)。在另一种情况下,方法返回Object,如下所示:

    public Object methodA(Object o){
        return null;
    }
    
    public Object methodB(Object o){
        return null;
    }
    
    public Object methodC(Object[] objects){
        return null;
    }
    

    在这种情况下,您需要在某些地方将运算符flatMap( )更改为map( )

        Object[] localDBObjects = new Object[10];
        Observable.just(localDBObjects)
                .flatMap(objects -> Observable.from(objects)
                                        .map(object -> methodA(object))
                                        .map(resultFromMethodA -> methodB(resultFromMethodA))
                                        .toList())
                .map(listOfResultsFromMethodB -> methodC(listOfResultsFromMethodB.toArray(new Object[listOfResultsFromMethodB.size()])))
                .subscribe(resultFromMethodC -> {
                    //do something
                }, t -> t.printStackTrace());