我在谷歌云平台集群上成功地连接了卡夫卡生产商和消费者:
$ cd /usr/lib/kafka
$ bin/kafka-console-producer.sh config/server.properties --broker-list \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test
在一个新的外壳中执行
^{pr2}$现在,我想使用以下python脚本将消息发送到Kafka producer服务器:
from kafka import *
topic = 'test'
producer = KafkaProducer(bootstrap_servers='PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092',
api_version=(0,10))
producer.send(topic, b"Test test test")
但是,这会导致KafkaTimeoutError
:
"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
在网上四处看看告诉我要考虑:
/usr/lib/kafka/config/server.properties
文件中取消listeners=...
和advertised.listeners=...
的注释。在但是,listeners=PLAINTEXT://:9092
不起作用,thispost建议设置PLAINTEXT://<external-ip>:9092
。在
所以,我开始想通过GCP集群的外部(静态)IP地址访问Kafka服务器。然后,我们设置了一个防火墙规则来访问端口(?)并允许https访问群集。但我不确定这是否是对问题的过分夸大。在
我确实需要一些指导来从python脚本成功地连接到Kafka服务器。在
罗宾,谢谢!您发布的链接对查找以下工作配置非常有帮助。在
尽管
SimpleProducer
似乎是一种不推荐的方法,但以下设置最终对我有效:Python脚本:
并取消
^{2}$/usr/lib/kafka/config/server.properties
文件中的注释:您需要将
advertised.listeners
设置为客户端连接的地址。在更多信息:https://rmoff.net/2018/08/02/kafka-listeners-explained/
相关问题 更多 >
编程相关推荐