MQTT&postgresqldb:使用python将MQTT消息插入Postgresql表中的列中

2024-06-28 11:17:09 发布

您现在位置:Python中文网/ 问答频道 /正文

我是PostgreSQL新手,我想使用pysycopg2将mqtt消息插入PostgreSQL数据库。不幸的是,它并没有像预期的那样工作。我认为这是一个简单的错误,但我不知道错误到底是什么。首先,我使用python脚本[1]在mosquitto broker中发布了mqtt消息,然后从另一个脚本[2]订阅并尝试存储到postgresql中。相应的错误消息如[3]所示

以下是我的发布者脚本,用于将伪mqtt json数据发布到mosquitto broker:

#!/usr/bin/python

import paho.mqtt.client as mqtt
import numpy as np
import time

broker_address = "localhost"

def on_connect(client, userdata, flags, rc):
  print("Connected with result code " + str(rc))

client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_address,1883,60)
client.loop_start()

while True:
  time.sleep(0.05)
  degrees = np.random.random_sample()
  toa = np.random.random_sample()
  humidity = np.random.random_sample()
  json =  ('''[{"time": "2020-04-01 21:00:00",  "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]''')

  locpk= '{"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}'

  locpk= str(locpk)
  json= str(json)
  client.publish("device14/geo", locpk, 1, 1)
  client.publish("device14/geo", json, 1, 1)

以下是我的订阅服务器脚本,用于订阅已发布的消息并插入PostgreSQL:

#!/usr/bin/python
import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("device14/geo",0)

def on_message(client, userdata, msg):
    Date = datetime.datetime.utcnow()
    message= msg.payload.decode()
    try:
            #print the JSON Message with Data and Topic
            print(str(Date) + ": " + msg.topic + " " + str(message))
            #concatenate the SQL string
            sql_string = "INSERT INTO table_name(column_name)\nVALUES %s" % (message)
            #execute the INSERT statement
            cur = conn.cursor()
            cur.execute(sql_string)
            #commit the changes to the database
            conn.commit()
            print("Finished writing to PostgreSQL")
    except (Exception, Error) as err:
            print("\npsycopg2 connect error:", err)
            #print("Could not insert " + message + " into Postgresql DB")

#Set up a client for Postgresql DB
try:
    #read connection parameters
    params = config()
    #connect to the PostgreSQL server
    print('Connecting to the PostgreSQL database...')
    conn = psycopg2.connect(**params)
    #create a cursor
    cur = conn.cursor()
    #execute a statement
    print('PostgreSQL database version:')
    cur.execute('SELECT version()')
    cur.execute(sql)
    #display the PostgreSQL database server version
    db_version = cur.fetchone()

    print(db_version)

except (Exception, psycopg2.DatabaseError) as error:
    print(error)
#Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()

#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)

client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
    try:
        client.connect("localhost", 1883, 60)
        connOK = True
    except:
        connOK = False
    time.sleep(2)
#Blocking loop to the Mosquitto broker
client.loop_forever()

错误:

/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/geo_store.py


Connecting to the PostgreSQL database...

PostgreSQL database version:

no results to fetch

Connected with result code 0

Received a message on topic: device14/geo


2020-04-10 15:18:00.336002: device14/geo [{"time": "2020-04-01 21:00:00",   "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]


psycopg2 connect error: syntax error at or near "["
LINE 2: VALUES [{"time": "2020-04-05 21:00:00", "device_addr": "buizg...
               ^**


Received a message on topic: device14/geo

2020-04-10 15:18:00.366786: device14/geo {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}

psycopg2 connect error: syntax error at or near "{"
LINE 2: VALUES {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000...

期待您的评论。任何帮助都将不胜感激

PS:我也尝试过改变发布消息的结构(如locpk、json),但没有帮助。如果您对发布的消息结构有任何建议,请告诉我。我试试看


Tags: thetoimportclientmessagetimeonpostgresql
1条回答
网友
1楼 · 发布于 2024-06-28 11:17:09

我没有看到table_name的表结构,但是如果它只有一列(column_name),并且您想在其中存储JSON文档,那么您需要在PostgreSQL中将其定义为jsonb
在这样的列中插入数据很容易:

from psycopg2.extras import Json
...
query = "INSERT INTO table_name(column_name) VALUES (%s)"
data = (Json(message),)
cur.execute(query, data)
conn.commit()
...

但是,对整个消息使用单个列不是一个好的设计选择。
为公共键(如time, device_addr, latitude, longitude, altitude和其他键)创建列(我只是根据提供的数据进行猜测)。
将不太重要(可能丢失)的键存储在单独的jsonb列中(例如称为data

相关问题 更多 >