使用mysql的高可用异步队列(锁)

jasyncq的Python项目详细描述


贾辛克

PyPI version

使用mysql的异步任务队列

如何使用

1。创建aimySQL连接池

importasyncioimportloggingimportaiomysqlloop=asyncio.get_event_loop()pool=awaitaiomysql.create_pool(host='127.0.0.1',port=3306,user='root',db='test',loop=loop,autocommit=False,)

2。使用初始化并将存储库注入dispatcher生成主题(表)

^{pr2}$

3。享受排队

  • 发布任务
awaitdispatcher.apply_tasks(tasks=[...dicttypetasks...],queue_name='QUEUE_TEST',)
  • 消耗任务
scheduled_tasks=awaitdispatcher.fetch_scheduled_tasks(queue_name='QUEUE_TEST',limit=10)pending_tasks=awaitdispatcher.fetch_pending_tasks(queue_name='QUEUE_TEST',limit=10,check_term_seconds=60,)tasks=[*pending_tasks,*scheduled_tasks]# ...RUN JOBS WITH tasks

4。完成任务

task_ids=[str(task.uuid)fortaskintasks]awaitdispatcher.complete_tasks(task_ids=task_ids)

示例

  • 消费者:jasyncq/示例/消费者.py在
  • 制作人:jasyncq/示例/producer.py在

你应该知道

  • 调度程序的fetch_scheduled_tasksfetch_pending_tasks方法接受调度作业,并在同一事务中并发地将其状态更新为WORK IN PROGRESS
  • jasyncq中排队的大多数任务都将在exactly once中运行fetch_scheduled_tasks,但在某些情况下,由于工作人员在工作时关闭而导致作业消失。它可以通过fetch_pending_tasks(它可以检查worker容忍WIP-ed而不是{}(已删除行))恢复

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何通过标记名检索多个标记中的元素以进行selenium自动化测试   java iText如何查找最后一行被拆分到下一页   java如何在hudson中的特定jdk上运行Findbugs和PMD?   如何确保java程序与java Environment 6兼容?   对形状进行分组,这样我就可以通过鼠标点击和java处理循环浏览它们   使用生成器映射对象时,java定义无效   maven Java:Struts2和IntelliJ供初学者使用   java子类不继承父类字段   java Android Grid View在Android版本kitkat上崩溃   java Hibernate从缓存返回错误的列表,即使预期的列表与缓存的列表不同   java SendGrid:模板和替换标记   用于普通生产者| Kafka流的java自定义分区器   安卓理解Java内部类中的作用域   无法从Android Studio中的非静态方法调用java非静态方法   比较两个XML响应的JavaXMLUnit   java使用keytool列出密钥   不使用Java客户端库将视频上传到YouTube数据API v3   java My While循环即使在满足条件时也不会结束   自动在外部存储字符串数据,以便以后在Java中使用