有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java单元测试带有外部依赖项的apache beam有状态管道

我有一个apache beam管道,可以读取pubsub,使用Redis丰富数据,最后写入pubsub。我正在尝试编写测试来测试浓缩Dofn,它是一个有状态的Dofn。在这里,内部状态充当近缓存,以减少对Redis的调用。为了实例化我的Redis客户端,我使用了PipelineOptions中声明的工厂,例如

@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();

void setRedisClient(RedisClient client);

理论上,上述客户机应该是每个员工的一个单独客户机。在我的单元测试中,我试图模仿redis客户端中的一些东西。我的测试是这样的-

//setup pipeline
TestStream<MetricsInstance> inputStream =
        TestStream.create(...).advanceWatermarkToInfinity();
PCollection<MetricsInstance> enrichedDataStream  = pipeline.apply(inputStream)
.apply(ParDo.of(new ConvertToKeyValuePairDoFn<>()))
.apply(ParDo.of(new EnrichMetricsInstanceDoFn()));


CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
RedisClient redisClient = options.getRedisClient();
JedisPool jedisPool = Mockito.mock(JedisPool.class);
jedis = Mockito.mock(Jedis.class);
Mockito.when(jedisPool.getResource()).thenReturn(jedis);
redisClient.setPool(jedisPool);
... some stubbing code and finally the pipeline run
PAssert.that(enrichedDataStream).containsInAnyOrder(expectedDataStream);
pipeline.run(options);

当我试着运行这个测试时,我会遇到这样的错误

java.lang.IllegalArgumentException: Failed to serialize and deserialize property 'redisClient' with value 'xxx.xxx.RedisClientImpl@529cfee5'

为了使框架不尝试序列化客户端,我可以在Options类的getRedisClient()上添加@JsonIgnore。但这会导致Redis实例在某个时候被重新创建,我所有的模拟和存根都会丢失。我想知道测试这种情况的最佳方法是什么


共 (1) 个答案

  1. # 1 楼答案

    在对Apache Beam邮件列表进行了一些讨论之后,我终于能够让这件事开始工作了。诀窍是设置RedisClientFactory时,使用管道选项中的另一个字段,该字段公开RedisClient类的名称

    所以选项看起来是这样的-

        @Default.Class(RedisClientImpl.class)
        Class<? extends RedisClient> getRedisClientClass();
    
        void setRedisClientClass(Class<? extends RedisClient> redisClientClass);
    
        @Default.InstanceFactory(RedisClientFactory.class)
        RedisClient getRedisClient();
    
        void setRedisClient(RedisClient client);
    

    工厂是这样实施的——

    public class RedisClientFactory implements DefaultValueFactory<RedisClient> {
      @Override
      public RedisClient create(PipelineOptions options) {
    
        CommonPipelineOptions pipelineOptions = options.as(CommonPipelineOptions.class);
        return InstanceBuilder.ofType(RedisClient.class)
            .fromClass(pipelineOptions.getRedisClientClass())
            .fromFactoryMethod("fromOptions")
            .withArg(PipelineOptions.class, options)
            .build();
      }
    
    }
    

    此工厂正在使用类RedisClientImpl中名为fromOptions的方法来构造客户机

      public static RedisClientImpl fromOptions(PipelineOptions options) {
        return new RedisClientImpl(options.as(CommonPipelineOptions.class));
      }
    

    使用此设置,我现在可以在单元测试中创建RedisClient的模拟实例

    options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
    options.setRedisClientClass(FakeRedisClient.class);
    ...
    // setup fake data in the FakeRedisClient by calling static methods
    FakeRedisClient.keyToValueMap.put(redisKey, redisReturnVal);
    ...
    pipeline.run(options);
    

    我们还需要确保FakeRedisClient类还公开了一个名为fromOptions的方法

      public static FakeRedisClient fromOptions(PipelineOptions options) {
        return new FakeRedisClient();
      }