无法访问EC2上合流卡夫卡的消息

2024-06-15 04:26:18 发布

您现在位置:Python中文网/ 问答频道 /正文

合流卡夫卡5.0.0已安装在AWS EC2上,其公共IP地址为54。二十、 XX.XX号 用0.0.0.0打开EC2机器上的端口9092

在/etc/kafka/服务器属性我有

advertised.listeners=PLAINTEXT://54.XX.XX.XX:9092  
listeners=PLAINTEXT://0.0.0.0:9092

/etc/kafka/producer.properties我有bootstrap.servers=0.0.0.0:9092

在本地计算机上 在/etc/kafka/consumer.properties我有bootstrap.servers=54.XX.XX.XX:9092

在EC2中,启动kafka 'confluent start'并创建“mytopic”

我的制作人.py从本地计算机运行的代码看起来像(相关部分):

from confluent_kafka import Producer
broker = '54.XX.XX.XX'
topic = 'mytopic'
    p = Producer({'bootstrap.servers': broker})

    for data in dictList:
        p.poll(0)
        sendme = json.dumps(data)
        p.produce(topic, sendme.encode('utf-8'), callback=delivery_report)

    p.flush()

这似乎将消息写入EC2中kafka流中的“mytopic”。我能在卡夫卡卡特b 54里看到这些信息。二十、 XX.XX号-我的主题在EC2上。你知道吗

但作为一个简单的消息打印消费者,我无法从本地计算机访问这些消息,代码如下:

from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import sys

broker = '54.XX.XX.XX'
topic = 'mytopic'
group = 'mygroup'
     c = Consumer({
         'bootstrap.servers': broker,
         'group.id': group,
         'session.timeout.ms': 6000,
         'default.topic.config': {
             'auto.offset.reset': 'smallest'
         }
     })
     basic_consume_loop(c,[topic])

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('{} [{}] reached end at offset {}\n'.format(msg.topic(), msg.partition(), msg.offset()))
                    data_process()
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        print("Shutting down the consumer")
        consumer.close()

它只是挂了,我错过了任何设置吗?你知道吗


Tags: kafkaimporttopicconsumer计算机etcmsgerror
1条回答
网友
1楼 · 发布于 2024-06-15 04:26:18

以下步骤似乎有效。你知道吗

在本地和EC2机器上,在/etc/kakfa中/服务器属性套

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://54.XX.XX.XX:9092

在本地计算机上,在/etc/kakfa中/生产商属性套

bootstrap.servers=0.0.0.0:9092

在EC2机器上,在/etc/kakfa中/生产商属性套

bootstrap.servers=localhost:9092

在本地和EC2机器上,在/etc/kakfa中/消费者财产套

bootstrap.servers=0.0.0.0:9092
group.id=mygroup

使用“合流启动”在远程EC2计算机上启动所有必需的守护程序。 在本地计算机上,合流不使其运行。你知道吗

在本地计算机上(对于ip隐藏,可选):

export KAFKA_PRODUCER_IP=54.XX.XX.XX

这样,本地机器的生产者就可以通过以下方式将消息放到远程EC2 Kafka上:

broker = os.environ['KAFKA_PRODUCER_IP'] + ':9092'
topic = 'mytopic'
p = Producer({'bootstrap.servers': broker})

从本地计算机,可以通过以下方式从远程EC2 kafka获取消息:

broker = os.environ['KAFKA_PRODUCER_IP'] + ':9092'
topic = 'mytopic'
group = 'mygroup'
     c = Consumer({
         'bootstrap.servers': broker,
         'group.id': group,
         'session.timeout.ms': 6000,
         'default.topic.config': {
             'auto.offset.reset': 'smallest'
         }
     })

这些步骤似乎奏效了。可能会有一些冗余,如果是的话,请指出。你知道吗

相关问题 更多 >