有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java:无法使用 Siddhi 从 RabbitMQ 检索事件消息

/**
 * Starts a Siddhi app that reads text-mapped events from RabbitMQ and prints them.
 *
 * <p>The {@code @source} annotation sits directly in front of the {@code BarStream}
 * definition, so messages consumed from RabbitMQ are delivered to {@code BarStream}.
 * The callback is therefore registered on {@code BarStream}; a callback on
 * {@code FooStream} only sees events that are sent to {@code FooStream} by hand.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // The attribute must be named "test" in both streams: the query below selects "test".
    String siddhiApp = "@App:name('TestExecutionPlan') "
            + "define stream FooStream (test string);  "
            + "@info(name = 'query1')  "
            + "@source(type ='rabbitmq', "
            + "uri = 'amqp://test:test@192.168.99.100:5672', "
            + "exchange.name = 'amq.topic', "
            + "exchange.type = 'topic', "
            + "routing.key= '#', "
            + "queue.name = 'siddhi-queue', "
            + "@map(type='text')) "
            + "Define stream BarStream (test string); "
            + "from FooStream select test insert into BarStream; ";

    SiddhiManager siddhiManager = new SiddhiManager();
    SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

    // Register the callback before starting so that no early event is missed.
    siddhiAppRuntime.addCallback("BarStream", new StreamCallback() {
        @Override
        public void receive(Event[] events) {
            EventPrinter.print(events);
        }
    });

    siddhiAppRuntime.start();
}

此代码无法从rabbitmq检索事件消息

我可以在 RabbitMQ 仪表板中看到连接和通道,并且所有发布到 exchange 的消息都会传递到其他已绑定的队列。


共 (2) 个答案

  1. # 1 楼答案

    抱歉,伙计们,我发现问题了

    原因是没有配置 log4j.properties 文件:

        log4j.rootLogger=INFO, stdout
        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=%m%n

  2. # 2 楼答案

    你需要把回调添加到“BarStream”上,因为它才是配置了 @source 的流。你可以尝试以下示例:

        /**
         * Round-trips three text events through RabbitMQ.
         *
         * <p>The first app attaches a {@code rabbitmq} {@code @source} to {@code BarStream}
         * and counts every event delivered to it. The second app attaches a {@code rabbitmq}
         * {@code @sink} to its own {@code BarStream} and forwards {@code FooStream} into it,
         * so events sent to that {@code FooStream} are published to the exchange.
         *
         * <p>Both runtimes are shut down in {@code finally} blocks so that they are released
         * even when the wait or the final check fails.
         *
         * @throws InterruptedException propagated from sending events or waiting for them
         */
        @Test
        public void rabbitmqSourceTest() throws InterruptedException {
            final int expectedEvents = 3;
            AtomicInteger eventCount = new AtomicInteger(0);
            String siddhiApp = "@App:name('TestExecutionPlan') "
                    + "define stream FooStream (test string); "
                    + "@info(name = 'query1')  "
                    + "@source(type ='rabbitmq', "
                    + "uri = 'amqp://guest:guest@172.17.0.2:5672', "
                    + "exchange.name = 'amq.topic', "
                    + "exchange.type = 'topic', "
                    + "routing.key= '#',"
                    + "queue.name = 'siddhi-queue', "
                    + "@map(type='text')) "
                    + "define stream BarStream (test string); ";
            SiddhiManager siddhiManager = new SiddhiManager();
            SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
            // The callback goes on BarStream because that is the stream carrying @source.
            siddhiAppRuntime.addCallback("BarStream", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event event : events) {
                        EventPrinter.print(event);
                        eventCount.incrementAndGet();
                    }
                }
            });

            SiddhiAppRuntime executionPlanRuntime = null;
            try {
                siddhiAppRuntime.start();
                executionPlanRuntime = siddhiManager.createSiddhiAppRuntime(
                        "@App:name('TestExecutionPlan') " +
                                "define stream FooStream (test string); " +
                                "@info(name = 'query1') " +
                                "@sink(type ='rabbitmq', uri = 'amqp://guest:guest@172.17.0.2:5672', " +
                                "exchange.type='topic', " +
                                "exchange.name = 'amq.topic', " +
                                "@map(type='text'))" +
                                "Define stream BarStream (test string);" +
                                "from FooStream select test insert into BarStream;");
                InputHandler fooStream = executionPlanRuntime.getInputHandler("FooStream");
                executionPlanRuntime.start();

                List<Event> arrayList = new ArrayList<>();
                arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"WSO2"}));
                arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"IBM"}));
                arrayList.add(new Event(System.currentTimeMillis(), new Object[]{"WSO2"}));
                fooStream.send(arrayList.toArray(new Event[0]));

                SiddhiTestHelper.waitForEvents(waitTime, expectedEvents, eventCount, timeout);
                // Without this check the test would pass even if nothing was received.
                if (eventCount.get() != expectedEvents) {
                    throw new AssertionError("Expected " + expectedEvents
                            + " events from RabbitMQ but received " + eventCount.get());
                }
            } finally {
                try {
                    if (executionPlanRuntime != null) {
                        executionPlanRuntime.shutdown();
                    }
                } finally {
                    siddhiAppRuntime.shutdown();
                }
            }
        }
    

    参考 Siddhi 查询指南(Query Guide):Siddhi-source