有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Axon总是重新投射所有事件

是否可以使用usingSubscribingEventProcessors并且在投影事件时,始终从一开始就重新投影所有事件。意思是——我从不将投影保存到DB,但每当聚合中发出新事件时,就重新投影所有事件


共 (2) 个答案

  1. # 1 楼答案

    当然有可能! 但是,使用订阅事件处理器无法实现这一点。 您应该利用跟踪事件处理器,但它后面有一个InMemoryTokenStore。这样做,应用程序永远不能从它停止的地方开始,因为它停止的知识TrackingToken不存在

    因此,每次启动时都会重新创建投影

    您可以采取的另一种方法有点不同。 您仍然可以使用跟踪事件处理器,但要使用实际的持久TokenStore实现。其次,在应用程序启动时,可以使用TrackingEventProcessor#resetTokens()函数发出给定跟踪事件处理器的重播

    采用这种方法,您可以在事件处理组件中添加带@ResetHandler注释的函数,以清除投影表之前的,从而再次处理所有事件

    希望这能给你一些见解博扬

  2. # 2 楼答案

    @Steven你觉得这个解决方案怎么样

    public class ReplayingSubscribingEventProcessor extends SubscribingEventProcessor {
    
      private final SubscribableMessageSource<? extends EventMessage<?>> messageSource;
    
      protected ReplayingSubscribingEventProcessor(
          Builder builder) {
        super(builder);
        this.messageSource = builder.messageSource;
      }
    
      public static Builder builder() {
        return new Builder();
      }
    
      /**
       * Whenever there is a need to process event messages, ignore all of them and since already inside messageSource,
       * just take all messages from event source and re-project all from beginning for this aggregate root
       * @param eventMessages
       */
      @Override
      protected void process(List<? extends EventMessage<?>> eventMessages) {
        try {
          //reprocess all previous events for this aggregate (get id from current event)
          GenericDomainEventMessage gdem = (GenericDomainEventMessage) eventMessages.get(0);
          List<? extends EventMessage<?>> prevEvs = ((EventStore)messageSource).readEvents(gdem.getAggregateIdentifier()).asStream()
              .collect(Collectors.toList());
          processInUnitOfWork(prevEvs, new BatchingUnitOfWork<>(prevEvs), Segment.ROOT_SEGMENT);
        } catch (RuntimeException e) {
          throw e;
        } catch (Exception e) {
          throw new EventProcessingException("Exception occurred while processing events", e);
        }
      }
    
      public static class Builder extends SubscribingEventProcessor.Builder{
        private SubscribableMessageSource<? extends EventMessage<?>> messageSource;
    
        @Override
        public Builder messageSource(
            SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
          super.messageSource(messageSource);
          this.messageSource = messageSource;
          return this;
        }
    
        @Override
        public ReplayingSubscribingEventProcessor.Builder name(String name) {
          super.name(name);
          return this;
        }
    
        @Override
        public ReplayingSubscribingEventProcessor.Builder eventHandlerInvoker(
            EventHandlerInvoker eventHandlerInvoker) {
          super.eventHandlerInvoker(eventHandlerInvoker);
          return this;
        }
    
        @Override
        public ReplayingSubscribingEventProcessor.Builder processingStrategy(
            EventProcessingStrategy processingStrategy) {
          super.processingStrategy(processingStrategy);
          return this;
        }
    
        public ReplayingSubscribingEventProcessor build() {
          return new ReplayingSubscribingEventProcessor(this);
        }
      }
    }
    

    和配置:

    @Autowired
        public void configure(EventProcessingConfigurer configurer){
            configurer.registerEventProcessor("inMemoryProcessor",
                    (n, c, ehi) -> replayingSubscribingEventProcessor(n, c, ehi, org.axonframework.config.Configuration::eventBus));
        }
    
        public ReplayingSubscribingEventProcessor replayingSubscribingEventProcessor(
                String name,
                org.axonframework.config.Configuration conf,
                EventHandlerInvoker eventHandlerInvoker,
                Function<org.axonframework.config.Configuration, SubscribableMessageSource<? extends EventMessage<?>>> messageSource) {
            return ReplayingSubscribingEventProcessor.builder()
                    .name(name)
                    .eventHandlerInvoker(eventHandlerInvoker)
                    .messageSource(messageSource.apply(conf))
                    .processingStrategy(DirectEventProcessingStrategy.INSTANCE)
                    .build();
        }