卡夫卡制片人的差异

2024-09-26 18:16:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我们有一个卡夫卡的消费者,他会阅读消息,做一些事情,然后使用下面的脚本再次发布到卡夫卡主题

生产者配置:

{
  "bootstrap.servers": "localhost:9092"
}

我没有配置任何其他配置,如queue.buffering.max.messagesqueue.buffering.max.msbatch.num.messages

我假设这些都是来自configuration的默认值

queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000

我的理解是:当内部队列到达queue.buffering.max.ms或batch.num.messages时,消息将在单独的线程中发布到Kafka。在我的配置queue.buffering.max.ms中为0,因此当我调用product()时,所有消息都将被发布。如果我错了就纠正我。

我的制作人片段:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.flush()

this post我了解到,在每条消息之后使用flush,producer将成为同步producer。如果我使用上面的脚本,需要大约45毫秒才能发布到Kafka

如果我将上面的代码片段更改为

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.poll(0)

有什么表现会有所改善吗?你能澄清我的理解吗。

谢谢


Tags: kafka脚本send消息messagetopicqueuedef
1条回答
网友
1楼 · 发布于 2024-09-26 18:16:07

客户机的documentation解释了flush()poll()之间的区别。

对于^{},它声明:

Wait for all messages in the Producer queue to be delivered. This is a convenience method that calls poll() until len() is zero or the optional timeout elapses.

对于^{}

Polls the producer for events and calls the corresponding callbacks (if registered).

send()之后调用poll()不会使生产者同步,因为刚刚发送的消息不太可能已经到达代理,并且传递报告已经发送回客户端。

相反,flush()将阻塞,直到先前发送的消息被传递(或出错),有效地使生产者同步。

相关问题 更多 >

    热门问题