通过GCP上的python脚本访问Kafka producer服务器

2024-09-28 05:19:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我在谷歌云平台集群上成功地连接了卡夫卡生产商和消费者:

$ cd /usr/lib/kafka
$ bin/kafka-console-producer.sh config/server.properties --broker-list \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092  --topic test

在一个新的外壳中执行

^{pr2}$

现在,我想使用以下python脚本将消息发送到Kafka producer服务器:

from kafka import *

topic = 'test'
producer = KafkaProducer(bootstrap_servers='PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092', 
api_version=(0,10))

producer.send(topic, b"Test test test")

但是,这会导致KafkaTimeoutError

"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

在网上四处看看告诉我要考虑:

  • /usr/lib/kafka/config/server.properties文件中取消listeners=...advertised.listeners=...的注释。在

但是,listeners=PLAINTEXT://:9092不起作用,thispost建议设置PLAINTEXT://<external-ip>:9092。在

所以,我开始想通过GCP集群的外部(静态)IP地址访问Kafka服务器。然后,我们设置了一个防火墙规则来访问端口(?)并允许https访问群集。但我不确定这是否是对问题的过分夸大。在

我确实需要一些指导来从python脚本成功地连接到Kafka服务器。在


Tags: producerkafkatest服务器configtopicserverlib
2条回答

罗宾,谢谢!您发布的链接对查找以下工作配置非常有帮助。在

尽管SimpleProducer似乎是一种不推荐的方法,但以下设置最终对我有效:

Python脚本:

from kafka import *
topic = 'test'
kafka = KafkaClient('[project-name]-w-0.c.[cluster-id].internal:9092')
producer = SimpleProducer(kafka)

message = "Test"
producer.send_messages(topic, message.encode('utf-8'))

并取消/usr/lib/kafka/config/server.properties文件中的注释:

^{2}$

您需要将advertised.listeners设置为客户端连接的地址。在

更多信息:https://rmoff.net/2018/08/02/kafka-listeners-explained/

相关问题 更多 >

    热门问题