有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java反应流:如何按键等待所有发布者?

假设我有3个出版商和1个处理器。发布者以{key: <integer>, value: <object>, publisher_id: <string>}的形式发布项目

出版商进行IO操作,因此:

  • 一方面,我希望出版商在给定的时间(大致)处理N
  • 另一方面,我希望消费者将项目合并到一个记录中(即^{

实际上,我已经实现了一个FluxProcessor,它有一个内部存储(ConcurrentHashMap)来保存所有项目。当容量未达到时,它会手动创建新项目

我想知道RxJava(2)/Spring Reactor API是否有内置的功能来实现这一点


共 (1) 个答案

  1. # 1 楼答案

    在RxJava 2中使用merge、rebatchRequests和toMultimap:

    Flowable<KeyValuePublisher> source1 = ...
    Flowable<KeyValuePublisher> source2 = ...
    Flowable<KeyValuePublisher> source3 = ...
    
    Flowable.merge(
        source1.rebatchRequests(N),
        source2.rebatchRequests(N),
        source3.rebatchRequests(N)
    )
    .toMultimap(kvp -> kvp.key, kvp -> kvp.value)
    subscribe(map -> System.out.println(map));