Kafka Persistence StateStore不能与python和java结合使用

2024-10-04 01:36:46 发布

您现在位置:Python中文网/ 问答频道 /正文

今天我在卡夫卡州立商店发现了一件很奇怪的事情,我在谷歌上搜索了很多东西,但没有找到这种行为的原因

考虑下面的状态存储在<强> java < /强>:

private KeyValueStore<String, GenericRecord> userIdToUserRecord;

有两个处理器正在使用此状态存储

  topology.addStateStore(userIdToUserRecord, ALERT_PROCESSOR_NAME, USER_SETTING_PROCESSOR_NAME)

用户\u设置\u处理器\u名称将数据放入状态存储

userIdToUserRecord.put("user-12345", record);

警报处理器名称将从状态存储中获取数据

userIdToUserRecord.get("user-12345");

将源添加到UserSettingProcessor

userSettingTopicName = user-setting-topic;    
topology.addSource(sourceName, userSettingTopicName)
                    .addProcessor(processorName, UserSettingProcessor::new, sourceName);

将源添加到AlertEngineProcessor

alertTopicName = alert-topic;
topology.addSource(sourceName, alertTopicName)
                    .addProcessor(processorName, AlertEngineProcessor::new, sourceName);

案例1: 使用java中的卡夫卡产品制作记录
首先使用java将记录生成到主题用户设置主题,它会将用户记录添加到状态存储中 第二,使用java生成主题的记录警报主题,它将使用用户iduserIdToUserRecord.get(“user-12345”)从状态存储中获取记录

工作很好,我正在使用卡夫卡夫罗普勒制作这两个主题的记录

案例2: 首先使用python生成主题用户设置主题的记录,它将用户记录添加到状态存储*userIdToUserRecord.put(“user-100”,generiRecord)

第二,使用java生成主题的记录警报主题,它将使用用户iduserIdToUserRecord.get(“user-100”)从状态存储中获取记录

这里发生的奇怪情况userIdToUserRecord.get(“user-100”)将返回null

我也检查了这样的场景 我使用python生成记录到用户设置主题,然后触发userSettingProcessor进程方法,在调试模式下检查并尝试从状态存储userIdToUserRecord.get(“user-100”)获取用户记录在userSettingProcessor中工作正常我能够从状态存储获取数据

然后,我使用java生成记录以提醒主题,然后尝试获取userIdToUserRecord.get(“user-100”),它将返回null

我不知道这种奇怪的行为,有人告诉我这种行为

Python代码:

value_schema = avro.load('user-setting.avsc')
value = {
    "user-id":"user-12345",
    "client_id":"5cfdd3db-b25a-4e21-a67d-462697096e20",
    "alert_type":"WORK_ORDER_VOLUME"
}

print("------------------------Kafka Producer------------------------------")
avroProducer = AvroProducer(
    {'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8089'},
    default_value_schema=value_schema)
avroProducer.produce(topic="user-setting-topic", value=value)
print("------------------------Sucess Producer------------------------------")
avroProducer.flush() 

Java代码:

 Schema schema = new Schema.Parser().parse(schemaString);

        GenericData.Record record = new GenericData.Record(schema);
        record.put("alert_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
        record.put("alert_created_at",123449437L);
        record.put("alert_type","WORK_ORDER_VOLUME");
        record.put("client_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
        //record.put("property_key","property_key-"+i);

        record.put("alert_data","{\"alert_trigger_info\":{\"jll_value\":1.4,\"jll_category\":\"internal\",\"name\":\"trade_Value\",\"current_value\":40,\"calculated_value\":40.1},\"work_order\":{\"locations\":{\"country_name\":\"value\",\"state_province\":\"value\",\"city\":\"value\"},\"property\":{\"name\":\"property name\"}}}");
        return record;

Tags: 用户主题gettopicputvalueschema状态
1条回答
网友
1楼 · 发布于 2024-10-04 01:36:46

问题是Java生产者和Python生产者(基于C生产者)使用不同的默认哈希函数进行数据分区。您需要为其中一个(或两者)提供自定义分区,以确保它们使用相同的分区策略

不幸的是,Kafka协议没有指定默认的分区散列函数应该是什么,因此客户机在默认情况下可以使用他们想要的任何东西

相关问题 更多 >