有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在Apache Beam/Google数据流中使用ParseJsons?

这里是java新手。我正在努力理解如何在Apache Beam管道中使用ParseJsons将字符串PCollection解析为对象PCollection

我的理解是,我需要首先定义一个与json结构匹配的类,然后使用ParseJSON将json字符串映射到该类的对象中

然而,ParseJsons文档在我看来很神秘。我不确定如何使用ApacheBeam实际执行转换。有人能给我一个快速而肮脏的例子,说明如何解析以行分隔的json字符串吗

下面是我所做的尝试之一,但不幸的是语法不正确

class Product {
  private String name = null;
  private String url = null;
}

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
 .apply(new ParseJsons.of(Product))
 .apply("WriteCounts", TextIO.write().to(options.getOutput()));

共 (2) 个答案

  1. # 1 楼答案

    ParseJsons.of方法是静态的。所以你可以直接调用它,而不用实例化这个类。但是,您需要将结果转换回字符串。例如:

    PCollection<MyPojo> = 
       p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply("Parse JSON", ParseJsons.of(MyPojo.class))
        .apply("Convert back to String", ParDo.of(new FormatPojoFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));
    

    您还可以尝试在TextIO class上使用writeCustomType方法:

    p.apply(TextIO.<UserEvent>writeCustomType(new FormatEvent()).to(...)
    
  2. # 2 楼答案

    我想你想要:

    PCollectoion<Product> = 
      p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
       .apply(new ParseJsons.of(Product.class))
       .setCoder(SerializableCoder.of(MyPojo.class));