Java: how do I unit test a DoFn that takes a KinesisRecord?

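I'm starting a Beam project that reads from AWS Kinesis, so I have a simple DoFn that accepts a KinesisRecord and logs its contents. I want to write a unit test that runs this DoFn and proves that it works. However, unit testing with KinesisRecord has turned out to be challenging.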

When I try to simply use Create.of(testKinesisRecord), I get this error:

java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly  or a schema by invoking Create.withSchema().

As the error suggests, I tried using "withCoder" to provide KinesisRecordCoder explicitly, but it is a private class. Perhaps there is another way to unit test the DoFn?

Test code:

public class MyProjectTests {
    @Rule
    public TestPipeline p = TestPipeline.create();

    @Test
    public void testPoC() {
        var testKinesisRecord = new KinesisRecord(
                ByteBuffer.wrap("SomeData".getBytes()),
                "seq01",
                12,
                "pKey",
                Instant.now().minus(Duration.standardHours(4)),
                Instant.now(),
                "MyStream",
                "shard-001"
        );


        PCollection<Void> output =
                p.apply(Create.of(testKinesisRecord))
                        .apply(ParDo.of(new MyProject.PrintRecordFn()));

        var result = p.run();
        result.waitUntilFinish();
        result.metrics().allMetrics().getCounters().forEach(longMetricResult -> {
            Assertions.assertEquals(1, longMetricResult.getCommitted().intValue());
        });
    }
}

DoFn code:

  static class PrintRecordFn extends DoFn<KinesisRecord, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(PrintRecordFn.class);
    private final Counter items = Metrics.counter(PrintRecordFn.class, "itemsProcessed");

    @ProcessElement
    public void processElement(@Element KinesisRecord element) {
      items.inc();

      LOG.info("Stream: `{}` Shard: `{}` Arrived at `{}`\nData: {}",
              element.getStreamName(),
              element.getShardId(),
              element.getApproximateArrivalTimestamp(),
              element.getDataAsBytes());
    }
  }

1 answer

  1. # Answer 1

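    KinesisRecordCoder is intended for internal use, which is why it is package-private. In the meantime, you can supply a custom AWSClientsProvider and use it to generate the test data. For an example, take a look at KinesisMockReadTest and its custom provider.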
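
    A different route, not suggested in the answer above, might be for the test to bring its own coder, since the question's code shows that KinesisRecord has a public constructor and public getters. The sketch below shows only the field round-trip logic such a coder would need, in plain Java with no Beam dependency. `RecordFields` and `RecordFieldsCodec` are hypothetical names introduced here for illustration, and timestamps are assumed to be carried as epoch milliseconds. Moving the `encode`/`decode` bodies into a Beam `CustomCoder<KinesisRecord>` and passing it via `Create.of(...).withCoder(...)` is an assumption that has not been verified against any particular Beam version.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the values the question passes to the
// KinesisRecord constructor; timestamps are held as epoch milliseconds.
final class RecordFields {
    final byte[] data;
    final String sequenceNumber;
    final long subSequenceNumber;
    final String partitionKey;
    final long arrivalMillis;
    final long readMillis;
    final String streamName;
    final String shardId;

    RecordFields(byte[] data, String sequenceNumber, long subSequenceNumber,
                 String partitionKey, long arrivalMillis, long readMillis,
                 String streamName, String shardId) {
        this.data = data;
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
        this.partitionKey = partitionKey;
        this.arrivalMillis = arrivalMillis;
        this.readMillis = readMillis;
        this.streamName = streamName;
        this.shardId = shardId;
    }
}

// Hypothetical helper: writes every field in a fixed order and reads it back
// in the same order, which is the contract a coder has to honour.
final class RecordFieldsCodec {
    private RecordFieldsCodec() {}

    static void encode(RecordFields r, OutputStream out) throws IOException {
        DataOutputStream d = new DataOutputStream(out);
        d.writeInt(r.data.length); // length prefix so decode knows how much to read
        d.write(r.data);
        d.writeUTF(r.sequenceNumber);
        d.writeLong(r.subSequenceNumber);
        d.writeUTF(r.partitionKey);
        d.writeLong(r.arrivalMillis);
        d.writeLong(r.readMillis);
        d.writeUTF(r.streamName);
        d.writeUTF(r.shardId);
        d.flush();
    }

    static RecordFields decode(InputStream in) throws IOException {
        DataInputStream d = new DataInputStream(in);
        byte[] data = new byte[d.readInt()];
        d.readFully(data);
        String sequenceNumber = d.readUTF();
        long subSequenceNumber = d.readLong();
        String partitionKey = d.readUTF();
        long arrivalMillis = d.readLong();
        long readMillis = d.readLong();
        String streamName = d.readUTF();
        String shardId = d.readUTF();
        return new RecordFields(data, sequenceNumber, subSequenceNumber,
                partitionKey, arrivalMillis, readMillis, streamName, shardId);
    }

    // Encodes to an in-memory buffer and decodes again; I/O errors are
    // rethrown unchecked to keep call sites short.
    static RecordFields roundTrip(RecordFields r) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            encode(r, buffer);
            return decode(new ByteArrayInputStream(buffer.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class CodecDemo {
    public static void main(String[] args) {
        RecordFields in = new RecordFields(
                "SomeData".getBytes(StandardCharsets.UTF_8),
                "seq01", 12L, "pKey", 1_000L, 2_000L, "MyStream", "shard-001");
        RecordFields out = RecordFieldsCodec.roundTrip(in);
        System.out.println(out.streamName + " " + out.shardId);
    }
}
```

    In a real test the stand-in class would presumably go away: `encode` would read the values from the KinesisRecord getters and `decode` would call the same eight-argument constructor used in the question. Whether that is preferable to the AWSClientsProvider approach depends on whether the test should also exercise the Kinesis read path or only the DoFn.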