Java:在 Hazelcast IExecutorService 中使用 StreamSerializer
我为自己的一个 Java 类实现了 Hazelcast 的 StreamSerializer(使用 Kryo)。在大多数情况下它都能正常工作,但当我在 IExecutorService 上执行一个包含该类型字段的 Callable 时,就会得到如下异常:
com.hazelcast.nio.serialization.HazelcastSerializationException: java.io.NotSerializableException: com.flir.its.flux.infrastructure.loadbalance.hazelcast.HazelcastLoadBalance
at com.hazelcast.nio.serialization.SerializationServiceImpl.handleException(SerializationServiceImpl.java:298)
at com.hazelcast.nio.serialization.SerializationServiceImpl.writeObject(SerializationServiceImpl.java:254)
at com.hazelcast.nio.serialization.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:262)
at com.hazelcast.executor.BaseCallableTaskOperation.writeInternal(BaseCallableTaskOperation.java:94)
at com.hazelcast.spi.Operation.writeData(Operation.java:284)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:129)
at com.hazelcast.nio.serialization.DataSerializer.write(DataSerializer.java:36)
at com.hazelcast.nio.serialization.StreamSerializerAdapter.write(StreamSerializerAdapter.java:48)
at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:176)
at com.hazelcast.nio.serialization.SerializationServiceImpl.toData(SerializationServiceImpl.java:157)
at com.hazelcast.spi.impl.NodeEngineImpl.toData(NodeEngineImpl.java:151)
at com.hazelcast.spi.impl.BasicOperationService.send(BasicOperationService.java:650)
at com.hazelcast.spi.impl.BasicOperationService.send(BasicOperationService.java:627)
at com.hazelcast.spi.impl.BasicInvocation.doInvoke(BasicInvocation.java:356)
at com.hazelcast.spi.impl.BasicInvocation.invoke(BasicInvocation.java:190)
at com.hazelcast.spi.impl.BasicOperationService.invokeOnTarget(BasicOperationService.java:285)
at com.hazelcast.executor.ExecutorServiceProxy.submitToMember(ExecutorServiceProxy.java:284)
at com.hazelcast.executor.ExecutorServiceProxy.submitToMembers(ExecutorServiceProxy.java:302)
at com.hazelcast.executor.ExecutorServiceProxy.submitToAllMembers(ExecutorServiceProxy.java:310)
at com.company.app.infrastructure.loadbalance.hazelcast.HazelcastLoadBalancer.performLoadBalancing(HazelcastLoadBalancer.java:101)
at com.company.app.infrastructure.loadbalance.hazelcast.HazelcastLoadBalancer.initialize(HazelcastLoadBalancer.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeCustomInitMethod(AbstractAutowireCapableBeanFactory.java:1681)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1620)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1549)
... 13 more
Caused by: java.io.NotSerializableException: com.company.app.domain.loadbalance.LoadBalance
这个 LoadBalance 对象确实没有实现 Serializable,但是为什么我注册的 StreamSerializer 没有被使用呢?我不想让这个类实现 Serializable。
这是该 Callable 的代码:
@SpringAware
public class BalanceLoadCallable implements Callable<Void>, Serializable
{
// ------------------------------ FIELDS ------------------------------
@Autowired
private transient MessageSourceCommunicationService m_communicationService;
@Autowired
private transient ServerTopologyService m_serverTopologyService;
private LoadBalance m_loadBalance;
// --------------------------- CONSTRUCTORS ---------------------------
public BalanceLoadCallable( LoadBalance loadBalance )
{
m_loadBalance = loadBalance;
}
// ------------------------ INTERFACE METHODS ------------------------
// --------------------- Interface Callable ---------------------
@Override
public Void call() throws Exception
{
m_communicationService.initializeCommunication( m_loadBalance.getSources( m_serverTopologyService.getCurrentServer() ) );
return null;
}
}
这是序列化器(StreamSerializer)的实现:
package com.company.app.infrastructure.loadbalance.hazelcast;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.company.app.domain.loadbalance.LoadBalance;
import com.company.app.domain.messagesource.DetectorMessageSource;
import com.company.app.domain.messagesource.MessageSourceId;
import com.company.app.domain.server.Server;
import com.company.app.infrastructure.serialization.hazelcast.HazelcastSerializerId;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * Hazelcast {@link StreamSerializer} for {@link LoadBalance} backed by Kryo.
 *
 * <p>A {@link LoadBalance} is converted to a {@code HazelcastLoadBalance} before it is written and
 * converted back after it is read; Kryo only ever sees the {@code HazelcastLoadBalance} form.
 */
public class LoadBalanceKryoSerializer implements StreamSerializer<LoadBalance>
{
    // ------------------------------ FIELDS ------------------------------

    /** Holds one configured {@link Kryo} per thread. */
    private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = new PerThreadKryo();

    // ------------------------ INTERFACE METHODS ------------------------

    // --------------------- Interface Serializer ---------------------

    /**
     * @return the ordinal of {@code HazelcastSerializerId.LOAD_BALANCE}
     */
    // NOTE(review): Hazelcast expects custom serializer type ids to be positive and stable;
    // ordinal() is 0 for the first enum constant and shifts when constants are reordered —
    // confirm against the HazelcastSerializerId declaration.
    @Override
    public int getTypeId()
    {
        return HazelcastSerializerId.LOAD_BALANCE.ordinal();
    }

    /** Nothing to release. */
    @Override
    public void destroy()
    {
    }

    // --------------------- Interface StreamSerializer ---------------------

    /**
     * Converts the load balance to its {@code HazelcastLoadBalance} form and writes that with the
     * calling thread's Kryo instance directly onto the Hazelcast output.
     */
    @Override
    public void write( ObjectDataOutput objectDataOutput, LoadBalance loadBalance ) throws IOException
    {
        final Kryo threadKryo = KRYO_THREAD_LOCAL.get();
        // The Hazelcast output is cast to OutputStream so Kryo can wrap it.
        final OutputStream rawStream = (OutputStream)objectDataOutput;
        final Output kryoOutput = new Output( rawStream );
        threadKryo.writeObject( kryoOutput, HazelcastLoadBalance.fromLoadBalance( loadBalance ) );
        // Push Kryo's buffered bytes into the underlying Hazelcast stream.
        kryoOutput.flush();
    }

    /**
     * Reads a {@code HazelcastLoadBalance} with the calling thread's Kryo instance and converts it
     * back to a {@link LoadBalance}.
     */
    @Override
    public LoadBalance read( ObjectDataInput objectDataInput ) throws IOException
    {
        // The Hazelcast input is cast to InputStream so Kryo can wrap it.
        final Input kryoInput = new Input( (InputStream)objectDataInput );
        final Kryo threadKryo = KRYO_THREAD_LOCAL.get();
        final HazelcastLoadBalance transferForm = threadKryo.readObject( kryoInput, HazelcastLoadBalance.class );
        return transferForm.toLoadBalance();
    }

    // -------------------------- INNER CLASSES --------------------------

    /** Creates and configures the Kryo instance for each thread on first access. */
    private static final class PerThreadKryo extends ThreadLocal<Kryo>
    {
        @Override
        protected Kryo initialValue()
        {
            final Kryo configured = new Kryo();
            // Registration order is kept as-is: LoadBalance, Server, DetectorMessageSource.
            configured.register( LoadBalance.class );
            configured.register( Server.class, new ServerSerializer() );
            configured.register( DetectorMessageSource.class, new DetectorMessageSourceSerializer() );
            return configured;
        }
    }

    /** Writes a {@link Server} as its IP address followed by its port. */
    private static final class ServerSerializer extends Serializer<Server>
    {
        @Override
        public void write( Kryo kryo, Output output, Server server )
        {
            kryo.writeObject( output, server.getIpAddress() );
            kryo.writeObject( output, server.getPort() );
        }

        @Override
        public Server read( Kryo kryo, Input input, Class type )
        {
            // Fields are read in the same order they were written.
            final String address = kryo.readObject( input, String.class );
            final int portNumber = kryo.readObject( input, Integer.class );
            return new Server( address, portNumber );
        }
    }

    /** Writes a {@link DetectorMessageSource} as its numeric id followed by its name. */
    private static final class DetectorMessageSourceSerializer extends Serializer<DetectorMessageSource>
    {
        @Override
        public void write( Kryo kryo, Output output, DetectorMessageSource source )
        {
            kryo.writeObject( output, source.getId().getId() );
            kryo.writeObject( output, source.getName() );
        }

        @Override
        public DetectorMessageSource read( Kryo kryo, Input input, Class type )
        {
            // Id first, then name — mirrors write().
            final Long rawId = kryo.readObject( input, Long.class );
            final MessageSourceId sourceId = new MessageSourceId( rawId );
            final String sourceName = kryo.readObject( input, String.class );
            return new DetectorMessageSource( sourceId, sourceName );
        }
    }
}
# 1 楼答案
我不确定,但可能您必须将 ServerTopologyService 和 MessageSourceCommunicationService 标记为 transient?另外,这个 Callable 难道不需要一个无参构造函数,以便通过 Java 序列化来反序列化实例吗?
编辑: 您的方法的问题在于,这个 Callable 是用 Java 序列化写出的,而您希望 LoadBalance 按 Hazelcast 的方式(StreamSerializer)写出。这目前行不通:一旦对象交给 Java 序列化去写,其内部字段就不会再回到 Hazelcast 的序列化机制。我有一个尚未合并的 PR 可以拦截这一过程并把处理交还给 Hazelcast,但它也只在对象实现了 DataSerializable 时才生效,而您的情况并非如此。
对您来说,解决方案是让这个 Callable 实现 DataSerializable,然后在 writeData 中使用 ObjectDataOutput::writeObject 写出 LoadBalance 实例(并在另一端的 readData 中用 ObjectDataInput::readObject 读取)。
因此,这不是 ExecutorService 实现的问题,而是外层 Serializable 包装对象的问题:在它内部,Hazelcast 不再有机会应用 StreamSerializer。