如何将python程序转换为Azure函数?

2024-09-24 04:19:16 发布

您现在位置:Python中文网/ 问答频道 /正文

在我的本地版本中,我用python创建了一个时间触发器函数,每5分钟从blob存储中读取一个CSV文件,并在处理后将其保存到另一个存储中

在当地一切都很好。我将“部署到功能应用程序…”从VS代码部署到Azure,但不起作用

我想知道如何将此代码传输到Azure函数

 import datetime
 import logging
 import os
 import json
 import uuid
 import pandas as pd
 from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
    
    
 import azure.functions as func
    
    
 def main(mytimer: func.TimerRequest) -> None:
     utc_timestamp = datetime.datetime.utcnow().replace(
         tzinfo=datetime.timezone.utc).isoformat()
    
     if mytimer.past_due:
         logging.info('The timer is past due!')
    
     logging.info('Python timer trigger function ran at %s', utc_timestamp)
    
     def clear_time(input_string):
         input_string = input_string.split(" ")
         date, time = input_string[0], input_string[1]
         time = time.split("-")[0].split(".")[0]
         return f"{date} {time}"
    
     def clean_iloc(input_line):
         temp = {}
         temp_array = []
         body = json.loads(input_line["body"])
         element_id = input_line["serial_id"]
         MSG_TYPE_TAG = body["MSG_TYPE_TAG"]
    
         # temp["serial_id"]=element_id
         # temp["message_type"]=MSG_TYPE_TAG
         temp_array.append(element_id)
         temp_array.append(MSG_TYPE_TAG)
         if body["error"] != {}:
             print(body["error"])
    
         if MSG_TYPE_TAG == "300":
             time = clear_time(
                 body["GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG"]["date_time"])
             # temp["data_time"]=time
             temp_array.append(time)
             acceleration_array = body["GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG"]["acceleration_array"]
             for i in range(100):
                 # temp[f"acceleration_element_{i}"]=acceleration_array[i]
                 temp_array.append(acceleration_array[i])
    
         else:
    
             time = clear_time(
                 body["GET_RIGIDSENSE_DEVICE_SHORTSTATUS_LOG_TAG"]["date_time"])
             # temp["data_time"]=time
             temp_array.append(time)
             for i in range(100):
                 # temp[f"acceleration_element_{i}"]=None
                 temp_array.append(None)
         return temp_array
    
     try:
    
         # Source Data Storage Variable
         urlblob = "******"
    
         # RAW Data Blob Storage Connection String
         raw_connect_str = "******"
    
         # Create the BlobServiceClient object which will be used to create a container client
         blob_service_client = BlobServiceClient.from_connection_string(
             raw_connect_str)
    
         # Create a unique name for the container
         container_name = str(uuid.uuid4())
    
         # Create the container
         container_client = blob_service_client.create_container(container_name)
         print("reading csv file...")
         df = pd.read_csv(urlblob)
         print("file read :D ")
    
         dataframe = pd.DataFrame({})
         dataframe["serial_id"] = []
         dataframe["message_type"] = []
         dataframe["data_time"] = []
         for i in range(100):
             dataframe[f"acceleration_element_{i}"] = []
         for i in range(df.shape[0]):
             dataframe.loc[i] = clean_iloc(df.iloc[i])
            
         # Create a blob client using the local file name as the name for the blob
         dataframe.to_csv("dataframe.csv")
         blob_client = blob_service_client.get_blob_client(
             container=container_name, blob='dataframe.csv')
         print("\nUploading to Azure Storage as blob:\t" + 'dataframe.csv')
         # Upload the created file
         with open("dataframe.csv", "rb") as data:
             blob_client.upload_blob(data)
    
     except Exception as ex:
         print('Exception:')
         print(ex)

Tags: csvtheimportclientdataframeinputtimecontainer
1条回答
网友
1楼 · 发布于 2024-09-24 04:19:16

请使用临时路径保存文件,您可以导入tempfile并在函数中添加以下代码:

        # Create a blob client using the local file name as the name for the blob
        temp_path = tempfile.gettempdir()
        file_path = os.path.join(temp_path, 'dataframe.csv')
        dataframe.to_csv(file_path)

最后的Azure function如下所示:

import datetime
import logging
import os
import json
import uuid
import pandas as pd
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
import azure.functions as func
import tempfile


def main(mytimer: func.TimerRequest) -> None:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()

    if mytimer.past_due:
        logging.info('The timer is past due!')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    def clear_time(input_string):
        input_string = input_string.split(" ")
        date, time = input_string[0], input_string[1]
        time = time.split("-")[0].split(".")[0]
        return f"{date} {time}"

    def clean_iloc(input_line):
        temp = {}
        temp_array = []
        body = json.loads(input_line["body"])
        element_id = input_line["serial_id"]
        MSG_TYPE_TAG = body["MSG_TYPE_TAG"]

        # temp["serial_id"]=element_id
        # temp["message_type"]=MSG_TYPE_TAG
        temp_array.append(element_id)
        temp_array.append(MSG_TYPE_TAG)
        if body["error"] != {}:
            print(body["error"])

        if MSG_TYPE_TAG == "300":
            time = clear_time(
                body["GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG"]["date_time"])
            # temp["data_time"]=time
            temp_array.append(time)
            acceleration_array = body["GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG"]["acceleration_array"]
            for i in range(100):
                # temp[f"acceleration_element_{i}"]=acceleration_array[i]
                temp_array.append(acceleration_array[i])

        else:

            time = clear_time(
                body["GET_RIGIDSENSE_DEVICE_SHORTSTATUS_LOG_TAG"]["date_time"])
            # temp["data_time"]=time
            temp_array.append(time)
            for i in range(100):
                # temp[f"acceleration_element_{i}"]=None
                temp_array.append(None)
        return temp_array

    try:

        # Source Data Storage Variable
        urlblob = ""

        # RAW Data Blob Storage Connection String
        raw_connect_str = ""

        # Create the BlobServiceClient object which will be used to create a container client
        blob_service_client = BlobServiceClient.from_connection_string(
            raw_connect_str)

        # Create a unique name for the container
        container_name = str(uuid.uuid4())

        # Create the container
        container_client = blob_service_client.create_container(container_name)
        print("reading csv file...")
        df = pd.read_csv(urlblob)
        print("file read :D ")

        dataframe = pd.DataFrame({})
        dataframe["serial_id"] = []
        dataframe["message_type"] = []
        dataframe["data_time"] = []
        for i in range(100):
            dataframe[f"acceleration_element_{i}"] = []
        for i in range(df.shape[0]):
            dataframe.loc[i] = clean_iloc(df.iloc[i])
        
        # Create a blob client using the local file name as the name for the blob
        temp_path = tempfile.gettempdir()
        file_path = os.path.join(temp_path, 'dataframe.csv')
        dataframe.to_csv(file_path)
        blob_client = blob_service_client.get_blob_client(
            container=container_name, blob='dataframe.csv')
        print("\nUploading to Azure Storage as blob:\t" + 'dataframe.csv')
        # Upload the created file
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data)

    except Exception as ex:
        print('Exception:')
        print(ex)

相关问题 更多 >