使用Popen、Serial和MQTT的复杂应用程序中的线程

2024-10-03 17:23:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在创建一个python3模块,它考虑到:

  1. 来自paho.mqtt.client的MQTT库
  2. 利用pynmea2 and serial读取GPS数据并进行解析
  3. Popen and PIPE子进程读取特定信息
  4. InfluxDB读/写应用程序

结构

.
|-- myApp
|   |-- mqtt
|   |-- db
|   |-- gps
|   |-- n2k
|-- main.py

mqtt中,我为mqtt.Client创建了一个包装类,在上面提到的main.py中,只需要使用:

  client = myMqtt()
  client.run() # <--- run() has `loop_forever()`

^{} code from paho.mqtt

对于gpsdb,我正在考虑为我的应用程序创建类似的包装器

对于n2k,我有一个简单的包装器,它在/dev/ttyUSB0端口上调用预安装的二进制文件(actisense-serialanalyzer),如下所示:

class N2KParser:
    def __init__(self, parser=None):
        self.cfg = get_configuration('n2k') # this has all the configuration (/dev/ttyUSB0)

        self.actisense_process = Popen(['actisense-serial', '-r', self.cfg['port']], stdout=PIPE)
        self.analyzer_proc = Popen(['analyzer', '-json'], stdin=self.actisense_process.stdout,
                                    stdout=PIPE, stderr=PIPE)
    def read(self):
        logger.info('Reading NMEA2000 via Actisense-NGT1 Gateway')
        while self.analyzer_proc.poll() is None:
            n2k_data = self.analyzer_proc.stdout.readline().decode('utf-8')
            try:
                n2k_dict = json.loads(n2k_data)
            except Exception as e:
                raise e
                self.actisense_process.close()
                self.analyzer_proc.close()

    def close():
        logger.info('Closing the Parser for N2K')
        self.actisense_process.close()
        self.analyzer_proc.close()

在类似于mqttmain.py文件中,我在client.run()之后添加了一个n2k的实例

from myApp.cloud.Client import CloudMqtt
from myApp.db.dbClient import influxInstance
from myApp.nmea2k.n2kClient import N2KParser
import time

if __name__ == '__main__':

    try:
        db = influxInstance()
    except Exception as e:
        raise e
    try:
        client = CloudMqtt()
        rc = client.run()
    except Exception as e:
        raise e
        db.close()
        client.loop_stop()
        client.disconnect()
    try:
        parser = N2KParser() # <--- this is not executed since blocked by `client.run()`
    except Exception as e:
        raise e
        parser.close()

如评论所示:

  • 由于run()方法使用的loop_forever()使用线程,并且被永远阻止,因此N2KParser不会被创建和调用

在我的myApp中构造这些子模块的调用的最佳方式是什么,这样它们就可以并行运行而不会互相阻塞

附言

因为我对所有子模块都使用logging:这里是stdout上的输出

INFO:myApp.db.dbClient:Client instance to DB created INFO:myApp.cloud.Client:MQTT client instance created DEBUG:myApp.cloud.Client:Sending CONNECT (u1, p1, wr0, wq0, wf0, c0, k60) client_id=b'f628a6d8-188a-403e-90ef-da72ec1474b6'DEBUG:umg.cloud.Client:Received CONNACK (1, 0) DEBUG:myApp.cloud.Client:RC: 0 INFO:myApp.cloud.Client:connected to cloud DEBUG:myApp.cloud.Client:Sending PINGREQ DEBUG:myApp.cloud.Client:Received PINGRESP


Tags: rundebugselfclientcloudclosedbstdout