ApacheBeamJava:MongoDB过滤器
我对dataflow/Apchebeam不熟悉。我正在从MongoDB获取数据。MongoDB连接工作正常,但我无法应用过滤器。抛出以下错误。我不确定这是过滤数据的正确方法。任何建议都会有帮助
错误
2021-06-07 10:37:34.615来自工人的恐怖消息:java。lang.ClassCastException:com。测验数据流。多夫尼斯。MongodDbQueryFn无法转换为组织。阿帕奇。梁sdk。伊奥。mongodb。聚合查询组织。阿帕奇。梁sdk。伊奥。mongodb。MongoDbIO$BoundedMongoDbSource。split(MongoDbIO.java:522)
代码:MongoDbIO连接器:
return pipeline.apply(MongoDbIO.read()
.withUri("mongodb://".concat(databaseDetails.getDatabaseHostName()).concat(":").concat(databaseDetails.getPort()))
.withDatabase(databaseDetails.getDatabaseName())
.withCollection(objectDetails.getObjectName())
.withQueryFn(new MongodDbQueryFn("name","Mahesh")));
QueryFn p转换
package com.test.dataflow.dofns;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.bson.Document;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
public class MongodDbQueryFn implements SerializableFunction<MongoCollection<Document>, MongoCursor<Document>> {
/**
*
*/
private static final long serialVersionUID = 1L;
private String keyName;
private String KeyValue;
public MongodDbQueryFn(String keyName, String KeyValue) {
this.keyName = keyName;
this.KeyValue = KeyValue;
}
@Override
public MongoCursor<Document> apply(MongoCollection<Document> input) {
return input.find(com.mongodb.client.model.Filters.eq(keyName, KeyValue)).iterator();
}
}
# 1 楼答案
关于} 或^{} 。出现错误的原因是,代码正在检查queryFn是否为
withQueryFn
的文档并没有很好地解释这一点,但是从对MongoDbIO的代码阅读来看,MongoDbIO.Read
似乎假定QueryFn
被设置为^{FindQuery
,得到false,然后假设它为AggregationQuery
,并试图强制转换它最好的解决方案是使用
FindQuery
获得与您编写的行为相同的行为,如下所示:# 2 楼答案
抱歉,我现在回复得太晚了,但是,如果您或其他人仍在寻找解决方案来处理多个筛选条件场景,包括(和/或/等)
过滤器。eq过滤器这样做会产生额外的开销(以避免递归构建多个eq和noteq等情况)
我的建议更倾向于转换蒙戈人。JSON字符串到文档 好的方面是,mongo文档接受带有逗号(,)分隔符的json,因此下面的两行代码可以处理所有可能的过滤场景