有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

ApacheBeamJava:MongoDB过滤器

我对dataflow/Apchebeam不熟悉。我正在从MongoDB获取数据。MongoDB连接工作正常,但我无法应用过滤器。抛出以下错误。我不确定这是过滤数据的正确方法。任何建议都会有帮助

错误

2021-06-07 10:37:34.615来自工人的恐怖消息:java。lang.ClassCastException:com。测验数据流。多夫尼斯。MongodDbQueryFn无法转换为组织。阿帕奇。梁sdk。伊奥。mongodb。聚合查询组织。阿帕奇。梁sdk。伊奥。mongodb。MongoDbIO$BoundedMongoDbSource。split(MongoDbIO.java:522)

代码:MongoDbIO连接器:

return  pipeline.apply(MongoDbIO.read()                        
.withUri("mongodb://".concat(databaseDetails.getDatabaseHostName()).concat(":").concat(databaseDetails.getPort()))
.withDatabase(databaseDetails.getDatabaseName())
.withCollection(objectDetails.getObjectName())
.withQueryFn(new MongodDbQueryFn("name","Mahesh")));

QueryFn p转换

package com.test.dataflow.dofns;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.bson.Document;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;

public class MongodDbQueryFn implements SerializableFunction<MongoCollection<Document>, MongoCursor<Document>> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    private String keyName;
    private String KeyValue;

    public MongodDbQueryFn(String keyName, String KeyValue) {

        this.keyName = keyName;
        this.KeyValue = KeyValue;

    }

    @Override
    public MongoCursor<Document> apply(MongoCollection<Document> input) {
        return input.find(com.mongodb.client.model.Filters.eq(keyName, KeyValue)).iterator();
    }
    
}

共 (2) 个答案

  1. # 1 楼答案

    关于withQueryFn的文档并没有很好地解释这一点,但是从对MongoDbIO的代码阅读来看,MongoDbIO.Read似乎假定QueryFn被设置为^{}^{}。出现错误的原因是,代码正在检查queryFn是否为FindQuery,得到false,然后假设它为AggregationQuery,并试图强制转换它

    最好的解决方案是使用FindQuery获得与您编写的行为相同的行为,如下所示:

    .withQueryFn(FindQuery.create().withFilters(Filters.eq("name", "Mahesh"))));
    
  2. # 2 楼答案

    抱歉,我现在回复得太晚了,但是,如果您或其他人仍在寻找解决方案来处理多个筛选条件场景,包括(和/或/等)

    过滤器。eq过滤器这样做会产生额外的开销(以避免递归构建多个eq和noteq等情况)

    我的建议更倾向于转换蒙戈人。JSON字符串到文档 好的方面是,mongo文档接受带有逗号(,)分隔符的json,因此下面的两行代码可以处理所有可能的过滤场景

        String filterJson="{\n" +
                "     status: \"A\",\n" +
                "     $or: [ { qty: { $lt: 30 } }, { item: /^p/ } ]\n" +
                "}";
    
        Bson dc =BsonDocument.parse(filterJson);
    
        PCollection<Document> mongoVal = p.apply(MongoDbIO.read()
                        .withUri("mongodb://localhost:27017")
                        .withDatabase("test")
                        .withCollection("inventory_collection")
                        .withQueryFn(FindQuery.create().withFilters(dc)));