Arque

Asynchronous reliable queue (based on redis)

Inspired by Tom DeWire's article "Reliable Queueing in Redis (Part 1)" [1][2] and the "torrelque" Python module [3].

Features:

- Asynchronous: based on asyncio and aioredis
- Reliable: task data is stored in the redis database at every moment
- Throttling: controls number of tasks in execution
- Delayed queue: defers task availability
- Dead letters: moves task data to a failed queue after a predefined number of retry attempts
- Tested on Python 3.7 and redis server '>=3.0.6', '<=5.0.5'
- Used in containerized applications (managed by kubernetes) in high load environments
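
The throttling, delayed-queue and dead-letter features listed above can be illustrated with a toy in-memory model. This is only a minimal sketch and not arque's implementation, which keeps its state in redis: the class `ToyReliableQueue`, its attributes and the `max_retries` parameter are hypothetical names chosen for illustration. The two sentinel strings mirror the ones checked in the usage example, and the current time is passed in explicitly to keep the behaviour deterministic.

```python
import heapq


class ToyReliableQueue:
    """Hypothetical in-memory model; arque itself stores this state in redis."""

    def __init__(self, working_limit=2, max_retries=1):
        self.working_limit = working_limit  # throttling: max tasks in execution
        self.max_retries = max_retries      # requeues allowed before dead-lettering
        self.pending = []                   # heap of (available_at, task_id)
        self.working = set()                # ids of tasks currently in execution
        self.failed = []                    # dead letters
        self.data = {}                      # task_id -> payload, kept until release
        self.attempts = {}                  # task_id -> number of requeues so far

    def enqueue(self, task_id, payload, now, delay=0):
        # Delayed queue: the task only becomes available at now + delay.
        self.data[task_id] = payload
        self.attempts.setdefault(task_id, 0)
        heapq.heappush(self.pending, (now + delay, task_id))

    def dequeue(self, now):
        if len(self.working) >= self.working_limit:
            return '__overloaded__', None
        if not self.pending or self.pending[0][0] > now:
            return '__not_found__', None
        _, task_id = heapq.heappop(self.pending)
        self.working.add(task_id)
        return task_id, self.data[task_id]

    def release(self, task_id):
        # The payload is dropped only after the task completed successfully.
        self.working.discard(task_id)
        self.data.pop(task_id, None)
        self.attempts.pop(task_id, None)

    def requeue(self, task_id, now, delay=0):
        self.working.discard(task_id)
        self.attempts[task_id] += 1
        if self.attempts[task_id] > self.max_retries:
            self.failed.append(task_id)  # retries exhausted: dead letter
        else:
            heapq.heappush(self.pending, (now + delay, task_id))
```

In this model a task's payload stays in `data` from `enqueue` until `release`, which is the in-memory analogue of the "Reliable" bullet: a consumer that fails between `dequeue` and `release` does not lose the task.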

Install:

    pip install arque

Usage:

    import signal
    import random
    import logging
    import asyncio
    import aioredis
    import time
    from functools import wraps
    from arque import Arque

    logger = logging.getLogger(__name__)


    async def shutdown(signal, loop):
        """Cleanup tasks tied to the service's shutdown."""
        logging.info(f"Received exit signal {signal.name}...")
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        [task.cancel() for task in tasks]
        logging.info(f"Cancelling {len(tasks)} outstanding tasks")
        await asyncio.gather(*tasks)
        logging.info(f"Flushing metrics")
        loop.stop()


    def aioredis_pool(host='redis://localhost', encoding='utf8'):
        def wrapper(func):
            @wraps(func)
            async def wrapped():
                redis = await aioredis.create_redis_pool(host, encoding=encoding)
                try:
                    return await func(redis=redis)
                finally:
                    redis.close()
                    await redis.wait_closed()
            return wrapped
        return wrapper


    @aioredis_pool(host='redis://localhost', encoding='utf8')
    async def produce_task(redis=None):
        logger.info('Starting producing...')
        queue = Arque(redis=redis)
        while True:
            for _ in range(1):
                task = {'value': random.randint(0, 99)}
                task_id = f"custom_{task['value']}_{time.time()}"
                logger.debug('Produced task %s', task)
                await queue.enqueue(task, task_id=task_id, task_timeout=10, delay=1)
            await asyncio.sleep(1)


    async def process(task_data):
        logger.debug('Consumed task %s', task_data)
        await asyncio.sleep(1)


    @aioredis_pool(host='redis://localhost', encoding='utf8')
    async def consume_task(redis=None):
        logger.info('Starting consuming...')
        queue = Arque(redis=redis, working_limit=3)
        while True:
            task_id, task_data = await queue.dequeue()
            if task_id == '__not_found__':
                continue
            if task_id == '__overloaded__':
                print(f'TASK ID: {task_id}')
                await asyncio.sleep(1)
                continue
            if task_id == '__marked_as_failed___':
                print(f'FAILED  ID: {task_id}')
                continue
            try:
                await process(task_data)
                await queue.release(task_id)
            except Exception:
                logger.exception('Job processing has failed')
                await queue.requeue(task_id, delay=5)
                stats = await queue.get_stats()
                logger.info(stats)


    @aioredis_pool(host='redis://localhost', encoding='utf8')
    async def sweep_task(redis=None):
        logger.info('Starting sweeping...')
        queue = Arque(redis=redis, sweep_interval=5)
        await queue.schedule_sweep()


    @aioredis_pool(host='redis://localhost', encoding='utf8')
    async def stats_task(redis=None):
        logger.info('Starting stats...')
        queue = Arque(redis=redis)
        while True:
            stats = await queue.get_stats()
            logger.info(stats)
            await asyncio.sleep(5)


    async def example():
        tasks = []
        for _ in range(5):
            tasks.append(consume_task())
        tasks.append(produce_task())
        tasks.append(sweep_task())
        tasks.append(stats_task())
        await asyncio.gather(*tasks)


    if __name__ == '__main__':
        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
        loop = asyncio.get_event_loop()
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGUSR1)
        for s in signals:
            loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
        try:
            loop.run_until_complete(example())
        finally:
            loop.close()
            logging.info("Successfully shutdown...")
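
The consumer in the usage example branches on three special task ids returned by `dequeue()`. As a small sketch of how that branching could be factored out and tested without a redis server, the helper below maps a task id to an action. The `Outcome` enum and the `classify` function are hypothetical names, not part of arque; the sentinel strings are copied verbatim from the example, and it is an assumption that they are the only special values.

```python
from enum import Enum


class Outcome(Enum):
    """Hypothetical actions a consumer loop can take after dequeue()."""
    RETRY_NOW = 'retry_now'   # nothing available: poll again
    BACK_OFF = 'back_off'     # working limit reached: sleep, then poll again
    SKIP = 'skip'             # task was moved to the failed queue
    PROCESS = 'process'       # a real task id: run the handler


# Sentinel strings copied verbatim from the usage example.
SENTINELS = {
    '__not_found__': Outcome.RETRY_NOW,
    '__overloaded__': Outcome.BACK_OFF,
    '__marked_as_failed___': Outcome.SKIP,
}


def classify(task_id):
    """Return the action for a task id; unknown ids are treated as real tasks."""
    return SENTINELS.get(task_id, Outcome.PROCESS)
```

With such a helper the consumer loop would call `classify(task_id)` once and dispatch on the result, which keeps the sentinel strings in a single place.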

References

[1] Reliable Queueing in Redis (Part 1)
[2] DEWIRE Redis as a Reliable Work Queue.pdf
[3] torrelque
