public class ReplayingSubscribingEventProcessor extends SubscribingEventProcessor {
private final SubscribableMessageSource<? extends EventMessage<?>> messageSource;
protected ReplayingSubscribingEventProcessor(
Builder builder) {
super(builder);
this.messageSource = builder.messageSource;
}
public static Builder builder() {
return new Builder();
}
/**
* Whenever there is a need to process event messages, ignore all of them and since already inside messageSource,
* just take all messages from event source and re-project all from beginning for this aggregate root
* @param eventMessages
*/
@Override
protected void process(List<? extends EventMessage<?>> eventMessages) {
try {
//reprocess all previous events for this aggregate (get id from current event)
GenericDomainEventMessage gdem = (GenericDomainEventMessage) eventMessages.get(0);
List<? extends EventMessage<?>> prevEvs = ((EventStore)messageSource).readEvents(gdem.getAggregateIdentifier()).asStream()
.collect(Collectors.toList());
processInUnitOfWork(prevEvs, new BatchingUnitOfWork<>(prevEvs), Segment.ROOT_SEGMENT);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new EventProcessingException("Exception occurred while processing events", e);
}
}
public static class Builder extends SubscribingEventProcessor.Builder{
private SubscribableMessageSource<? extends EventMessage<?>> messageSource;
@Override
public Builder messageSource(
SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
super.messageSource(messageSource);
this.messageSource = messageSource;
return this;
}
@Override
public ReplayingSubscribingEventProcessor.Builder name(String name) {
super.name(name);
return this;
}
@Override
public ReplayingSubscribingEventProcessor.Builder eventHandlerInvoker(
EventHandlerInvoker eventHandlerInvoker) {
super.eventHandlerInvoker(eventHandlerInvoker);
return this;
}
@Override
public ReplayingSubscribingEventProcessor.Builder processingStrategy(
EventProcessingStrategy processingStrategy) {
super.processingStrategy(processingStrategy);
return this;
}
public ReplayingSubscribingEventProcessor build() {
return new ReplayingSubscribingEventProcessor(this);
}
}
}
# 1 楼答案
当然有可能! 但是,使用订阅事件处理器无法实现这一点。 您应该利用跟踪事件处理器,但它后面有一个
InMemoryTokenStore
。这样做,应用程序永远不能从它停止的地方开始,因为它停止的知识TrackingToken
不存在因此,每次启动时都会重新创建投影
您可以采取的另一种方法有点不同。 您仍然可以使用跟踪事件处理器,但要使用实际的持久
TokenStore
实现。其次,在应用程序启动时,可以使用TrackingEventProcessor#resetTokens()
函数发出给定跟踪事件处理器的重播采用这种方法,您可以在事件处理组件中添加带
@ResetHandler
注释的函数,以清除投影表之前的,从而再次处理所有事件希望这能给你一些见解博扬
# 2 楼答案
@Steven你觉得这个解决方案怎么样
和配置: