MQTT协议的客户端
gmqtt的Python项目详细描述
python mqtt客户端实现。
安装
python包索引(pypi)中提供了最新的稳定版本,可以使用
pip3 install gmqtt
使用量
入门
下面是一个非常简单的示例,它订阅代理主题并打印出结果消息:
importasyncioimportosimportsignalimporttimefromgmqttimportClientasMQTTClient# gmqtt also compatibility with uvloop importuvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy())STOP=asyncio.Event()defon_connect(client,flags,rc,properties):print('Connected')client.subscribe('TEST/#',qos=0)defon_message(client,topic,payload,qos,properties):print('RECV MSG:',payload)defon_disconnect(client,packet,exc=None):print('Disconnected')defon_subscribe(client,mid,qos):print('SUBSCRIBED')defask_exit(*args):STOP.set()asyncdefmain(broker_host,token):client=MQTTClient("client-id")client.on_connect=on_connectclient.on_message=on_messageclient.on_disconnect=on_disconnectclient.on_subscribe=on_subscribeclient.set_auth_credentials(token,None)awaitclient.connect(broker_host)client.publish('TEST/TIME',str(time.time()),qos=1)awaitSTOP.wait()awaitclient.disconnect()if__name__=='__main__':loop=asyncio.get_event_loop()host='mqtt.flespi.io'token=os.environ.get('FLESPI_TOKEN')loop.add_signal_handler(signal.SIGINT,ask_exit)loop.add_signal_handler(signal.SIGTERM,ask_exit)loop.run_until_complete(main(host,token))
MQTT版本5.0
gmqtt支持mqtt版本5.0协议
版本设置
默认情况下使用5.0版。如果代理不支持5.0协议版本,并使用正确的connack原因代码响应,则客户端将降级到3.1并自动重新连接。注意,有些代理无法解析5.0格式的连接数据包,所以首先手动检查代理是否正确处理了这个问题。 您也可以在连接方法中强制版本:
fromgmqtt.mqtt.constantsimportMQTTv311client=MQTTClient('clientid')client.set_auth_credentials(token,None)awaitclient.connect(broker_host,1883,keepalive=60,version=MQTTv311)
属性
MQTT 5.0协议允许在包中包含自定义属性,下面是在发布的消息中传递响应主题属性的示例:
TOPIC='testtopic/TOPIC'defon_connect(client,flags,rc,properties):client.subscribe(TOPIC,qos=1)print('Connected')defon_message(client,topic,payload,qos,properties):print('RECV MSG:',topic,payload.decode(),properties)asyncdefmain(broker_host,token):client=MQTTClient('asdfghjk')client.on_message=on_messageclient.on_connect=on_connectclient.set_auth_credentials(token,None)awaitclient.connect(broker_host,1883,keepalive=60)client.publish(TOPIC,'Message payload',response_topic='RESPONSE/TOPIC')awaitSTOP.wait()awaitclient.disconnect()
连接属性
connect属性作为kwargs传递给Client
对象(稍后它们与从代理接收的属性一起存储在client.properties
字段中)。参见下面的示例。
session_expiry_interval
-int
会话到期间隔(秒)。如果缺少会话到期时间间隔,则使用值0。如果设置为0或不存在,则会话将在网络连接关闭时结束。如果会话到期时间间隔为0xffffff(最大可能值),则会话不会到期。receive_maximum
-int
客户端使用此值来限制它愿意同时处理的QoS 1和QoS 2发布的数量。user_property
-tuple(str, str)
此属性可用于提供其他诊断或其他信息(键值对)。
客户端使用最大包大小(以字节为单位)通知服务器它不会处理超过此限制的数据包。
示例:
client = gmqtt.Client("lenkaklient", receive_maximum=24000, session_expiry_interval=60, user_property=('myid', '12345'))
发布属性
这些属性也将在发布包中从代理发送,它们将被传递到on_message
回调。
message_expiry_interval
-int
如果存在,则该值是应用程序消息的生存期(秒)。content_type
-unicode
描述应用程序消息内容的utf-8编码字符串。内容类型的值由发送和接收应用程序定义。user_property
-tuple(str, str)
subscription_identifier
-int
(参见订阅属性)由代理发送
示例:
def on_message(client, topic, payload, qos, properties):
# properties example here: {'content_type': ['json'], 'user_property': [('timestamp', '1524235334.881058')], 'message_expiry_interval': [60], 'subscription_identifier': [42, 64]}
print('RECV MSG:', topic, payload, properties)
client.publish('TEST/TIME', str(time.time()), qos=1, retain=True, message_expiry_interval=60, content_type='json')
订阅属性
subscription_identifier
-int
如果客户端为任何重叠订阅指定了订阅标识符,则服务器必须在作为订阅结果发布的消息中发送这些订阅标识符。
重新连接
默认情况下,连接的MQTT客户机将始终尝试在连接丢失时重新连接。尝试重新连接的次数不受限制。 如果要更改此行为,请执行以下操作:
client=MQTTClient("client-id")client.set_config({'reconnect_retries':10,'reconnect_delay':60})
上面的代码将重新连接尝试次数设置为10次,并将重新连接尝试之间的延迟设置为1分钟(60秒)。默认情况下,reconnect_delay=6
和reconnect_retries=-1
表示无穷大。
请注意,手动调用await client.disconnect()
将为0设置reconnect_retries
,这将停止自动重新连接。
其他示例
查看examples directory了解更多用例。