一个django应用程序,可以从admin或cron运行新的后台任务,并从admin检查任务历史;基于django rq
django-task的Python项目详细描述
django任务
一个django应用程序,用于从admin或cron运行新的后台任务,并从admin检查任务历史记录;基于django rq
快速启动
安装django任务:
pip install django-task
将其添加到已安装的应用程序中
INSTALLED_APPS=(...'django_rq','django_task',...)
添加django任务的url模式:
urlpatterns=[...url(r'^django_task/',include('django_task.urls',namespace='django_task')),...]
功能
目的
- 以编程方式或从管理员创建异步任务
- 从管理员监视异步任务
- 将所有任务记录到数据库中以备日后检查
- 可选地将特定于任务的日志保存在文本字段和/或文件字段中
详细信息
-
每个特定的作业都是从models.task派生的模型中描述的。
负责:
- 在可用队列中选择消费者队列的名称
- 收集并保存关联作业所需的所有参数
- 异步运行特定作业
-
可以运行新作业:
- 从django管理员创建任务
- 从代码创建任务,然后调用task.run()
-
作业执行工作流:
- 作业执行由task.run(is_async)触发
- 作业将接收task.id,并从中检索参数(task.retrieve_params_as_dict())
- 在开始时,job会将任务状态更新为"started",并保存job.id以供参考
- 在执行过程中,作业可以更新进度指示器
- 完成后,任务状态最终更新为"成功"或"失败"
- 有关示例,请参见example.jobs.count\u beans
屏幕截图
应用程序设置
< dl >日志文件的路径。
默认值:无
示例:os.path.abspath(os.path.join(base_dir,"…","protected","tasklog")
如果为true,则所有任务都将同步执行(对于调试和单元测试很有用)。
默认值:false
在job.run()中启用低级跟踪-用于调试具有挑战性的竞争条件
默认值:false
如果特定任务队列中没有可用的活动工作线程,则拒绝任务 调用task.run()时
默认值:false
默认值:"redis://localhost:6379/0"
运行测试
代码真的有用吗?
source <YOURVIRTUALENV>/bin/activate (myenv) $ pip install tox (myenv) $ tox
支持工作类
从0.3.0版开始,添加了一些便利性:
- 不再需要作业函数的@job decorator,因为task.run()现在 使用queue.enqueue()而不是jobfunc.delay(),并检索队列 直接从任务本身命名
- 每个任务可以设置自己的任务超时值(以秒为单位)。 当提供时将覆盖默认队列超时
- 提供了一个新的job类,以便在 执行jobfunc之后
classJob(object):@classmethoddefrun(job_class,task_class,task_id):job_trace('job.run() enter')task=Noneresult='SUCCESS'failure_reason=''try:# this raises a "Could not resolve a Redis connection" exception in sync mode#job = get_current_job()job=get_current_job(connection=redis.Redis.from_url(REDIS_URL))# Retrieve task obj and set as Startedtask=task_class.get_task_from_id(task_id)task.set_status(status='STARTED',job_id=job.get_id())# Execute job passing by taskjob_class.execute(job,task)exceptExceptionase:job_trace('ERROR: %s'%str(e))job_trace(traceback.format_exc())iftask:task.log(logging.ERROR,str(e))task.log(logging.ERROR,traceback.format_exc())result='FAILURE'failure_reason=str(e)finally:iftask:task.set_status(status=result,failure_reason=failure_reason)try:job_class.on_complete(job,task)exceptExceptionase:job_trace('NESTED ERROR: Job.on_completed() raises error "%s"'%str(e))job_trace(traceback.format_exc())job_trace('job.run() leave')@staticmethoddefon_complete(job,task):pass@staticmethoddefexecute(job,task):pass
因此,您可以重写 run() 来实现不同的逻辑, 或者(在大多数情况下)只需提供自己的 execute() 方法,并且可以选择 覆盖 on_complete() 以在作业完成后执行清理操作;
示例:
classCountBeansJob(Job):@staticmethoddefexecute(job,task):params=task.retrieve_params_as_dict()num_beans=params['num_beans']foriinrange(0,num_beans):time.sleep(0.01)task.set_progress((i+1)*100/num_beans,step=10)@staticmethoddefon_complete(job,task):print('task "%s" completed with: %s'%(str(task.id),task.status))# An more realistic example from a real project ...# if task.status != 'SUCCESS' or task.error_counter > 0:# task.alarm = BaseTask.ALARM_STATUS_ALARMED# task.save(update_fields=['alarm', ])
执行
运行消费者:
python manage.py runserver
运行工作线程:
python manage.py rqworker low high default python manage.py rqworker low high default ...
示例任务
fromdjango.dbimportmodelsfromdjango.confimportsettingsfromdjango_task.modelsimportTaskclassSendEmailTask(Task):sender=models.CharField(max_length=256,null=False,blank=False)recipients=models.TextField(null=False,blank=False,help_text='put addresses in separate rows')subject=models.CharField(max_length=256,null=False,blank=False)message=models.TextField(null=False,blank=True)TASK_QUEUE=settings.QUEUE_LOWTASK_TIMEOUT=60LOG_TO_FIELD=TrueLOG_TO_FILE=FalseDEFAULT_VERBOSITY=2@staticmethoddefget_jobfunc():from.jobsimportSendEmailJobreturnSendEmailJob
通过重写verbosity属性,可以动态更改 详细信息
当使用 log_to_file=true 时,您可能需要将清理处理程序添加到 删除相应记录时删除日志文件:
pip install django-task0
pip install django-task1
示例作业
pip install django-task2
样本管理命令
pip install django-task3
延迟任务检索以避免作业与任务竞争条件
提供了一个helper task.get_task_from_id()classmethod来检索任务对象 安全地从任务ID。
任务队列创建一种新的竞争条件。为什么? 因为消息队列很快! 多快? 比数据库快。
<:< /P>https://speakerdeck.com/siloraptor/django-tasty salard-dos-donts-using-celeriy
类似的通用助手可用于工作派生需求:
pip install django-task4
如何在同一台计算机上为不同实例分隔作业
在同一台机器上分离不同实例的任务(或者更精确地说 对于同一个redis连接),覆盖每个实例的队列名称;
例如:
pip install django-task5
然后按如下方式运行Worker:
pip install django-task6
如何使用cron计划作业
调用管理命令"count_beans",它依次执行所需的作业。
例如:
pip install django-task7
提供了一个基类taskcommand来简化任何特定 任务相关管理commad;
派生的管理命令只负责:
- 定义适当的命令行参数
- 选择特定的任务类和作业功能
例如:
pip install django-task8
javascript帮助程序
提供了一些实用程序视图,用于与来自javascript的任务进行交互。
任务信息API
检索有关现有任务列表的信息
示例用法:
pip install django-task9
结果:
INSTALLED_APPS=(...'django_rq','django_task',...)0
任务添加API
根据指定的参数创建并运行新任务
预期参数:
< Buff行情>- 'task-model'="<;应用程序名称>;<;模型名称>;"
- …任务参数…
返回新任务的ID
TOdo:提供一个实际使用示例
任务运行api
安排指定任务的执行。
返回job.id或抛出错误(400)。
参数:
- 应用程序标签
- 型号名称
- 是异步的(0或1,默认值为1)
示例用法:
INSTALLED_APPS=(...'django_rq','django_task',...)1