How do I convert an Avro file back to JSON when the data was originally sent as raw JSON from Postman through Azure Event Hubs?

Posted 2024-09-30 10:36:10


So here is the problem. I originally sent the following raw JSON body through Postman:

{
   "id":1,
   "receiver":"2222222222222",
   "message":{
      "Name":"testing",
      "PersonId":2,
      "CarId":2,
      "GUID":"1s3q1d-s546dq1-8e22e",
      "LineId":2,
      "SvcId":2,
      "Lat":-64.546547,
      "Lon":-64.546547,
      "TimeStamp":"2021-03-18T08:29:36.758Z",
      "Recorder":"dq65ds4qdezzer",
      "Env":"DEV"
   },
   "operator":20404,
   "sender":"MSISDN",
   "binary":1,
   "sent":"2021-03-18T08:29:36.758Z"
}

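Once Event Hubs Capture picks the event up, it is stored as an Avro file. I am trying to read that data with fastavro and convert it to JSON, but I don't get back the original data that Postman sent, and I can't find a way to convert it back to its original form. Why does the Avro file contain extra information on top of what Postman sent? I probably need a way to convert only the "Body" field, but for some reason the body is also wrapped in a "bytes" key. I just want to get back the raw data I sent through Postman.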

`__init__.py` (Azure Function):

    import json
    import logging
    import os
    import tempfile

    import azure.functions as func
    from azure.storage.blob import ContainerClient
    from fastavro import reader, json_writer


    # The Apache Python avro package is written in pure Python and is relatively slow, so fastavro is used instead.
    def avroToJson(avroFile, jsonFile):
        # Convert every record in the Avro container file to JSON (one record per line).
        with open(jsonFile, "w") as json_file:
            with open(avroFile, "rb") as avro_file:
                avro_reader = reader(avro_file)
                json_writer(json_file, avro_reader.writer_schema, avro_reader)


    def main(req: func.HttpRequest) -> func.HttpResponse:
        logging.info('Python HTTP trigger function processed a request.')
        workDir = tempfile.gettempdir()  # writable, OS-independent location for scratch files
        logging.info('Processor started using path ' + workDir)
        connect_str = "###########"
        container = ContainerClient.from_connection_string(connect_str, container_name="####")
        jsonStr = ""  # stays empty if the container holds no non-empty blobs
        for blob in container.list_blobs():  # List the blobs in the container.
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).
            if blob.size > 508:
                logging.info('Downloaded a non-empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = container.get_blob_client(blob.name)
                # Construct a local file name based on the blob name.
                cleanName = os.path.join(workDir, blob.name.replace('/', '_'))
                # Download the blob contents into the local file.
                with open(cleanName, "wb") as my_file:
                    my_file.write(blob_client.download_blob().readall())
                jsonPath = cleanName + '.json'
                avroToJson(cleanName, jsonPath)
                with open(jsonPath, 'r') as file:
                    jsonStr = file.read()
        # Only the JSON of the last non-empty blob is returned.
        return func.HttpResponse(jsonStr, status_code=200)

Expected result:

{
   "id":1,
   "receiver":"2222222222222",
   "message":{
      "Name":"testing",
      "PersonId":2,
      "CarId":2,
      "GUID":"1s3q1d-s546dq1-8e22e",
      "LineId":2,
      "SvcId":2,
      "Lat":-64.546547,
      "Lon":-64.546547,
      "TimeStamp":"2021-03-18T08:29:36.758Z",
      "Recorder":"dq65ds4qdezzer",
      "Env":"DEV"
   },
   "operator":20404,
   "sender":"MSISDN",
   "binary":1,
   "sent":"2021-03-18T08:29:36.758Z"
}

Actual result:

{
   "SequenceNumber":19,
   "Offset":"10928",
   "EnqueuedTimeUtc":"4/1/2021 8:43:19 AM",
   "SystemProperties":{
      "x-opt-enqueued-time":{
         "long":1617266599145
      }
   },
   "Properties":{
      "Postman-Token":{
         "string":"37ff4cc6-9124-45e5-ba9d-######e"
      }
   },
   "Body":{
      "bytes":"{\r\n  \"id\": 1,\r\n  \"receiver\": \"2222222222222\",\r\n  \"message\": {\r\n    \"Name\": \"testing\",\r\n    \"PersonId\": 2,\r\n    \"CarId\": 2,\r\n    \"GUID\": \"1s3q1d-s546dq1-8e22e\",\r\n    \"LineId\": 2,\r\n    \"SvcId\": 2,\r\n    \"Lat\": -64.546547,\r\n    \"Lon\": -64.546547,\r\n    \"TimeStamp\": \"2021-03-18T08:29:36.758Z\",\r\n    \"Recorder\": \"dq65ds4qdezzer\",\r\n    \"Env\": \"DEV\"\r\n  },\r\n  \"operator\": 20404,\r\n  \"sender\": \"MSISDN\",\r\n  \"binary\": 1,\r\n  \"sent\": \"2021-03-29T08:29:36.758Z\"\r\n}"
   }
}

I originally posted this under the thread "Alternative to Azure Event Hub Capture for sending Event Hub messages to Blob Storage?", because this second problem came up while working on the original question.

If this is not the right way to follow up on StackOverflow, feel free to comment on how I should handle it next time. Kind regards


2 answers

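The JSON document in the HTTP request body becomes the body of the Event Hubs message. That message is then written to the Capture destination together with some extra properties, such as the sequence number, offset, enqueued time, and system properties.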

When deserializing, the reader needs to take the Body field on its own; it contains the same body that was sent in the HTTP request.

Feel free to check the Event Hubs Avro schema on this page: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-capture-overview#use-avro-tools

Please try returning just the body:

    return func.HttpResponse(json.loads(jsonStr)['Body']['bytes'], status_code=200)
