使用mysql的高可用异步队列(锁)
jasyncq的Python项目详细描述
贾辛克
使用mysql的异步任务队列
如何使用
1。创建aimySQL连接池
importasyncioimportloggingimportaiomysqlloop=asyncio.get_event_loop()pool=awaitaiomysql.create_pool(host='127.0.0.1',port=3306,user='root',db='test',loop=loop,autocommit=False,)
2。使用初始化并将存储库注入dispatcher生成主题(表)
^{pr2}$3。享受排队
- 发布任务
awaitdispatcher.apply_tasks(tasks=[...dicttypetasks...],queue_name='QUEUE_TEST',)
- 消耗任务
scheduled_tasks=awaitdispatcher.fetch_scheduled_tasks(queue_name='QUEUE_TEST',limit=10)pending_tasks=awaitdispatcher.fetch_pending_tasks(queue_name='QUEUE_TEST',limit=10,check_term_seconds=60,)tasks=[*pending_tasks,*scheduled_tasks]# ...RUN JOBS WITH tasks
4。完成任务
task_ids=[str(task.uuid)fortaskintasks]awaitdispatcher.complete_tasks(task_ids=task_ids)
示例
- 消费者:jasyncq/示例/消费者.py在
- 制作人:jasyncq/示例/producer.py在
你应该知道
- 调度程序的
fetch_scheduled_tasks
和fetch_pending_tasks
方法接受调度作业,并在同一事务中并发地将其状态更新为WORK IN PROGRESS
- jasyncq中排队的大多数任务都将在
exactly once
中运行fetch_scheduled_tasks
,但在某些情况下,由于工作人员在工作时关闭而导致作业消失。它可以通过fetch_pending_tasks
(它可以检查worker容忍WIP
-ed而不是{}(已删除行))恢复
- 项目
标签: