如何将从RabbitMQ队列接收的JSON转换为CSV?

2024-09-29 23:26:51 发布

您现在位置:Python中文网/ 问答频道 /正文

目前,我正在使用组织内RabbitMQ队列中的消息。每天我都需要将收到的所有消息推送到一个csv,该csv最终将作为一个表放到数据仓库中

代码总是在侦听队列,理想情况下,我希望将数据流传输到csv

#callback funtion on receiving messages
def onMessage(channel, method, properties, body):
    print(body)

while True:
    try:
        #connect
        credentials = pika.PlainCredentials(username, password)
        connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials))


channel = connection.channel()
channel.basic_consume(on_message_callback = onMessage, queue = queueName, auto_ack = True)
        channel.start_consuming()

我开始使用队列后收到的输出如下:这是收到的一行数据。它基本上返回一个json对象,但是在使用json对象时需要删除b'{“metrics”

b'{"metrics":[{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"deviceHealthScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099642,"unit":"count","value":"10.0"},{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"configAssuranceScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099325,"unit":"count","value":"1.0"},{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"imageAssuranceScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099325,"unit":"count","value":"1.0"},{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"vulnerabilityAssuranceScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099325,"unit":"count","value":"10.0"},{"ci_id":"SPN-EQSHATA1","client_id":"39956e6fdb256757567567433333193a","name":"overallAssuranceScore","source_id":"Global","source_management_platform":"XXX","timestamp":1582886099642,"unit":"count","value":"5.5"}],"emr_published_on":1582886099642}'


Tags: nameclientciidsourcecountchannelunit
1条回答
网友
1楼 · 发布于 2024-09-29 23:26:51

b'...'只意味着您得到了一个json模块可以愉快地处理的字节字符串。您将得到一个dictionnary,它对于metrics键具有一个字典列表的值。该列表可以直接提供数据帧

这意味着您可以简单地处理该问题:

df = pd.DataFrame(json.loads(body)['metrics'])

相关问题 更多 >

    热门问题