有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Flink cassandraOutputFormat元组需要冻结值

我有一个flink项目,它将作为批处理作业在cassandra表中插入数据。我已经有了一个flink stream项目,它正在将pojo写入同一个cassandra表,但是cassandraOutputFormat需要数据作为元组(希望在某个时候可以像CassandraSink那样接受pojo)。这就是我的pojo:

@Table(keyspace="mykeyspace", name="mytablename")
public class AlphaGroupingObject implements Serializable {

    @Column(name = "jobId")
    private String jobId;
    @Column(name = "datalist")
    @Frozen("list<frozen<dataobj>")
    private List<CustomDataObj> dataobjs;
    @Column(name = "userid")
    private String userid;

    //Getters and Setters
}

我从这个pojo生成的元组数据集:

DataSet<Tuple3<String, List<CustomDataObj>, String>> outputDataSet = listOfAlphaGroupingObject.map(new AlphaGroupingObjectToTuple3Mapper());

以下是触发输出的行:

outputDataSet.output(new CassandraOutputFormat<>("INSERT INTO mykeyspace.mytablename (jobid, datalist, userid) VALUES (?,?,?);", clusterThatWasBuilt));

现在我遇到的问题是,当我尝试运行它时,当它尝试将其输出到cassandra表时,我会出现以下错误:

Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: 
Codec not found for requested operation: [frozen<mykeyspace.dataobj> <-> flink.custom.data.CustomDataObj]

所以我知道当它是pojo时,我只需要在字段中添加@freezed注释,但我不知道如何为元组添加注释。解决此问题的最佳/正确方法是什么?或者我做了一些不必要的事情,因为实际上有一种方法可以通过cassandraOutputFormat发送POJO,但我还没有找到

提前感谢您的帮助

编辑:

下面是CustomDataObj类的代码:

@UDT(name="dataobj", keyspace = "mykeyspace")
public class CustomDataObj implements Serializable {


    @Field(name = "userid")
    private String userId;

    @Field(name = "groupid")
    private String groupId;

    @Field(name = "valuetext")
    private String valueText;

    @Field(name = "comments")
    private String comments;

    //Getters and setters
}

编辑2

包括CustomDataObj绑定到的cassandra中的表模式和mytablename模式

CREATE TYPE mykeyspace.dataobj (
    userid text,
    groupid text,
    valuetext text,
    comments text
);

CREATE TABLE mykeyspace.mytablename (
    jobid text,
    datalist list<frozen<dataobj>>,
    userid text,
    PRIMARY KEY (jobid, userid)
);

共 (2) 个答案

  1. # 1 楼答案

    CustomDataObj类上添加UDT注释

    @UDT(name = "dataobj")
    public class CustomDataObj { 
        //...... 
    }
    

    已编辑

    jobid注释更改为@Column(name = "jobid"),将dataobjs冻结注释更改为@Frozen

    @Table(keyspace="mykeyspace", name="mytablename")
    public class AlphaGroupingObject implements Serializable {
    
        @Column(name = "jobid")
        private String jobId;
    
        @Column(name = "datalist")
        @Frozen
        private List<CustomDataObj> dataobjs;
        @Column(name = "userid")
        private String userid;
    
        //Getters and Setters
    }
    
  2. # 2 楼答案

    我相信我已经找到了一种比向cassandraOutputFormat提供一个元组更好的方法,但从技术上讲,它仍然不能回答这个问题,所以我不会将此标记为答案。最后我使用了cassandra的对象映射器,这样我就可以将pojo发送到表中。仍然需要验证数据是否成功地到达表中,以及是否一切都按照实现方式正常工作,但我觉得这将帮助任何面临类似问题的人

    下面是概述解决方案的文档:http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/using/