有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

集合中每个项的java异步调用

我有一个问题到目前为止还没有解决 我是RxKotlin的新手,所以这可能很简单。请看一下代码:

    override fun infos(): Stream<Info> =
        client.infoAboutItem(identifier)
                .map {
                    val itemId = it.itemId ?: ""
                    val item = client.itemForId(itemId)
                    ClientInfo(client, it, source, item) as Info
                }
                .let { AccessStream(it) }

Where stream是我们自制的收藏品。Map是一种方法,允许您迭代该集合中的每个项

问题是

 client.itemForId(itemId)

是一个http调用,返回一个不理想的

我想在map中创建一个异步调用,该调用将返回Item而不是Single,然后将其传递给ClientInfo。到目前为止,我尝试过在地图中使用订阅和blockingGet()方法,但这会阻止主线程,即使我在不同的线程上观察和订阅

因此,它涉及对集合中的每一个对象进行异步调用

谢谢你的帮助


共 (2) 个答案

  1. # 1 楼答案

    你应该把这些昂贵的操作包装成一个可观察的对象,并使用平面图将这些数据压缩成客户信息

    我写了一个小样本来展示它

    class SimpleTest {
      val testScheduler = TestScheduler()
    
      @Test
      fun test() {
        infos().observeOn(Schedulers.immediate())
            .subscribe { logger("Output", it.toString()) }
    
        testScheduler.advanceTimeBy(10, TimeUnit.MINUTES)
      }
    
      fun infos(): Single<List<ClientInfo>> {
        return Observable.from(infoAboutItem("some_identifier"))
            .doOnNext { logger("Next", it.toString()) }
            .flatMap { aboutItem ->
              Observable.fromCallable { itemForId(aboutItem.itemId) }
                  .subscribeOn(testScheduler)
                  .map { ClientInfo(aboutItem = aboutItem, item = it) }
            }
            .doOnNext { logger("Next", it.toString()) }
            .toList()
            .toSingle()
      }
    
      data class ClientInfo(
          val id: String = UUID.randomUUID().toString(),
          val aboutItem: AboutItem,
          val item: Item
      )
    
      data class AboutItem(val itemId: String = UUID.randomUUID().toString())
      data class Item(val id: String = UUID.randomUUID().toString())
    
      fun infoAboutItem(identifier: String): List<AboutItem> {
        return (1..10).map { AboutItem() }
      }
    
      fun itemForId(itemId: String): Item {
        val sleepTime = Random().nextInt(1000).toLong()
        Thread.sleep(sleepTime)
        return Item()
      }
    
      fun logger(tag: String, message: String): Unit {
        val formattedDate = Date(Schedulers.immediate().now()).format()
        System.out.println("$tag @ $formattedDate: $message")
      }
    
      fun Date.format(): String {
        return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this)
      }
    }
    
  2. # 2 楼答案

    您可以尝试返回Observable<Stream<Info>>,然后它看起来像:

       override fun infos(): Observable<Stream<Info>> = 
                    Observable.from(client.infoAboutItem(identifier))
                            .flatMapSingle {
                                val itemId = it.itemId ?: ""
                                client.itemForId(itemId)
                            }
                            .map { 
                                ClientInfo(client, it, source, item) as Info
                             }
                            .toList()
                            .flatMap {
                                AccessStream(it)
                            }