有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java跨JVM分发订阅者

据我所知,RxJava 只能在单个 JVM 内工作。是否有某种包装器/库/API 能够支持集群环境,并结合分布式缓存、JMS 或其他消息队列,把订阅者扩展到跨 JVM 的分布式环境中?在重新发明轮子之前,我想先在这里确认一下。


共 (1) 个答案

  1. # 1 楼答案

    您可以在集群中部署Vertx实例,并在其上使用RxJava。其想法是使用EventBus作为传输层,并使用RxJava订阅消息。它不是一个纯粹的RxJava解决方案

    一个非常简单的可运行示例:

    package com.example;
    
    import java.util.concurrent.TimeUnit;
    
    import io.reactivex.Flowable;
    import io.reactivex.disposables.CompositeDisposable;
    import io.vertx.core.DeploymentOptions;
    import io.vertx.core.VertxOptions;
    import io.vertx.core.json.JsonObject;
    import io.vertx.core.spi.cluster.ClusterManager;
    import io.vertx.reactivex.core.AbstractVerticle;
    import io.vertx.reactivex.core.Vertx;
    import io.vertx.reactivex.core.eventbus.EventBus;
    import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
    
    /**
     * Verticle that exchanges JSON messages with other cluster nodes over the
     * Vert.x event bus, using RxJava 2 to subscribe to incoming messages.
     *
     * <p>Each instance publishes a greeting to the {@link #CENTRAL} address once
     * per second and prints every message received on that address whose
     * {@code "sender"} differs from this node's ID.
     */
    public class MainVerticle extends AbstractVerticle {

        /** Event bus address shared by every node. */
        static final String CENTRAL = "CENTRAL";

        /** ID of this node, read from the {@code "nodeID"} entry of the deployment config. */
        String nodeId;

        /** Subscriptions opened in {@link #start()}; released in {@link #stop()}. */
        private final CompositeDisposable subscriptions = new CompositeDisposable();

        /**
         * Subscribes to the {@link #CENTRAL} address and starts the periodic publisher.
         *
         * @throws Exception declared by the overridden method
         */
        @Override
        public void start() throws Exception {
            EventBus eventBus = vertx.eventBus();

            nodeId = config().getString("nodeID");
            // Effectively-final copy so the lambdas do not depend on later field writes.
            String self = nodeId;

            subscriptions.add(
                eventBus.consumer(CENTRAL).toFlowable()
                    .map(msg -> (JsonObject) msg.body())
                    // Messages are published to every consumer, including our own: drop those.
                    .filter(msgBody -> !msgBody.getString("sender", "").equals(self))
                    .subscribe(
                        msgBody -> System.out.println(msgBody),
                        Throwable::printStackTrace));

            subscriptions.add(
                Flowable.interval(1, TimeUnit.SECONDS)
                    .subscribe(
                        tick -> {
                            JsonObject msg = new JsonObject()
                                    .put("sender", self)
                                    .put("msg", "Hello world");
                            eventBus.publish(CENTRAL, msg);
                        },
                        Throwable::printStackTrace));
        }

        /**
         * Disposes the subscriptions opened in {@link #start()}. Without this the
         * interval timer, which is not owned by the verticle, would keep
         * publishing after the verticle has been undeployed.
         *
         * @throws Exception declared by the overridden method
         */
        @Override
        public void stop() throws Exception {
            subscriptions.dispose();
        }

        /**
         * Starts a clustered Vert.x instance backed by Hazelcast and deploys one
         * {@code MainVerticle}, passing the cluster node ID through the deployment config.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            ClusterManager clusterManager = new HazelcastClusterManager();

            VertxOptions options = new VertxOptions().setClusterManager(clusterManager);

            Vertx.rxClusteredVertx(options)
                .subscribe(
                    vertx -> {
                        if (vertx.isClustered()) {
                            System.out.println("Vertx is running clustered");
                        }
                        String nodeID = clusterManager.getNodeID();
                        System.out.println("Node ID : " + nodeID);

                        String mainVerticle = MainVerticle.class.getCanonicalName();

                        DeploymentOptions deploymentOptions = new DeploymentOptions();
                        deploymentOptions.setConfig(new JsonObject().put("nodeID", nodeID));

                        // Handle deployment failures explicitly instead of leaving them
                        // to RxJava's global "error not implemented" handler.
                        vertx.rxDeployVerticle(mainVerticle, deploymentOptions)
                            .subscribe(deploymentId -> { }, Throwable::printStackTrace);
                    },
                    // Single explicit error path for cluster start-up failures.
                    Throwable::printStackTrace);
        }

    }
    

    Maven依赖项:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.example</groupId>
        <artifactId>rxjava2-clustered</artifactId>
        <version>0.42</version>
        <packaging>jar</packaging>
    
        <name>rxjava2-clustered</name>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!-- The example uses lambdas and method references, so compile for Java 8
                 explicitly; older default maven-compiler-plugin versions target 1.5/1.6. -->
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <!-- Vert.x core: event bus, verticles, deployment. -->
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-core</artifactId>
                <version>3.5.0</version>
            </dependency>
    
            <!-- RxJava 2 bindings (io.vertx.reactivex.*). -->
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-rx-java2</artifactId>
                <version>3.5.0</version>
            </dependency>
    
            <!-- Hazelcast-based cluster manager. -->
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-hazelcast</artifactId>
                <version>3.5.0</version>
            </dependency>
    
        </dependencies>
    </project>
    

    在本例中,我使用的是 Hazelcast ClusterManager。Infinispan、Apache Ignite 和 Apache ZooKeeper 也都有对应的实现。请参阅 documentation 以获取完整参考: