在我的本地版本中,我用python创建了一个时间触发器函数,每5分钟从blob存储中读取一个CSV文件,并在处理后将其保存到另一个存储中
在本地一切运行正常。我在 VS Code 中使用“部署到函数应用…”将其部署到 Azure,但部署后函数无法正常工作。
我想知道如何将此代码传输到Azure函数
import datetime
import json
import logging
import os
import tempfile
import uuid

import pandas as pd

import azure.functions as func
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
def main(mytimer: func.TimerRequest) -> None:
    """Timer-triggered Azure Function.

    Every run reads the source CSV blob, flattens the JSON payload of each
    row into a fixed-width record (serial id, message type, timestamp and
    100 acceleration samples), and uploads the result as ``dataframe.csv``
    into a freshly created, uniquely named container in the destination
    storage account.

    Args:
        mytimer: timer binding supplied by the Functions runtime.
    """
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()

    if mytimer.past_due:
        logging.info('The timer is past due!')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    def clear_time(input_string):
        # Normalize a stamp like "YYYY-MM-DD HH:MM:SS.ffffff-zz" down to
        # "YYYY-MM-DD HH:MM:SS" by dropping the fractional/offset suffix.
        parts = input_string.split(" ")
        date_part, time_part = parts[0], parts[1]
        time_part = time_part.split("-")[0].split(".")[0]
        return f"{date_part} {time_part}"

    def clean_iloc(input_line):
        # Flatten one source row (serial_id + JSON "body" column) into a
        # flat list: [serial_id, message_type, data_time, accel_0..accel_99].
        body = json.loads(input_line["body"])
        msg_type = body["MSG_TYPE_TAG"]
        record = [input_line["serial_id"], msg_type]
        if body["error"] != {}:
            # print() is not captured by the Functions runtime; log instead.
            logging.warning("row error payload: %s", body["error"])
        if msg_type == "300":
            # Type "300" carries a 100-sample acceleration log.
            log = body["GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG"]
            record.append(clear_time(log["date_time"]))
            accel = log["acceleration_array"]
            record.extend(accel[i] for i in range(100))
        else:
            # Short-status rows carry a timestamp but no acceleration data;
            # pad with None so every record has the same width.
            log = body["GET_RIGIDSENSE_DEVICE_SHORTSTATUS_LOG_TAG"]
            record.append(clear_time(log["date_time"]))
            record.extend([None] * 100)
        return record

    try:
        # Source CSV blob URL.
        urlblob = "******"
        # Destination (RAW) storage connection string.
        # NOTE(review): secrets should come from application settings, e.g.
        # os.environ["RAW_STORAGE_CONNECTION_STRING"], not source literals.
        raw_connect_str = "******"

        # Client for the destination storage account.
        blob_service_client = BlobServiceClient.from_connection_string(
            raw_connect_str)
        # Each run writes into its own uniquely named container.
        container_name = str(uuid.uuid4())
        container_client = blob_service_client.create_container(container_name)

        logging.info("reading csv file...")
        df = pd.read_csv(urlblob)
        logging.info("file read :D ")

        columns = (["serial_id", "message_type", "data_time"]
                   + [f"acceleration_element_{i}" for i in range(100)])
        # Build all rows first and construct the frame once: appending via
        # dataframe.loc[i] reallocates on every row and is quadratic.
        rows = [clean_iloc(df.iloc[i]) for i in range(df.shape[0])]
        dataframe = pd.DataFrame(rows, columns=columns)

        # On Azure Functions the application directory is read-only; only
        # the temp directory is writable, so stage the CSV there. Writing
        # "dataframe.csv" to the CWD is why this worked locally but failed
        # after deployment.
        local_path = os.path.join(tempfile.gettempdir(), "dataframe.csv")
        dataframe.to_csv(local_path)

        blob_client = blob_service_client.get_blob_client(
            container=container_name, blob='dataframe.csv')
        logging.info("Uploading to Azure Storage as blob: %s", 'dataframe.csv')
        # Upload the staged file.
        with open(local_path, "rb") as data:
            blob_client.upload_blob(data)
    except Exception as ex:
        # logging.exception records the full traceback so failures are
        # visible in the Functions logs / Application Insights.
        logging.exception("Exception: %s", ex)
答:在 Azure Functions 的运行环境中,应用目录是只读的,不能像在本地那样把 `dataframe.csv` 写到当前目录,这就是本地正常而部署后失败的原因。请使用临时路径保存文件:导入 `tempfile`,把输出路径改为 `os.path.join(tempfile.gettempdir(), "dataframe.csv")`,先 `to_csv` 到该路径,再以该路径打开文件并上传到 Blob 存储。按此修改后的 Azure Function 即可正常运行。