MQTT的协调生产者消费者

mqtt-coordinated的Python项目详细描述


##MQTT协调消费者。

卡夫卡激励的MQTT协调消费者

pip安装mqtt coordinated

##MQTT用户协调管理器

CoordinatorManager类是MQTT使用者的管理器类。它允许您连接到mqtt服务器并订阅多个主题。它会在收到新消息后为操作提供on-u消息回调。

` >>> from mqtt import CoordinatorManager >>> >>> manager = CoordinatorManager('my-manager', 'iot.eclipse.org') >>> manager.start() >>> >>> consumer = manager.coordinated_consumer >>> consumer.on_message = on_message  # Pass callback name here. >>> consumer.subscribe("house/bulb") >>> consumer.poll(100)    # Batch message reading construct for streaming purpose `

` # Disconnect and stop consuming >>> consumer.disconnect() >>> manager.stop() `

有两种消费事件的方法, -正在注册on-u消息 -分批阅读邮件。批处理存储在内存中,暂时不存储在永久性磁盘上。

##MQTT生产者协调生产者

coordinatedproducer类是mqtt producer,它将在mqtt主题上创建多个分区。您可以将分区号或分区密钥传递给此生产者。具有相同分区密钥的消息被允许在同一分区上生成。

` >>> from mqtt import CoordinatedProducer >>> producer = CoordinatedProducer('iot.eclipse.org') >>> producer.publish_on_partition("house/bulb", "on") # Message will be published on random partition >>> producer.publish_on_partition("house/bulb", "on", partition=5) # Message will be published on 5th partition >>> producer.publish_on_partition("house/bulb", "on", partition_key='message_key') # All messages with partition_key will be published on same partition. `

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
如何在Java中以反射方式使用泛型参数调用方法?   java分配给另一个变量的变量是否更改(原始变量更改)第二个变量是否更改?   java没有此类元素异常(警告:服务器未提供任何stacktrace信息)   java检查用户是否经过密码验证或与某个提供者进行了验证   java在向“价格”和“数量”列单元格添加数据时更新JTable中的“金额”列单元格   Android Studio找不到java编译器   java“在foo类的公共方法中,哪个变量(实例或本地)起作用?”   java动态Log4j2 LogfilePath   java使用OO编程避免多个嵌套if   java有没有办法在IntelliJ更改跟踪中突出显示未保存或更改的行   Java中两个矩阵相乘的数组   java打印包含阿拉伯字符的字符串会导致问号。如何修复?   java为什么声明整型静态会导致代码中出现错误?   java在使用@Bean Spring注释创建Bean时遇到异常   java是否将JavaCV添加为依赖项,以便在Raspberry PI上运行?   java如何使用trycatch测试注入的mock   java如何在不同的环境(开发、测试、生产)中维护相同的数据表?   java将Char转换为KeyEvent