如何将从Azure服务总线接收的批处理字符串消息逐行拆分?

2024-10-03 19:21:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我是python的初学者,我有一个带有时间触发器的Azure函数。此函数使用字符串格式从Azure服务总线读取一批原始JSON数据

这是两行数据。事实上,我收到了大约50条这样的信息是连续的。现在我想逐行拆分此邮件,然后将其归档到Azure存储

该消息类似于以下示例(第1行和第2行的concat):

{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":4560000,"SiName":"","As":"","PId":2107401,"ICheck":0,"SeeNum":40509704561424,"Type":0,"Counter":34,"PaId":0,"MeType":31,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.0254566,\"10\":-0.054562772}},\"NUMBER_TAG\":\"2145600\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":1,"id":"074222a38-2816-42c7-b95c-6644448ba9d","t":-2}

第1行是:

{"Name":"","Seri":21000000,"SiName":"","As":"","PId":21070101,"ICheck":0,"SeeNum":405097041391424,"Type":0,"Counter":33,"PaId":0,"MeType":30,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTACHED_DEVICE_SERIAL_NUMBER_TAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-1}

第2行是:

{"Name":"","Seri":4560000,"SiName":"","As":"","PId":2107401,"ICheck":0,"SeeNum":40509704561424,"Type":0,"Counter":34,"PaId":0,"MeType":31,"RecTime":"2021-10-21T09:04:41.0151Z","ReaTime":null,"Cape":"2021-10-21T09:04:40.644","Status":0,"text":"{\"TYPE_TAG\":\"00\",\"ENSORAG\":{\"date_time\":\"2021-10-21 09:04:40.644\",\"seber\":10,\"seqmber\":405097041391424,\"lo_name\":\"\",\"accati\":{\"0\":0.0,\"1\":-0.037665367,\"2\":-0.033863068,\"3\":-0.026795387,\"4\":-0.03757,\"5\":-0.02809906,\"6\":-0.016090393,\"7\":-0.040496826,\"8\":-0.05318451,\"9\":-0.025012016,\"10\":-0.057872772}},\"ATTTAG\":\"21000000\",\"error\":{}}","CerId":null,"Id":null,"Asse":null,"Id":0,"id":"075f0a38-2816-42c7-b95c-66c425b8ba9d","t":-2}

行的结构如下图所示:

enter image description here

在我看来,首先我应该拆分每一行,然后创建一个数据框,并在相关列中插入每个值。之后,我附加到一个blob。是这样吗

我该怎么办?你建议的解决方案是什么

编辑: 从服务总线读取的我的代码:

from azure.servicebus import ServiceBusClient, ServiceBusMessage

connection_str = "**"
topic_name = "***"
subscription_name = "***"

servicebus_client = ServiceBusClient.from_connection_string(
    conn_str=connection_str, logging_enable=True)

with servicebus_client:
    # get the Subscription Receiver object for the subscription
    receiver = servicebus_client.get_subscription_receiver(
        topic_name=topic_name, subscription_name=subscription_name, )
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            # complete the message so that the message is removed from the subscription
            receiver.complete_message(msg)

Tags: nameidastagtypecounterpidnull
2条回答

由于消息是单独发送的,因此您可以单独处理它们。不需要连接成字符串。只需将它们添加到数据帧中即可。下面的示例适用于队列,但您可以扩展到主题/订阅。我还附上了结果,向您展示了输出的样子

from azure.servicebus import ServiceBusClient
import pandas as pd
import json
from pandas import json_normalize

CONNECTION_STR = 'Endpoint=sb://xxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
QUEUE_NAME = 'xxxxxxxxxxx'

servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)

with servicebus_client:
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
    
    # create an Empty DataFrame object
    df = pd.DataFrame()
    msg_concat = ""
    dfs = []
    with receiver:
        received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
        for msg in received_msgs:
            msg_dict = json.loads(str(msg))
            df2 = json_normalize(msg_dict)
            df = df.append(df2, ignore_index = True)
            receiver.complete_message(msg)
print(df)
print("Receive is done.")


  Name      Seri SiName As  ...  Id  Asse                                    id  t
0       21000000            ...   0  None  075f0a38-2816-42c7-b95c-66c425b8ba9d -1
1        4560000            ...   0  None  075f0a38-2816-42c7-b95c-66c425b8ba9d -2

[2 rows x 21 columns]
Receive is done.

考虑三行的样本数据:

data = '{"Name": "Hassan", "code":"12"}{"Name": "Jack", "code":"345"}{"Name": "Jack", "code":"345"}'

以下是如何从这些数据中获取数据帧:

from ast import literal_eval
data = [literal_eval(d + '}')for d in data.split('}')[0:-1]]
df = pd.DataFrame.from_records(data)

Output:

     Name code
0  Hassan   12
1    Jack  345
2    Jack  345

相关问题 更多 >