卡夫卡普顿的包装纸
kafkaPythonWrapper的Python项目详细描述
kafka-python
的包装安装
使用pip安装:
pip install kafkaPythonWrapper
从Github安装:
^{pr2}$CLI
用法
usage: kafkaPython [-h] --type type --topic topic [--group_id group_id] --bootstrap_server bootstrap_server [--value value][--key key] Required arguments: --type type of kafka client, consumer or producer --topic specify a topic for Kafka --bootstrap_server specify a bootstrap server for Kafka Optional arguments: --group_id specify a group ID for Kafka consumer --value specify a value to send to Kafka --key specify a key to send to Kafka
client type是生产者或消费者。如果类型是producer,则标志value是必需的,而key是可选的。如果类型是consumer,则需要标志group\u id。在
简单示例:
给卡夫卡发个信
kafkaPython --type producer --topic test --bootstrap_server 'localhost:9092' --key test_key --value test_value
打印卡夫卡的消息
kafkaPython --type consumer --bootstrap_server 'localhost:9092' --topic test --group_id 1
API
用法
将函数的输出发送给Kafka
fromkafkaPythonWrapperimportMessageSendermessage_sender=MessageSender(topic='test')@message_sender.send_sync()defproduce_message():key,value='email_address','wzhang@leadbook.com'print(f'{key}: {value}')yieldkey,value
接收和打印来自卡夫卡的消息
fromkafkaPythonWrapperimportMessageCollectormessage_collector=MessageCollector(topic='test',group_id='1')@message_collector.consumedefprint_message(**kwargs):print(kwargs)
关闭生产商或消费者
message_sender.close()message_collector.close()
- 项目
标签: