java单元测试带有外部依赖项的apache beam有状态管道
我有一个apache beam管道,可以读取pubsub,使用Redis丰富数据,最后写入pubsub。我正在尝试编写测试来测试浓缩Dofn,它是一个有状态的Dofn。在这里,内部状态充当近缓存,以减少对Redis的调用。为了实例化我的Redis客户端,我使用了PipelineOptions中声明的工厂,例如
@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();
void setRedisClient(RedisClient client);
理论上,上述客户机应该是每个员工的一个单独客户机。在我的单元测试中,我试图模仿redis客户端中的一些东西。我的测试是这样的-
//setup pipeline
TestStream<MetricsInstance> inputStream =
TestStream.create(...).advanceWatermarkToInfinity();
PCollection<MetricsInstance> enrichedDataStream = pipeline.apply(inputStream)
.apply(ParDo.of(new ConvertToKeyValuePairDoFn<>()))
.apply(ParDo.of(new EnrichMetricsInstanceDoFn()));
CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
RedisClient redisClient = options.getRedisClient();
JedisPool jedisPool = Mockito.mock(JedisPool.class);
jedis = Mockito.mock(Jedis.class);
Mockito.when(jedisPool.getResource()).thenReturn(jedis);
redisClient.setPool(jedisPool);
... some stubbing code and finally the pipeline run
PAssert.that(enrichedDataStream).containsInAnyOrder(expectedDataStream);
pipeline.run(options);
当我试着运行这个测试时,我会遇到这样的错误
java.lang.IllegalArgumentException: Failed to serialize and deserialize property 'redisClient' with value 'xxx.xxx.RedisClientImpl@529cfee5'
为了使框架不尝试序列化客户端,我可以在Options类的getRedisClient()
上添加@JsonIgnore
。但这会导致Redis实例在某个时候被重新创建,我所有的模拟和存根都会丢失。我想知道测试这种情况的最佳方法是什么
# 1 楼答案
在对Apache Beam邮件列表进行了一些讨论之后,我终于能够让这件事开始工作了。诀窍是设置RedisClientFactory时,使用管道选项中的另一个字段,该字段公开RedisClient类的名称
所以选项看起来是这样的-
工厂是这样实施的——
此工厂正在使用类
RedisClientImpl
中名为fromOptions
的方法来构造客户机使用此设置,我现在可以在单元测试中创建RedisClient的模拟实例
我们还需要确保FakeRedisClient类还公开了一个名为fromOptions的方法