MQTT协议的客户端

gmqtt的Python项目详细描述


Build Statuscodecov

python mqtt客户端实现。

安装

python包索引(pypi)中提供了最新的稳定版本,可以使用

pip3 install gmqtt

使用量

入门

下面是一个非常简单的示例,它订阅代理主题并打印出结果消息:

importasyncioimportosimportsignalimporttimefromgmqttimportClientasMQTTClient# gmqtt also compatibility with uvloop  importuvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy())STOP=asyncio.Event()defon_connect(client,flags,rc,properties):print('Connected')client.subscribe('TEST/#',qos=0)defon_message(client,topic,payload,qos,properties):print('RECV MSG:',payload)defon_disconnect(client,packet,exc=None):print('Disconnected')defon_subscribe(client,mid,qos):print('SUBSCRIBED')defask_exit(*args):STOP.set()asyncdefmain(broker_host,token):client=MQTTClient("client-id")client.on_connect=on_connectclient.on_message=on_messageclient.on_disconnect=on_disconnectclient.on_subscribe=on_subscribeclient.set_auth_credentials(token,None)awaitclient.connect(broker_host)client.publish('TEST/TIME',str(time.time()),qos=1)awaitSTOP.wait()awaitclient.disconnect()if__name__=='__main__':loop=asyncio.get_event_loop()host='mqtt.flespi.io'token=os.environ.get('FLESPI_TOKEN')loop.add_signal_handler(signal.SIGINT,ask_exit)loop.add_signal_handler(signal.SIGTERM,ask_exit)loop.run_until_complete(main(host,token))

MQTT版本5.0

gmqtt支持mqtt版本5.0协议

版本设置

默认情况下使用5.0版。如果代理不支持5.0协议版本,并使用正确的connack原因代码响应,则客户端将降级到3.1并自动重新连接。注意,有些代理无法解析5.0格式的连接数据包,所以首先手动检查代理是否正确处理了这个问题。 您也可以在连接方法中强制版本:

fromgmqtt.mqtt.constantsimportMQTTv311client=MQTTClient('clientid')client.set_auth_credentials(token,None)awaitclient.connect(broker_host,1883,keepalive=60,version=MQTTv311)

属性

MQTT 5.0协议允许在包中包含自定义属性,下面是在发布的消息中传递响应主题属性的示例:

TOPIC='testtopic/TOPIC'defon_connect(client,flags,rc,properties):client.subscribe(TOPIC,qos=1)print('Connected')defon_message(client,topic,payload,qos,properties):print('RECV MSG:',topic,payload.decode(),properties)asyncdefmain(broker_host,token):client=MQTTClient('asdfghjk')client.on_message=on_messageclient.on_connect=on_connectclient.set_auth_credentials(token,None)awaitclient.connect(broker_host,1883,keepalive=60)client.publish(TOPIC,'Message payload',response_topic='RESPONSE/TOPIC')awaitSTOP.wait()awaitclient.disconnect()
连接属性

connect属性作为kwargs传递给Client对象(稍后它们与从代理接收的属性一起存储在client.properties字段中)。参见下面的示例。

  • session_expiry_interval-int会话到期间隔(秒)。如果缺少会话到期时间间隔,则使用值0。如果设置为0或不存在,则会话将在网络连接关闭时结束。如果会话到期时间间隔为0xffffff(最大可能值),则会话不会到期。
  • receive_maximum-int客户端使用此值来限制它愿意同时处理的QoS 1和QoS 2发布的数量。
  • user_property-tuple(str, str)此属性可用于提供其他诊断或其他信息(键值对)。
  • 客户端使用最大包大小(以字节为单位)通知服务器它不会处理超过此限制的数据包。

示例:

client = gmqtt.Client("lenkaklient", receive_maximum=24000, session_expiry_interval=60, user_property=('myid', '12345'))
发布属性

这些属性也将在发布包中从代理发送,它们将被传递到on_message回调。

  • message_expiry_interval-int如果存在,则该值是应用程序消息的生存期(秒)。
  • content_type-unicode描述应用程序消息内容的utf-8编码字符串。内容类型的值由发送和接收应用程序定义。
  • user_property-tuple(str, str)
  • subscription_identifier-int(参见订阅属性)由代理发送

示例:

def on_message(client, topic, payload, qos, properties):
    # properties example here: {'content_type': ['json'], 'user_property': [('timestamp', '1524235334.881058')], 'message_expiry_interval': [60], 'subscription_identifier': [42, 64]}
    print('RECV MSG:', topic, payload, properties)

client.publish('TEST/TIME', str(time.time()), qos=1, retain=True, message_expiry_interval=60, content_type='json')
订阅属性
  • subscription_identifier-int如果客户端为任何重叠订阅指定了订阅标识符,则服务器必须在作为订阅结果发布的消息中发送这些订阅标识符。

重新连接

默认情况下,连接的MQTT客户机将始终尝试在连接丢失时重新连接。尝试重新连接的次数不受限制。 如果要更改此行为,请执行以下操作:

client=MQTTClient("client-id")client.set_config({'reconnect_retries':10,'reconnect_delay':60})

上面的代码将重新连接尝试次数设置为10次,并将重新连接尝试之间的延迟设置为1分钟(60秒)。默认情况下,reconnect_delay=6reconnect_retries=-1表示无穷大。 请注意,手动调用await client.disconnect()将为0设置reconnect_retries,这将停止自动重新连接。

其他示例

查看examples directory了解更多用例。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
我可以用C++代码使用java代码吗?   java使用JSR303在派生类中提供更具体的约束   java在这个查找唯一路径数算法中我做错了什么?   java如何为2个不同的服务提供商使用2个不同的SSL证书?   java在Gridview上绘制文本   java使用连接for循环构建字符串名   java StringBuilder拆分无法处理某些文件   java事件关注EditText   Java Web Start“找不到URL的缓存资源”   java程序从命令行运行的速度比在Eclipse中慢   java为什么HttpServletRequest会截断#字符上的url输入?   java自定义折叠工具栏平滑标题大小调整   使用Mockito对安卓 java中调用另一个静态函数的函数进行单元测试   http在java客户机中使用cachecontrol头   java如何使用。是否使用Delimiter从输入文件中排除标点符号和数字?   使用上下文作为参数/参数的java   java更有效地从Jar中提取文件   java为多个JButton提供相同的actionListener