塔曲任务队列系统
taqu的Python项目详细描述
基于Azure服务总线队列和pydantic模型构建的Python任务队列系统。在
这是什么?在
许多系统都可以从后台任务队列中获益,这些任务独立于API或其他进程运行。这有助于提高API性能,限制流量峰值的影响,以及通过并行处理各种事情来扩展。在
Python的实际标准是celeri+RabbitMQ,但是托管RabbitMQ是另一个负担,并不总是最有趣的体验,而且celeri还不支持asyncio。完全托管的解决方案(如Azure服务总线)可以帮助您更快地起步,无需担心的事情更少,还可以让您节省成本。在
主要用于asyncio(来自taqu.aio模块),但也可以用于非异步代码(使用taqu模块的导入)。在
支持您可能需要的所有基本功能:
- Fast insertion of tasks to queue
- Async task processing
- Easy to scale workers
- Retry logic - if there’s an uncaught exception the task will automatically be put back in the queue
- Clean shutdown on Ctrl+C (waits until tasks finish processing)
入门
在Azure Portal中设置一个新的服务总线(任何层都可以),然后在其中设置一个队列。您可能希望启用分区,也可能需要启用死记硬背。然后您需要获取代码的访问凭据。确保已安装Azure CLI,然后运行:
az login # Ensure you're logged in to Azure az account list # List subscriptions az account set --subscription <subscriptionId> # Set active subscription az servicebus namespace authorization-rule keys list \ --resource-group <rgName> \ --namespace-name <namespace> \ --name RootManageSharedAccessKey \ --query primaryConnectionString \ --output tsv
你还需要使用安装了tall的Azure库:
^{pr2}$然后,您可以设置您的worker,下面是一个示例worker.py,您可以使用python worker.py运行它:
importasynciofromtaqu.aioimportTaquAzureWorkerfrompydanticimportBaseModelCONNECTION_STRING="..."worker=TaquAzureWorker(CONNECTION_STRING)classCreateUser(BaseModel):username:strasyncdefcreate_user(user:CreateUser):print(user.username)worker.register(create_user)worker.run()
在worker就位后,您可以创建一个客户端并发送一些任务
fromtaquimportTaquAzureClientfrompydanticimportBaseModelCONNECTION_STRING="..."taqu=TaquAzureClient(CONNECTION_STRING)classCreateUser(BaseModel):username:strtaqu.add_task(CreateUser(username="my_new_username"))
您也可以查看examples。在
贡献
GitHub在这里跟踪这个项目的请求。如果你想投稿,请随意在这里submit issues(包括功能请求)或pr。在
在本地测试更改python setup.py develop是一种很好的运行方法,之后可以python setup.py develop --uninstall(您可能还需要使用--user标志)。在
- 项目
标签: