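Java: distributing subscribers across JVMs

As far as I know, RxJava works within a single JVM. Is there a wrapper, library, or API that supports clustered environments and, combined with a distributed cache, JMS, or any other queue, lets subscribers scale across a distributed environment? I wanted to check here before reinventing the wheel.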
# Answer 1

You can deploy Vert.x instances in a cluster and use RxJava on top of them. The idea is to use the EventBus as the transport layer and to subscribe to its messages with RxJava. It is not a pure RxJava solution.

A very simple runnable example:

```java
package com.example;

import java.util.concurrent.TimeUnit;

import io.reactivex.Flowable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.eventbus.EventBus;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;

public class MainVerticle extends AbstractVerticle {

    // Event bus address shared by every node in the cluster.
    static final String CENTRAL = "CENTRAL";

    @Override
    public void start() throws Exception {
        EventBus eventBus = vertx.eventBus();
        JsonObject config = config();
        String nodeID = config.getString("nodeID");

        // Subscribe to the shared address and skip messages this node sent itself.
        eventBus.consumer(CENTRAL).toFlowable()
                .map(msg -> (JsonObject) msg.body())
                .filter(msgBody -> !msgBody.getString("sender", "").equals(nodeID))
                .subscribe(msgBody -> System.out.println(msgBody));

        // Publish a greeting to all nodes once per second.
        Flowable.interval(1, TimeUnit.SECONDS)
                .subscribe(tick -> {
                    JsonObject msg = new JsonObject()
                            .put("sender", nodeID)
                            .put("msg", "Hello world");
                    eventBus.publish(CENTRAL, msg);
                });
    }

    public static void main(String[] args) {
        ClusterManager clusterManager = new HazelcastClusterManager();
        VertxOptions options = new VertxOptions().setClusterManager(clusterManager);

        // Start a clustered Vert.x instance, then deploy the verticle with the node ID as config.
        // Errors are handled in subscribe() so a failed startup is reported only once.
        Vertx.rxClusteredVertx(options)
                .subscribe(vertx -> {
                    if (vertx.isClustered()) {
                        System.out.println("Vertx is running clustered");
                    }
                    String nodeID = clusterManager.getNodeID();
                    System.out.println("Node ID : " + nodeID);

                    String mainVerticle = MainVerticle.class.getCanonicalName();
                    DeploymentOptions deploymentOptions = new DeploymentOptions();
                    deploymentOptions.setConfig(new JsonObject().put("nodeID", nodeID));
                    vertx.rxDeployVerticle(mainVerticle, deploymentOptions)
                            .subscribe(id -> System.out.println("Deployed : " + id),
                                    Throwable::printStackTrace);
                }, Throwable::printStackTrace);
    }
}
```

Maven dependencies:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rxjava2-clustered</artifactId>
    <version>0.42</version>
    <packaging>jar</packaging>
    <name>rxjava2-clustered</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java2</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-hazelcast</artifactId>
            <version>3.5.0</version>
        </dependency>
    </dependencies>
</project>
```

This example uses the Hazelcast ClusterManager. Implementations also exist for Infinispan, Apache Ignite, and Apache ZooKeeper. See the documentation for a full reference.
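If you want to see the publish-and-filter pattern on its own before setting up a cluster, here is a rough sketch in plain Java with no Vert.x or RxJava involved. `LocalBus` and `Node` are hypothetical names made up for this illustration, not part of any library: `LocalBus` is a synchronous, in-memory stand-in for a single EventBus address, and each `Node` drops messages carrying its own sender ID, the same way the `filter` step does in the verticle. It says nothing about real cluster behavior such as ordering, delivery guarantees, or threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class LocalBusSketch {

    /** Hypothetical in-memory stand-in for one event bus address (not a Vert.x class). */
    static final class LocalBus {
        private final List<Consumer<Map<String, String>>> consumers = new CopyOnWriteArrayList<>();

        void consumer(Consumer<Map<String, String>> handler) {
            consumers.add(handler);
        }

        // Publish semantics: every registered consumer sees the message, the sender's included.
        void publish(Map<String, String> message) {
            consumers.forEach(c -> c.accept(message));
        }
    }

    /** A node that records messages from other nodes and ignores its own. */
    static final class Node {
        final String nodeId;
        final List<String> received = new ArrayList<>();
        private final LocalBus bus;

        Node(String nodeId, LocalBus bus) {
            this.nodeId = nodeId;
            this.bus = bus;
            bus.consumer(msg -> {
                // Same idea as the filter on "sender" in the verticle.
                if (!nodeId.equals(msg.getOrDefault("sender", ""))) {
                    received.add(msg.get("sender") + ": " + msg.get("msg"));
                }
            });
        }

        void sayHello() {
            bus.publish(Map.of("sender", nodeId, "msg", "Hello world"));
        }
    }

    public static void main(String[] args) {
        LocalBus bus = new LocalBus();
        Node a = new Node("node-a", bus);
        Node b = new Node("node-b", bus);

        a.sayHello();
        b.sayHello();

        System.out.println("node-a received " + a.received);
        System.out.println("node-b received " + b.received);
    }
}
```

Each node ends up with only the other node's greeting. The filter is needed because a publish reaches every consumer registered on the address, including the one on the node that sent the message.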