java如何使用连接处理sparkstructured流中的备份场景?
我使用的是spark sql 2.4.1,spark-cassandra-connector_2.11-2.4.1。jar和java8。我有一个场景,需要将流数据与C*/Cassandra表数据连接起来
我有两张表,如下“主表””&;“备份\u表”
table kspace.master_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );
table kspace.backup_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
backup_timestamp timestamp,
PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC, backup_timestamp DESC);
每个流媒体记录都会有“statement_flag”,可能是“I”或“U”
- 如果出现带有“I”的记录,我们将直接插入“主表”李>
- 如果出现带有“U”的记录,我们需要检查“主表”中是否有给定(语句id)、语句日期的记录李>
- 如果“主表”中有记录,则将该记录复制到带有当前时间戳(即备份时间戳)的“备份表”李>
- 用最新记录更新“主表”中的记录李>
下面的代码应该可以做到这一点,但事实并非如此
Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));
String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";
Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);
writeDfToCassandra(baseDs.toDF(), keyspace, master_table);
u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");
Dataset<Row> joinUpdatedRecordsDs = sparkSession.sql(
" select p.statement_id, p.statement_flag, p.statement_date,"
+ "p.x_val,p.y_val,p.z_val "
+ " from persisted_records as p "
+ "join u_records as u "
+ "on p.statement_id = u.statement_id and p.statement_date = u.statement_date");
Dataset<Row> updated_records = joinUpdatedRecordsDs
.withColumn("backup_timestamp", current_timestamp());
writeDfToCassandra(updated_records.toDF(), keyspace, backup_table);
“主表”中的所有内容都复制到备份表中。事实上,它假设只复制以前版本的记录,而不是最新版本的记录
假设只从master_表复制以前版本的记录,但所有记录都已复制到backup_表
如何仅使以前版本的记录从master_table
复制到backup_table
而不是最新版本
样本数据
对于带有“I”标志的第一条记录
主表
备份表
对于带有“U”标志的第二条记录,即除“y_val”列数据外,与前面的记录相同
主表
备份表
期望
但实际的表数据是
共 (0) 个答案