如何将脚本分解为更小的函数并创建main.py?

2024-10-01 04:58:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我用python编写了这个脚本,如果按原样执行的话,它可以非常好地工作。我要做的是将这个脚本分解成有意义的函数,并创建main.py以作为一个合适的python应用程序来执行

下面是我的LiveStream.py代码,我用它在每分钟开始时从传感器收集数据,并将其发送到MySQL数据库,同时将其发布到URL。如前所述,如果我执行:python3livestream.py,这将非常好地工作

# Import Dependencies
import board
import pandas as pd
from busio import I2C
import adafruit_bme680
from datetime import datetime, timedelta
import time
import requests
import mysql.connector
import json
import sqlalchemy

# read database config file
with open("config.json") as config:
    param = json.load(config)

# Create library object using Bus I2C port
i2c = I2C(board.SCL, board.SDA)
bme680 = adafruit_bme680.Adafruit_BME680_I2C(i2c, debug=False)

# change this to match the location's pressure (hPa) at sea level
bme680.sea_level_pressure = 1013.25

# Read data from sensors
while True:
    # Create the now variable to capture the current moment
    TimeStamp = datetime.now()
    Temperature = round((bme680.temperature * 9/5) + 32, 2)
    Gas = round(bme680.gas, 2)
    Humidity = round(bme680.humidity, 2)
    Pressure = round(bme680.pressure, 2)
    Altitude = round(bme680.altitude, 2)

    now = datetime.strftime(TimeStamp,"%Y-%m-%dT%H:%M:%S")
    # Adding collected measurements into dataframe
    data = pd.DataFrame([
        {
            "TimeStamp": now,
            "Temperature": Temperature,
            "Gas": Gas,
            "Humidity": Humidity,
            "Pressure": Pressure,
            "Altitude": Altitude
        }
    ])

    # Try establishing connection with database
    try:
        engine = sqlalchemy.create_engine('mysql+mysqlconnector://{0}:{1}@{2}/{3}'.
                                          format(param['MyDemoServer'][0]['user'],
                                                 param['MyDemoServer'][0]['password'],
                                                 param['MyDemoServer'][0]['host'],
                                                 param['MyDemoServer'][0]['database']), echo=False)
        # Cleaning the data from existing tables MetricValues and Metrics
        db_con = engine.connect()
        if db_con.connect():
            try:
                data.to_sql('sensordata', con = db_con, if_exists = 'append', index = False)
                db_con.close()
                # Dispose the engine
                engine.dispose()
            except OSError as e:
                print(e)
    except OSError as e:
        print(e)

    # Power BI API
    # BI Address to push the data to
    url = 'https://api.powerbi.com/beta/94cd2fa9-eb6a-490b-af36-53bf7f5ef485/datasets/2a7a2529-dbfd-4c32-9513-7d5857b61137/rows?noSignUpCheck=1&key=nS3bP1Mo4qN9%2Fp6XJcTBgHBUV%2FcOZb0edYrK%2BtVWDg6iWwzRtY16HWUGSqB9YsqF3GHMNO2fe3r5ltB7NhVIvw%3D%3D'

    # post/push data to the streaming API
    headers = {
        "Content-Type": "application/json"
    }
    response = requests.request(
        method="POST",
        url=url,
        headers=headers,
        data=json.dumps(data.to_json())
    )
    data = pd.DataFrame()
    # Re-run the script at the beginning of every new minute.
    dt = datetime.now() + timedelta(minutes=1)
    dt = dt.replace(second=1)

    while datetime.now() < dt:
        time.sleep(1)

这是我到目前为止尝试过的。。。我创建了一个lib文件夹,其中包含etl.py文件。在这个文件中,我尝试创建了如下函数:

def sensorsreading():
# Create library object using Bus I2C port
i2c = I2C(board.SCL, board.SDA)
bme680 = adafruit_bme680.Adafruit_BME680_I2C(i2c, debug=False)

# change this to match the location's pressure (hPa) at sea level
bme680.sea_level_pressure = 1013.25

# Read data from sensors
while True:
    # Create the now variable to capture the current moment
    TimeStamp = datetime.now()
    Temperature = round((bme680.temperature * 9 / 5) + 32, 2)
    Gas = round(bme680.gas, 2)
    Humidity = round(bme680.humidity, 2)
    Pressure = round(bme680.pressure, 2)
    Altitude = round(bme680.altitude, 2)

    now = datetime.strftime(TimeStamp, "%Y-%m-%dT%H:%M:%S")
    # Adding collected measurements into dataframe
    data = pd.DataFrame([
        {
            "TimeStamp": now,
            "Temperature": Temperature,
            "Gas": Gas,
            "Humidity": Humidity,
            "Pressure": Pressure,
            "Altitude": Altitude
        }
    ])
    return data

并具有以下功能:

def dataload(data):
# Try establishing connection with database
try:
    engine = sqlalchemy.create_engine('mysql+mysqlconnector://{0}:{1}@{2}/{3}'.
                                      format(param['MyDemoServer'][0]['user'],
                                             param['MyDemoServer'][0]['password'],
                                             param['MyDemoServer'][0]['host'],
                                             param['MyDemoServer'][0]['database']), echo=False)
    # Cleaning the data from existing tables MetricValues and Metrics
    db_con = engine.connect()
    if db_con.connect():
        try:
            data.to_sql('sensordata', con=db_con, if_exists='append', index=False)
            db_con.close()
            # Dispose the engine
            engine.dispose()
        except OSError as e:
            print(e)
except OSError as e:
    print(e)

我的main.py如下所示:

import pandas as pd
from datetime import datetime, timedelta
import time
from lib.etl import *

def etl(name):
    data = sensorsreading()
    dataload(data)


# Press the green button in the gutter to run the script.
if __name__ == '__main__':
    etl('PyCharm')
    # Re-run the script at the beginning of every new minute.
    dt = datetime.now() + timedelta(minutes=1)
    dt = dt.replace(second=1)

    while datetime.now() < dt:
        time.sleep(1)

运行main.py时,似乎没有将数据帧从sensorsreading()传递到dataload()函数

知道我做错了什么吗


Tags: thetofromimportdbdatadatetimeparam
1条回答
网友
1楼 · 发布于 2024-10-01 04:58:55

为了回答您最初的问题,您使用的是收益率而不是回报率。生成器中使用了收益率,您可以在此处阅读更多内容:https://www.geeksforgeeks.org/use-yield-keyword-instead-return-keyword-python/

在不需要精确执行的情况下,这将每60秒调用一次函数。无论如何,我建议使用systemctl或cron之类的调度器

import time

while True:
    etl('PyCharm')
    time.sleep(60)

如果您想要更精确的信息,可以使用:

import time
starttime = time.time()
while True:
    etl('PyCharm')
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))

What is the best way to repeatedly execute a function every x seconds?所述

相关问题 更多 >