有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在projectreactor中设置阻塞异步请求/响应?

我正在与ANT+U盘进行接口,并用ProjectReactor替换我自己天真的“MessageBus”,因为它看起来非常合适
usb接口本质上是异步的(独立的输入/输出管道),我希望以阻塞的方式处理一组请求/响应消息

我已经设置了一个单独的线程,它可以连续地从usb in管道读取消息,并将它们写入一个接收器,该接收器提供一个共享流量,任何人都可以订阅。这似乎很有效

目前我向usb管道发送消息,然后使用。filter()和。共享流量上的blockFirst():(人为代码)

    /**
     * Puts a message on the Usb  out Pipe and waits for the relevant asynchronous response on the {@link AntUsbReader#antMessages()} {@link Flux}
     *
     * @param message Message to send.
     * @return related response message.
     */

    public AntMessage sendBlocking(AntBlockingMessage message) {
        send(message); // in essence, calls usbOutPipe.syncSubmit(message.getBytes()), returns void
        // bug: ant dongle can reply to message even before following Flux is activated, meaning .blockFirst() goes in timeout.
        return this.antUsbMessageReader.antMessages() // .antMessages() is an (infinite)  Flux<AntMessage>
                .filter(antMessage -> antMessage.getMessageId() == message.getMessageId())
                .blockFirst(Duration.ofSeconds(10));
    }

问题是usb记忆棒甚至在通量激活之前就可以响应,从而导致超时异常
Thread.sleep(10)添加到usb读卡器“解决”了这个问题,但实现这种阻塞行为的正确方法是什么

  • 设置订阅(使用.take(1)),发送消息,然后阻止订阅
  • 设置发送和等待正确响应的流量

我想不出这个


共 (2) 个答案

  1. # 1 楼答案

    看了你的代码后,我会大致提出一些建议。我已经用手机写过了,所以还没有测试过

    但我们先写,然后阻塞1秒,然后返回过滤响应的获取

    Flux<AntMessage> response = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage))
        .block(Duration.ofSeconds(1))
        .thenReturn(this.antUsbReader.antMessages()
                .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
                .take(1));
    
  2. # 2 楼答案

    我找到了一个有效的解决方案,但我不确定这是否是最好的:

    我设置了一个用于发送异步消息的Mono,并将其与过滤匹配消息的流量合并。 看到Mono从不发出值,我知道合并的第一个对象是来自Flux的响应消息,所以我可以将其转换为正确的类型

    这仍然让人感觉有点脏,但再一次,试图用一个用于异步工作的框架获得阻塞行为总是会让人感觉有点脏

        public AntMessage sendBlocking(AntBlockingMessage requestMessage) {
            Flux<AntMessage> response = this.antUsbReader.antMessages()
                    .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage))
                    .take(1);
    
            Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));
            return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));
        }
    
        private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {
            if (message instanceof RequestMessage) {
                return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();
            }
            return response.getMessageId() == message.getMessageId();
        }