How can I send messages to Azure Queue Storage quickly with Python?

Posted 2024-09-29 19:16:33


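I'm trying to send a large number of messages (tens of millions) to Azure with the Python azure.storage.queue library, but it takes a very long time. This is the code I'm using: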

from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy
)

messages = ["example message 1", "example message 2"]  # placeholder: the real list of messages
connectionString = "example connection string"
queueName = "example-queue-name"

queueClient = QueueClient.from_connection_string(connectionString, queueName)
for message in messages:
    queueClient.send_message(message)

Right now, sending about 70,000 messages takes about 3 hours, which is clearly far too slow given how many messages I may need to send.
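To put the reported rate in perspective, here is a rough back-of-the-envelope estimate. The 10 million figure is only an illustrative lower bound for "tens of millions", not a number from the question.

```latex
\[
\frac{70\,000\ \text{messages}}{3 \times 3600\ \text{s}} \approx 6.5\ \text{messages/s}
\quad\Longleftrightarrow\quad
\frac{10\,800\ \text{s}}{70\,000\ \text{messages}} \approx 0.154\ \text{s/message}
\]
\[
10^{7}\ \text{messages} \times 0.154\ \text{s/message} \approx 1.5 \times 10^{6}\ \text{s} \approx 18\ \text{days}
\]
```

At roughly 0.15 s per call, each `send_message` appears to be dominated by waiting on the network round trip rather than by local work, so overlapping many calls is likely to help far more than trying to make any single call faster.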

I looked through the documentation for a batch option, but there doesn't seem to be one: https://docs.microsoft.com/en-us/python/api/azure-storage-queue/azure.storage.queue.queueclient?view=azure-python
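Without a batch call, a common workaround is to keep many `send_message` calls in flight at once. Because each call mostly waits on the network, and CPython releases the GIL during blocking I/O, a thread pool is usually enough; separate processes are not required. The sketch below is a minimal version under stated assumptions: `send_all_threaded` is a hypothetical helper, `max_workers=32` is an arbitrary starting value to tune, and sharing one `QueueClient` across threads assumes the client is thread-safe (the Azure SDK guidelines describe clients that way, but verify it for your SDK version). A stand-in sender replaces Azure so the sketch runs on its own.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def send_all_threaded(
    send: Callable[[str], object],
    messages: Iterable[str],
    max_workers: int = 32,
) -> int:
    """Call send(message) for every message from a pool of worker threads.

    Returns how many messages were sent. The first exception raised by
    send() (in input order) is re-raised here.
    """
    count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map schedules one send() call per message; iterating the
        # results waits for each call and surfaces any exception
        for _ in pool.map(send, messages):
            count += 1
    return count


if __name__ == "__main__":
    # Stand-in for queueClient.send_message so the sketch runs without Azure.
    # With Azure (assumption: one QueueClient shared by all threads):
    #   queueClient = QueueClient.from_connection_string(connectionString, queueName)
    #   send_all_threaded(queueClient.send_message, messages)
    sent = []
    lock = threading.Lock()

    def fake_send(message: str) -> None:
        with lock:
            sent.append(message)

    print(send_all_threaded(fake_send, [f"msg-{i}" for i in range(1000)]), len(sent))
```

`Executor.map` submits every call up front, so for tens of millions of messages it is probably better to feed the list in slices (for example 100,000 at a time) to keep memory bounded.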

I'd also like to know whether anyone has experience using the asyncio library to speed this up and could suggest how to use it.
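For the asyncio route, the general pattern is to start many coroutines and cap how many requests are outstanding with an `asyncio.Semaphore`. This is a minimal sketch, not a tested Azure recipe: `send_all_async` is a hypothetical helper, `concurrency=64` is an arbitrary value to tune, and the Azure wiring in the comments assumes the async client in `azure.storage.queue.aio`, which needs an async HTTP transport such as aiohttp installed. The demo uses a fake sender so it runs without Azure.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def send_all_async(
    send: Callable[[str], Awaitable[object]],
    messages: Iterable[str],
    concurrency: int = 64,
) -> int:
    """Await send(message) for every message, keeping at most `concurrency`
    calls in flight. Returns how many messages were sent."""
    semaphore = asyncio.Semaphore(concurrency)

    async def send_one(message: str) -> None:
        # Wait for a free slot, then issue the request
        async with semaphore:
            await send(message)

    coros = [send_one(m) for m in messages]
    # gather() runs the coroutines concurrently and propagates the first exception
    await asyncio.gather(*coros)
    return len(coros)


# With Azure (assumption: the async client from azure.storage.queue.aio):
#
#   from azure.storage.queue.aio import QueueClient
#
#   async def main():
#       async with QueueClient.from_connection_string(connectionString, queueName) as client:
#           await send_all_async(client.send_message, messages)
#
#   asyncio.run(main())

if __name__ == "__main__":
    async def demo() -> None:
        in_flight = 0
        peak = 0

        async def fake_send(message: str) -> None:
            # Simulated request: track concurrency, then "wait on the network"
            nonlocal in_flight, peak
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)
            in_flight -= 1

        sent = await send_all_async(fake_send, [f"msg-{i}" for i in range(200)], concurrency=20)
        print(sent, peak)

    asyncio.run(demo())
```

The semaphore keeps the number of open requests bounded even though every coroutine is created immediately. For tens of millions of messages, calling `send_all_async` on slices of the list avoids holding millions of coroutine objects in memory at once.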


1 answer
User
#1 · Posted 2024-09-29 19:16:33

Try this:

from azure.storage.queue import QueueClient
from concurrent.futures import ProcessPoolExecutor
import time

connectionString = "<conn str>"
queueName = "<queue name>"

# Fill this list with the messages to send
messages = []


def pushThread(messages):
    # Create the client inside the worker so each process has its own
    # client instead of relying on one created in the parent process
    queueClient = QueueClient.from_connection_string(connectionString, queueName)
    for message in messages:
        queueClient.send_message(message)
    # Return the count so the callback and the final print show something useful
    return len(messages)


def callback_function(future):
    print('Callback with the following result', future.result())


def main():
    # Split the messages into two halves, one per worker process
    messagesP1 = messages[:len(messages)//2]
    messagesP2 = messages[len(messages)//2:]
    print(len(messagesP1))
    print(len(messagesP2))

    # Time only in the parent process: module-level code is re-executed
    # in every worker under the "spawn" start method (Windows, macOS)
    tic = time.perf_counter()
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(pushThread, messagesP1)
        future.add_done_callback(callback_function)
        future2 = executor.submit(pushThread, messagesP2)
        # result() blocks until each half is sent, so no busy-wait loop is needed
        print(future.result(), future2.result())
    toc = time.perf_counter()
    print(f"spent {toc - tic:0.4f} seconds")


if __name__ == '__main__':
    main()

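As you can see, I split the message array into two halves and used two worker processes to push the data to the queue concurrently. In my test with about 800 messages, sending them one at a time took 94 seconds.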

With the approach above, it took 48 seconds.
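The two timings quoted in this answer (about 800 messages: 94 s one at a time, 48 s with two processes) can be read as follows:

```latex
\[
\frac{94\ \text{s}}{800\ \text{messages}} \approx 0.12\ \text{s/message (sequential)},
\qquad
\text{speedup} = \frac{94\ \text{s}}{48\ \text{s}} \approx 1.96\ \text{(2 processes)}
\]
```

A nearly 2× gain from 2 workers is what you would expect if each send spends most of its time waiting on the round trip. Under that assumption, adding more concurrent senders should keep helping until the service throttles; Azure publishes a per-queue throughput target (around 2,000 messages per second for 1 KiB messages at the time of writing, worth checking against the current scalability-targets page).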

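The two-way split above generalizes to any number of workers. This is a minimal sketch of a hypothetical helper, `split_into_chunks`, that cuts a list into `n` contiguous slices whose sizes differ by at most one; the commented usage assumes the answer's `pushThread` function and an arbitrary worker count of 8.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_into_chunks(items: Sequence[T], n: int) -> List[List[T]]:
    """Split items into n contiguous slices whose sizes differ by at most one."""
    if n < 1:
        raise ValueError("n must be at least 1")
    size, extra = divmod(len(items), n)
    chunks: List[List[T]] = []
    start = 0
    for i in range(n):
        # The first `extra` chunks each take one leftover item
        end = start + size + (1 if i < extra else 0)
        chunks.append(list(items[start:end]))
        start = end
    return chunks


# Hypothetical use with the answer's pushThread and more workers:
#   with ProcessPoolExecutor(max_workers=8) as executor:
#       # list() waits for every chunk to finish and re-raises any error
#       list(executor.map(pushThread, split_into_chunks(messages, 8)))

if __name__ == "__main__":
    print(split_into_chunks(list(range(10)), 3))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Since the sends are I/O-bound, raising the worker count past the number of CPU cores can still help, though threads or asyncio reach high concurrency more cheaply than processes.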
