是否有针对事件驱动的Kafka消费者的PythonAPI?

2024-10-03 21:34:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在尝试构建一个以Kafka作为唯一界面的Flask应用程序。因此,我希望有一个Kafka消费者,当相关主题的流中有新消息时触发它,并通过将消息推回到Kafka流来响应。在

我一直在寻找类似于Spring实现的东西:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Messasge in group mygroup: " + message);
}

我看过:

  1. kafka-python
  2. pykafka
  3. confluent-kafka

但是我在Python中找不到任何与事件驱动的实现风格相关的东西。在


Tags: kafka应用程序消息flaskmessage主题界面消费者
2条回答

卡夫卡消费者必须不断地进行投票,才能从经纪人那里获取数据。在

Spring给了您这个奇特的API,但实际上,它在一个循环中调用poll,并且只在检索到记录时调用您的方法。在

您可以轻松地构建与您提到的任何Python客户机相似的东西。与Java一样,这不是Kafka客户机直接公开的API,而是由顶层提供的API。这是你需要建立的东西。在

下面是@MickaelMaison的answer给出的一个实现。我用了kafka-python。在

from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
# Poll kafka
    def poll():
        # Initialize consumer Instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started Polling for topic:", topic)
        for msg in consumer:
            print("Entered the loop\nKey: ",msg.key," Value:", msg.value)
            kafka_listener(msg)
    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)

轮询是在另一个线程中完成的。接收到消息后,通过传递从Kafka检索到的数据来调用侦听器。在

相关问题 更多 >