有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用Hazelcast executorservice和StreamSerializer

我为我的一个特定Java类实现了Hazelcast的StreamSerializer(使用kryo)。在大多数情况下,这似乎都能正常工作,除非我想在IExecutorService上执行一个具有此类字段的Callable。然后我得到了一个例外:

com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: com.flir.its.flux.infrastructure.loadbalance.hazelcast.HazelcastLoadBalance
    at com.hazelcast.nio.serialization.SerializationServiceImpl.handleException(SerializationServiceImpl.java:298)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:254)
    at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:262)
    at com.hazelcast.executor.BaseCallableTaskOperation.writeInternal(BaseCallableTaskOperation.java:94)
    at com.hazelcast.spi.Operation.writeData(Operation.java:284)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:129)
    at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:36)
    at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:48)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:176)
    at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:157)
    at com.hazelcast.spi.impl.NodeEngineImpl.toData(NodeEngineImpl.java:151)
    at com.hazelcast.spi.impl.BasicOperationService.send(BasicOperationService.java:650)
    at com.hazelcast.spi.impl.BasicOperationService.send(BasicOperationService.java:627)
    at com.hazelcast.spi.impl.BasicInvocation.doInvoke(BasicInvocation.java:356)
    at com.hazelcast.spi.impl.BasicInvocation.invoke(BasicInvocation.java:190)
    at com.hazelcast.spi.impl.BasicOperationService.invokeOnTarget(BasicOperationService.java:285)
    at com.hazelcast.executor.ExecutorServiceProxy.submitToMember(ExecutorServiceProxy.java:284)
    at com.hazelcast.executor.ExecutorServiceProxy.submitToMembers(ExecutorServiceProxy.java:302)
    at com.hazelcast.executor.ExecutorServiceProxy.submitToAllMembers(ExecutorServiceProxy.java:310)
    at com.company.app.infrastructure.loadbalance.hazelcast.HazelcastLoadBalancer.performLoadBalancing(HazelcastLoadBalancer.java:101)
    at com.company.app.infrastructure.loadbalance.hazelcast.HazelcastLoadBalancer.initialize(HazelcastLoadBalancer.java:62)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1681)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1620)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1549)
... 13 more
Caused by: java.io.NotSerializableException: com.company.app.domain.loadbalance.LoadBalance

这个LoadBalance对象确实是不可序列化的,但是为什么我注册的StreamSerializer没有被使用呢?我不想让这个类实现Serializable。

这是callable的外观:

@SpringAware
public class BalanceLoadCallable implements Callable<Void>, Serializable
{
// ------------------------------ FIELDS ------------------------------

    @Autowired
    private transient MessageSourceCommunicationService m_communicationService;

    @Autowired
    private transient ServerTopologyService m_serverTopologyService;

    private LoadBalance m_loadBalance;

// --------------------------- CONSTRUCTORS ---------------------------

    public BalanceLoadCallable( LoadBalance loadBalance )
    {
        m_loadBalance = loadBalance;
    }

// ------------------------ INTERFACE METHODS ------------------------

// --------------------- Interface Callable ---------------------

    @Override
    public Void call() throws Exception
    {
        m_communicationService.initializeCommunication( m_loadBalance.getSources( m_serverTopologyService.getCurrentServer() ) );
        return null;
    }
}

这是序列化程序:

package com.company.app.infrastructure.loadbalance.hazelcast;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.company.app.domain.loadbalance.LoadBalance;
import com.company.app.domain.messagesource.DetectorMessageSource;
import com.company.app.domain.messagesource.MessageSourceId;
import com.company.app.domain.server.Server;
import com.company.app.infrastructure.serialization.hazelcast.HazelcastSerializerId;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Hazelcast {@link StreamSerializer} for {@link LoadBalance} that delegates the actual
 * encoding to Kryo.
 *
 * <p>A {@link LoadBalance} is converted to a {@code HazelcastLoadBalance} before it is written,
 * and converted back after it is read.
 */
public class LoadBalanceKryoSerializer implements StreamSerializer<LoadBalance>
{
    /** Holds one configured {@link Kryo} per thread, built on first use by {@link #newKryo()}. */
    private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = new ThreadLocal<Kryo>()
    {
        @Override
        protected Kryo initialValue()
        {
            return newKryo();
        }
    };

    /**
     * Builds a Kryo instance with the registrations this serializer relies on.
     *
     * @return a freshly configured Kryo
     */
    private static Kryo newKryo()
    {
        Kryo configured = new Kryo();
        configured.register( LoadBalance.class );
        configured.register( Server.class, new ServerSerializer() );
        configured.register( DetectorMessageSource.class, new DetectorMessageSourceSerializer() );
        return configured;
    }

    /** Encodes a {@link Server} as its IP address followed by its port. */
    private static final class ServerSerializer extends Serializer<Server>
    {
        @Override
        public void write( Kryo kryo, Output output, Server server )
        {
            kryo.writeObject( output, server.getIpAddress() );
            kryo.writeObject( output, server.getPort() );
        }

        @Override
        public Server read( Kryo kryo, Input input, Class type )
        {
            // Read in the same order the fields were written.
            String host = kryo.readObject( input, String.class );
            int portNumber = kryo.readObject( input, Integer.class );
            return new Server( host, portNumber );
        }
    }

    /** Encodes a {@link DetectorMessageSource} as its numeric id followed by its name. */
    private static final class DetectorMessageSourceSerializer extends Serializer<DetectorMessageSource>
    {
        @Override
        public void write( Kryo kryo, Output output, DetectorMessageSource source )
        {
            kryo.writeObject( output, source.getId().getId() );
            kryo.writeObject( output, source.getName() );
        }

        @Override
        public DetectorMessageSource read( Kryo kryo, Input input, Class type )
        {
            // Read in the same order the fields were written: id first, then name.
            MessageSourceId sourceId = new MessageSourceId( kryo.readObject( input, Long.class ) );
            String sourceName = kryo.readObject( input, String.class );
            return new DetectorMessageSource( sourceId, sourceName );
        }
    }

    /**
     * @return the ordinal of {@code HazelcastSerializerId.LOAD_BALANCE}
     */
    // NOTE(review): Hazelcast reportedly requires custom serializer type ids to be positive;
    // verify LOAD_BALANCE is not the first enum constant (ordinal 0) and that the enum order is stable.
    @Override
    public int getTypeId()
    {
        return HazelcastSerializerId.LOAD_BALANCE.ordinal();
    }

    /** Nothing to release. */
    @Override
    public void destroy()
    {
    }

    /**
     * Converts the given load balance to a {@code HazelcastLoadBalance} and writes it with
     * this thread's Kryo, flushing the Kryo output afterwards.
     *
     * @param objectDataOutput Hazelcast output; it is cast to {@link OutputStream}
     * @param loadBalance      the value to serialize
     * @throws IOException declared by the interface
     */
    @Override
    public void write( ObjectDataOutput objectDataOutput, LoadBalance loadBalance ) throws IOException
    {
        Kryo threadKryo = KRYO_THREAD_LOCAL.get();
        Output kryoOut = new Output( (OutputStream)objectDataOutput );
        HazelcastLoadBalance transferForm = HazelcastLoadBalance.fromLoadBalance( loadBalance );
        threadKryo.writeObject( kryoOut, transferForm );
        kryoOut.flush();
    }

    /**
     * Reads a {@code HazelcastLoadBalance} with this thread's Kryo and converts it back to a
     * {@link LoadBalance}.
     *
     * @param objectDataInput Hazelcast input; it is cast to {@link InputStream}
     * @return the deserialized load balance
     * @throws IOException declared by the interface
     */
    @Override
    public LoadBalance read( ObjectDataInput objectDataInput ) throws IOException
    {
        Input kryoIn = new Input( (InputStream)objectDataInput );
        return KRYO_THREAD_LOCAL.get().readObject( kryoIn, HazelcastLoadBalance.class ).toLoadBalance();
    }
}

共 (1) 个答案

  1. # 1 楼答案

    我不确定,但可能您必须将ServerTopologyService和MessageSourceCommunicationService标记为transient?另外,Callable不是应该有一个无参构造函数,以便Java序列化在反序列化时创建实例吗?

    编辑: 您的方法的问题是,Callable是使用Java序列化写出的,但您希望LoadBalance以Hazelcast的方式写出。这目前行不通,因为一旦最外层对象是用Java序列化写出的,其内部字段就不会再进入Hazelcast自己的序列化流程。我有一个尚未合并的PR,用来拦截这种情况并把处理交还给Hazelcast,但它也只在您的对象是DataSerializable实现时才生效,而您的情况并非如此

    对于您来说,解决方案是让您的Callable实现DataSerializable,然后使用ObjectDataOutput::writeObject写出LoadBalance实例(并在另一端用ObjectDataInput::readObject读取)

    因此,这不是ExecutorService实现的问题,而是可序列化包装器的问题,Hazelcast不再有机会应用StreamSerializer