java如何对基于KinesRecord的DoFn进行单元测试?
我正在开始一个从AWS Kinesis读取的Beam项目,所以我有一个简单的DoFn,它接受KinesRecord并记录内容。我想写一个单元测试来运行这个DoFn,并证明它是有效的。不过,事实证明,使用KinesRecord进行单元测试具有挑战性
当我尝试只使用Create.of(testKinesisRecord)
时,会出现这个错误:
java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly or a schema by invoking Create.withSchema().
正如错误所暗示的那样,我尝试过使用“withCoder”显式地提供KinesisRecordCoder,但它是一个私有类。也许还有另一种方法来单元测试DoFn
测试代码:
public class MyProjectTests {
@Rule
public TestPipeline p = TestPipeline.create();
@Test
public void testPoC() {
var testKinesisRecord = new KinesisRecord(
ByteBuffer.wrap("SomeData".getBytes()),
"seq01",
12,
"pKey",
Instant.now().minus(Duration.standardHours(4)),
Instant.now(),
"MyStream",
"shard-001"
);
PCollection<Void> output =
p.apply(Create.of(testKinesisRecord))
.apply(ParDo.of(new MyProject.PrintRecordFn()));
var result = p.run();
result.waitUntilFinish();
result.metrics().allMetrics().getCounters().forEach(longMetricResult -> {
Assertions.assertEquals(1, longMetricResult.getCommitted().intValue());
});
}
}
DoFn代码:
static class PrintRecordFn extends DoFn<KinesisRecord, Void> {
private static final Logger LOG = LoggerFactory.getLogger(PrintRecordFn.class);
private final Counter items = Metrics.counter(PrintRecordFn.class, "itemsProcessed");
@ProcessElement
public void processElement(@Element KinesisRecord element) {
items.inc();
LOG.info("Stream: `{}` Shard: `{}` Arrived at `{}`\nData: {}",
element.getStreamName(),
element.getShardId(),
element.getApproximateArrivalTimestamp(),
element.getDataAsBytes());
}
}
# 1 楼答案
KinesisRecordCoder
应该用于内部目的,因此它被设置为包私有。同时,您可以提供自定义AWSClientsProvider
,并使用它生成测试数据。作为一个例子,请看一下KinesisMockReadTest和custom Provider