mqtt实用程序脚本变得简单
mqttwrapper的Python项目详细描述
一个小的胶水包,使快速组合脚本变得简单 桥接MQTT和其他库。见下面的例子。
安装
从pypi安装:
$ pip install mqttwrapper
默认情况下,paho-mqtt将用作mqtt库,但是您可以使用 hbmqtt如果您愿意的话-请看下面。要安装,请使用:
$ pip install mqttwrapper[hbmqtt]
用法
mqttwrapper.run_script处理mqtt连接的设置/维护 和订阅主题,并在消息为 收到。
最简单的脚本如下:
from mqttwrapper import run_script def callback(topic, payload): print("Received payload {} on topic {}".format(payload, topic)) def main(): run_script(callback, broker="mqtt://127.0.0.1", topics=["my/awesome/topic1", "another/awesome/topic2"])
传递给run_script的任何额外关键字参数都将传递回 调用回调函数时:
from mqttwrapper import run_script def callback(topic, payload, context, foo): assert context == "hello" assert foo == "bar" print("Received payload {} on topic {}".format(payload, topic)) def main(): run_script(callback, broker="mqtt://127.0.0.1", topics=["my/awesome/topic1", "another/awesome/topic2"], context="hello", foo="bar")
如果不需要向回调传递任何上下文,则只需要主题和 消息的有效负载,您可以通过创建回调在“importless”模式下运行 定义了回调函数的模块,然后执行 mqttwrapper.run模块:
$ cat callback.py def callback(topic, payload): print(topic, payload) $ python -m mqttwrapper.run
保留消息
您可以通过在以下情况下传递ignore_retained=True来忽略MQTT保留的消息 正在调用run_script。nb hbmqtt当前不支持此功能 后端。
从回调发布消息
有时回调函数可能希望发布自己的mqtt消息, 可能是对收到的消息的答复或确认。这是 通过从回调返回(topic, payload)元组列表, 例如:
def callback(topic, payload): print("Received payload {} on topic {}".format(payload, topic)) return [ ("{}/ack".format(topic), payload) ]
mqttwrapper将负责将这些消息发布到代理。
配置
broker和topics可以从run_script调用中省略,并且 使用的环境变量:
- MQTT_BROKER:要连接到的MQTT代理,例如mqtt://127.0.0.1/
- MQTT_TOPICS:要订阅的主题的逗号分隔列表,例如my/topic1,my/topic2
异步/hbmqtt
如果您愿意,可以使用异步供电的hbmqttmqtt库:
from mqttwrapper.hbmqtt_backend import run_script async def callback(topic, payload): print("Received payload {} on topic {}".format(payload, topic))
注意,在这种情况下,您的回调必须是可等待的。
回调可能需要上下文参数,这些参数本身就是异步对象 或构成挑战的可等待性:如何在异步之外设置这些 调用run_script之前的事件循环?在这种情况下,您可以通过 context_callback可作为run_script的Kwarg等待。这是在 mqtt循环的开始,并应返回一个dict,该dict将合并到 传递给回调的Kwargs。例如:
from mqttwrapper.hbmqtt_backend import run_script async def setup_db(): return { "query_db": query_db } async def query_db(value): # pretend this is some slow DB query, for example. await asyncio.sleep(3) return value * 2 async def callback(topic, payload, query_db): db_result = await query_db(int(payload)) print("Received payload {} on topic {}, db result: {}".format(payload, topic, db_result)) def main(): run_script(callback, context_callback=setup_db)
nb hbmqtt的重新连接处理不会在 重新连接,mqttwrapper还不能解决这个问题。
示例
- rxv2mqtt
- tradfri-mqtt(使用异步)