项目反应器(或RxJava2)的java执行器序列调用
我有以下任务,我想用ProjectReactor(或RxJava)解决它
这是事件的根源。每个事件由serviceId和一些有效负载组成。一旦收到事件,我们需要对指定的serviceId和负载执行操作。但我们应该确保对同一serviceId的两个请求之间的时间间隔必须大于或等于1秒。但对差异服务的请求可以并行执行
我们还应该注意,服务的计数是动态的
如下图所示
目前我有以下代码:
Flux.create((sink-> eventProvider.listen(new EventListner(){
public void event(req) {
sink.next(req);
}
})))
/* need some logic here */
.flatMap(req -> requestExecutor.execute(req))
.doOnNext(res -> responseProcessor.process(res))
.subscribe();
你有什么想法吗
# 1 楼答案
Flux.groupBy()将在这种情况下帮助您。 操作员使用映射器函数创建关键点,并根据关键点将发射的元素组合在一起。可以将Service ID视为一个键。
您还可以根据serviceId添加不同的延迟。以下面的代码片段为例——偶数整数将延迟2秒,奇数将延迟1秒
# 2 楼答案
如果事件标识了它们发起调用的服务,那么可以使用
groupBy()
操作符按服务分离流。要在每个服务请求之后引入延迟,请使用flatMap()
和一个参数来单线程使用在RxJava中:
serviceObservable
是一个GroupByObservable
将在下面处理李>serviceObservable.getKey()
返回要使用的服务的ID。我发明了一种方法service()
,它通过服务的ID向服务发送事件。此外,参数1
告诉flatMap()
单线程执行该操作,因此一次只能发生一个服务请求李>delay()
(或您想要的任何操作符)将等待一秒钟,然后释放操作李>(免责声明:此代码未经测试,但我在过去的项目中也做过类似的调度,所以基本思想是正确的。)