有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Netty 4池返回一个尚未准备好发送实际消息的通道

我已经创建了一个SimpleChannelInboundHandler类型的入站处理程序,并将其添加到管道中。我的意图是,每次建立连接时,我都希望发送一条名为sessionopenmessage的应用程序消息,并使连接准备好发送实际消息。为了实现这一点,上面的入站处理程序 over rides channelActive()发送会话打开消息,作为响应,我将收到会话打开确认消息。只有在那之后,我才能发送任何数量的实际业务信息。我正在使用FixedChannelPool,并初始化如下。这在启动时的某段时间效果很好。但是,如果远程主机关闭连接,之后如果发送一条消息调用下面的sendMessage(),则该消息甚至会在通过channelActive()打开会话消息并获得其响应之前发送。因此,在发送业务消息时,服务器会忽略该消息,因为会话尚未打开

我要寻找的是,池应该只返回那些调用了channelActive()事件的通道,这些通道已经发送了会话打开消息,并且已经从服务器获得了会话打开确认消息。如何处理这种情况

public class SessionHandler extends SimpleChannelInboundHandler<byte[]> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        if (ctx.channel().isWritable()) {
            ctx.channel().writeAndFlush("open session message".getBytes()).;
        }
    }
}

// At the time of loading the applicaiton
public void init() {
    final Bootstrap bootStrap = new Bootstrap();
    bootStrap.group(group).channel(NioSocketChannel.class).remoteAddress(hostname, port);
    fixedPool = new FixedChannelPool(bootStrap, getChannelHandler(), 5);

    // This is done to intialise connection and the channelActive() from above handler is invoked to keep the session open on startup
    for (int i = 0; i < config.getMaxConnections(); i++) {
        fixedPool.acquire().addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> future) throws Exception {
                if (future.isSuccess()) {

                } else {
                    LOGGER.error(" Channel initialzation failed...>>", future.cause());
                }

            }
        });
    }
}

//要实际发送消息,应用程序将调用以下方法

public void sendMessage(final String businessMessage) {
    fixedPool.acquire().addListener(new FutureListener<Channel>() {
        @Override
        public void operationComplete(Future<Channel> future) throws Exception {
            if (future.isSuccess()) {
                Channel channel = future.get();
                if (channel.isOpen() && channel.isActive() && channel.isWritable()) {
                    channel.writeAndFlush(businessMessage).addListener(new GenericFutureListener<ChannelFuture>() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess()) {
                                // success msg
                            } else {
                                // failure msg
                            }
                        }
                    });
                    fixedPool.release(channel);
                }
            } else {
                // Failure
            }
        }
    });
}

共 (1) 个答案

  1. # 1 楼答案

    如果没有具体原因需要使用FixedChannelPool,那么可以使用另一个数据结构(List/Map)来存储通道。您可以在发送open session消息后向数据结构添加通道,并在channelInactive方法中删除它

    如果需要在通道上执行批量操作,可以使用ChannelGroup来实现

    如果您仍然想使用FixedChannelPool,您可以在通道中设置一个属性,说明是否发送了打开的消息:

    ctx.channel().attr(OPEN_MESSAGE_SENT).set(true);
    

    您可以在sendMessage函数中获得如下属性:

    boolean sent = ctx.channel().attr(OPEN_MESSAGE_SENT).get();
    

    channelInactive中,您可以将其设置为false或将其删除

    OPEN_MESSAGE_SENT是一个AttributeKey

    public static final AttributeKey<Boolean> OPEN_MESSAGE_SENT = AttributeKey.valueOf("OPEN_MESSAGE_SENT");