<p>Here is an implementation of the idea given in @MickaelMaison's <a href="https://stackoverflow.com/a/53267676/4306852">answer</a>. I used <a href="https://kafka-python.readthedocs.io/en/master/index.html" rel="nofollow noreferrer">kafka-python</a>.</p>
<pre><code>from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']


def register_kafka_listener(topic, listener):
    # Poll Kafka in a background thread
    def poll():
        # Initialize the consumer instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        # Initial poll; note that any records it returns are discarded here
        consumer.poll(timeout_ms=6000)
        print("Started polling for topic:", topic)
        for msg in consumer:
            print("Entered the loop\nKey: ", msg.key, " Value:", msg.value)
            # Call the listener that was registered for this topic
            listener(msg)

    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("Started a background thread")


def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))


register_kafka_listener('topic1', kafka_listener)
</code></pre>
<p>Polling is done in a separate thread. Once a message is received, the listener is called with the data retrieved from Kafka.</p>
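<p>If you want to try the callback wiring without a running broker, here is a minimal sketch of the same pattern. It assumes the message source is any iterable; <code>Message</code>, <code>register_listener</code>, <code>collect</code> and <code>fake_source</code> are hypothetical names I made up for illustration and are not part of kafka-python.</p>

```python
import threading
from collections import namedtuple

# Hypothetical stand-in for the record objects a KafkaConsumer yields
Message = namedtuple("Message", ["key", "value"])


def register_listener(source, listener):
    """Consume `source` in a background thread and pass each message to `listener`."""
    def poll():
        for msg in source:
            listener(msg)

    thread = threading.Thread(target=poll, daemon=True)
    thread.start()
    return thread


received = []


def collect(msg):
    # Same shape as kafka_listener: decode the payload of each message
    received.append(msg.value.decode("utf-8"))


# A plain list plays the role of the consumer in this sketch
fake_source = [Message(b"k1", b"first"), Message(b"k2", b"second")]
worker = register_listener(fake_source, collect)
worker.join(timeout=5)
print(received)
```

<p>Since a <code>KafkaConsumer</code> is itself iterable, it should be possible to pass one in as <code>source</code> in place of the list, though I have only sketched the fake-source case here.</p>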