我们有一个卡夫卡的消费者,他会阅读消息,做一些事情,然后使用下面的脚本再次发布到卡夫卡主题
生产者配置:
{
"bootstrap.servers": "localhost:9092"
}
我没有配置任何其他配置,如queue.buffering.max.messages
queue.buffering.max.ms
batch.num.messages
我假设这些都是来自configuration的默认值
queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
我的理解是:当内部队列到达queue.buffering.max.ms或batch.num.messages时,消息将在单独的线程中发布到Kafka。在我的配置queue.buffering.max.ms中为0,因此当我调用product()时,所有消息都将被发布。如果我错了就纠正我。
我的制作人片段:
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.flush()
从this post我了解到,在每条消息之后使用flush,producer将成为同步producer。如果我使用上面的脚本,需要大约45毫秒才能发布到Kafka
如果我将上面的代码片段更改为
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.poll(0)
有什么表现会有所改善吗?你能澄清我的理解吗。
谢谢
客户机的documentation解释了
flush()
和poll()
之间的区别。对于^{} ,它声明:
对于^{} :
在
send()
之后调用poll()
不会使生产者同步,因为刚刚发送的消息不太可能已经到达代理,并且传递报告已经发送回客户端。相反,
flush()
将阻塞,直到先前发送的消息被传递(或出错),有效地使生产者同步。相关问题 更多 >
编程相关推荐