mqtt实用程序脚本变得简单

mqttwrapper的Python项目详细描述


一个小的胶水包,使快速组合脚本变得简单 桥接MQTT和其他库。见下面的例子。

安装

从pypi安装:

$ pip install mqttwrapper

默认情况下,paho-mqtt将用作mqtt库,但是您可以使用 hbmqtt如果您愿意的话-请看下面。要安装,请使用:

$ pip install mqttwrapper[hbmqtt]

用法

mqttwrapper.run_script处理mqtt连接的设置/维护 和订阅主题,并在消息为 收到。

最简单的脚本如下:

from mqttwrapper import run_script

def callback(topic, payload):
    print("Received payload {} on topic {}".format(payload, topic))

def main():
    run_script(callback, broker="mqtt://127.0.0.1", topics=["my/awesome/topic1", "another/awesome/topic2"])

传递给run_script的任何额外关键字参数都将传递回 调用回调函数时:

from mqttwrapper import run_script

def callback(topic, payload, context, foo):
    assert context == "hello"
    assert foo == "bar"
    print("Received payload {} on topic {}".format(payload, topic))

def main():
    run_script(callback, broker="mqtt://127.0.0.1", topics=["my/awesome/topic1", "another/awesome/topic2"], context="hello", foo="bar")

如果不需要向回调传递任何上下文,则只需要主题和 消息的有效负载,您可以通过创建回调在“importless”模式下运行 定义了回调函数的模块,然后执行 mqttwrapper.run模块:

$ cat callback.py
def callback(topic, payload):
    print(topic, payload)

$ python -m mqttwrapper.run

保留消息

您可以通过在以下情况下传递ignore_retained=True来忽略MQTT保留的消息 正在调用run_script。nb hbmqtt当前不支持此功能 后端。

从回调发布消息

有时回调函数可能希望发布自己的mqtt消息, 可能是对收到的消息的答复或确认。这是 通过从回调返回(topic, payload)元组列表, 例如:

def callback(topic, payload):
  print("Received payload {} on topic {}".format(payload, topic))
  return [
    ("{}/ack".format(topic), payload)
  ]

mqttwrapper将负责将这些消息发布到代理。

配置

brokertopics可以从run_script调用中省略,并且 使用的环境变量:

  • MQTT_BROKER:要连接到的MQTT代理,例如mqtt://127.0.0.1/
  • MQTT_TOPICS:要订阅的主题的逗号分隔列表,例如my/topic1,my/topic2

异步/hbmqtt

如果您愿意,可以使用异步供电的hbmqttmqtt库:

from mqttwrapper.hbmqtt_backend import run_script

async def callback(topic, payload):
    print("Received payload {} on topic {}".format(payload, topic))

注意,在这种情况下,您的回调必须是可等待的。

回调可能需要上下文参数,这些参数本身就是异步对象 或构成挑战的可等待性:如何在异步之外设置这些 调用run_script之前的事件循环?在这种情况下,您可以通过 context_callback可作为run_script的Kwarg等待。这是在 mqtt循环的开始,并应返回一个dict,该dict将合并到 传递给回调的Kwargs。例如:

from mqttwrapper.hbmqtt_backend import run_script

async def setup_db():
  return {
    "query_db": query_db
  }

async def query_db(value):
  # pretend this is some slow DB query, for example.
  await asyncio.sleep(3)
  return value * 2

async def callback(topic, payload, query_db):
    db_result = await query_db(int(payload))
    print("Received payload {} on topic {}, db result: {}".format(payload, topic, db_result))

def main():
    run_script(callback, context_callback=setup_db)

nb hbmqtt的重新连接处理不会在 重新连接,mqttwrapper还不能解决这个问题。

示例

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
eclipse java btc交易应用程序编译时错误(HTTP组件)   java客户端状态保存,我能看到发送到客户端的状态数据吗?   spring中的java内部字段注入工作以及为什么不建议使用它   在iReports、Jasper和JavaBean数据源中使用表功能   如何在Java树中查找节点   用于日志分析的java模式匹配   java如何在Spring中处理json列表?   java SQL注入和可能的攻击   java如果我将实体转换为DTO,那么转换代码应该存在于何处?   java素数方法不起作用   通过NFC启动java Android Q隐私更改活动   java在Spring引导时为Spring AMQP和RabbitMQ动态设置主机   swing Java Paste From Clipboard不适用于Linux上的所有应用程序   java从不同对象访问对象的内部类   java Ektorp CouchDB测试连接   十进制前8位、十进制后2位和整个字符串的java正则表达式不应计算为零   在Java中以长格式存储两个整数之和   ConfigurationProperties中嵌套属性的java验证