一个django应用程序,可以从admin或cron运行新的后台任务,并从admin检查任务历史;基于django rq

django-task的Python项目详细描述


django任务

https://badge.fury.io/py/django-task.svghttps://travis-ci.org/morlandi/django-task.svg?branch=masterhttps://codecov.io/gh/morlandi/django-task/branch/master/graph/badge.svg

一个django应用程序,用于从admin或cron运行新的后台任务,并从admin检查任务历史记录;基于django rq

快速启动

安装django任务:

pip install django-task

将其添加到已安装的应用程序中

INSTALLED_APPS=(...'django_rq','django_task',...)

添加django任务的url模式:

urlpatterns=[...url(r'^django_task/',include('django_task.urls',namespace='django_task')),...]

功能

目的

  • 以编程方式或从管理员创建异步任务
  • 从管理员监视异步任务
  • 将所有任务记录到数据库中以备日后检查
  • 可选地将特定于任务的日志保存在文本字段和/或文件字段中

详细信息

  1. 每个特定的作业都是从models.task派生的模型中描述的。 负责:
    • 在可用队列中选择消费者队列的名称
    • 收集并保存关联作业所需的所有参数
    • 异步运行特定作业
  2. 可以运行新作业:
    • 从django管理员创建任务
    • 从代码创建任务,然后调用task.run()
  3. 作业执行工作流:
    • 作业执行由task.run(is_async)触发
    • 作业将接收task.id,并从中检索参数(task.retrieve_params_as_dict())
    • 在开始时,job会将任务状态更新为"started",并保存job.id以供参考
    • 在执行过程中,作业可以更新进度指示器
    • 完成后,任务状态最终更新为"成功"或"失败"
    • 有关示例,请参见example.jobs.count\u beans

屏幕截图

example/etc/screenshot_001.pngexample/etc/screenshot_002.png

应用程序设置

< dl >
djangotask日志根目录

日志文件的路径。

默认值:无

示例:os.path.abspath(os.path.join(base_dir,"…","protected","tasklog")

djangotask总是迫不及待的

如果为true,则所有任务都将同步执行(对于调试和单元测试很有用)。

默认值:false

djangotask_job_trace_启用

在job.run()中启用低级跟踪-用于调试具有挑战性的竞争条件

默认值:false

如果队列中没有活动的工作线程,则djangotask拒绝

如果特定任务队列中没有可用的活动工作线程,则拒绝任务 调用task.run()时

默认值:false

redis网址
要连接到的redis服务器

默认值:"redis://localhost:6379/0"

运行测试

代码真的有用吗?

source <YOURVIRTUALENV>/bin/activate
(myenv) $ pip install tox
(myenv) $ tox

支持工作类

从0.3.0版开始,添加了一些便利性:

  • 不再需要作业函数的@job decorator,因为task.run()现在 使用queue.enqueue()而不是jobfunc.delay(),并检索队列 直接从任务本身命名
  • 每个任务可以设置自己的任务超时值(以秒为单位)。 当提供时将覆盖默认队列超时
  • 提供了一个新的job类,以便在 执行jobfunc之后
classJob(object):@classmethoddefrun(job_class,task_class,task_id):job_trace('job.run() enter')task=Noneresult='SUCCESS'failure_reason=''try:# this raises a "Could not resolve a Redis connection" exception in sync mode#job = get_current_job()job=get_current_job(connection=redis.Redis.from_url(REDIS_URL))# Retrieve task obj and set as Startedtask=task_class.get_task_from_id(task_id)task.set_status(status='STARTED',job_id=job.get_id())# Execute job passing by taskjob_class.execute(job,task)exceptExceptionase:job_trace('ERROR: %s'%str(e))job_trace(traceback.format_exc())iftask:task.log(logging.ERROR,str(e))task.log(logging.ERROR,traceback.format_exc())result='FAILURE'failure_reason=str(e)finally:iftask:task.set_status(status=result,failure_reason=failure_reason)try:job_class.on_complete(job,task)exceptExceptionase:job_trace('NESTED ERROR: Job.on_completed() raises error "%s"'%str(e))job_trace(traceback.format_exc())job_trace('job.run() leave')@staticmethoddefon_complete(job,task):pass@staticmethoddefexecute(job,task):pass

因此,您可以重写 run() 来实现不同的逻辑, 或者(在大多数情况下)只需提供自己的 execute() 方法,并且可以选择 覆盖 on_complete() 以在作业完成后执行清理操作;

示例:

classCountBeansJob(Job):@staticmethoddefexecute(job,task):params=task.retrieve_params_as_dict()num_beans=params['num_beans']foriinrange(0,num_beans):time.sleep(0.01)task.set_progress((i+1)*100/num_beans,step=10)@staticmethoddefon_complete(job,task):print('task "%s" completed with: %s'%(str(task.id),task.status))# An more realistic example from a real project ...# if task.status != 'SUCCESS' or task.error_counter > 0:#    task.alarm = BaseTask.ALARM_STATUS_ALARMED#    task.save(update_fields=['alarm', ])

执行

运行消费者:

python manage.py runserver

运行工作线程:

python manage.py rqworker low high default
python manage.py rqworker low high default
...

示例任务

fromdjango.dbimportmodelsfromdjango.confimportsettingsfromdjango_task.modelsimportTaskclassSendEmailTask(Task):sender=models.CharField(max_length=256,null=False,blank=False)recipients=models.TextField(null=False,blank=False,help_text='put addresses in separate rows')subject=models.CharField(max_length=256,null=False,blank=False)message=models.TextField(null=False,blank=True)TASK_QUEUE=settings.QUEUE_LOWTASK_TIMEOUT=60LOG_TO_FIELD=TrueLOG_TO_FILE=FalseDEFAULT_VERBOSITY=2@staticmethoddefget_jobfunc():from.jobsimportSendEmailJobreturnSendEmailJob

通过重写verbosity属性,可以动态更改 详细信息

当使用 log_to_file=true 时,您可能需要将清理处理程序添加到 删除相应记录时删除日志文件:

pip install django-task
0
pip install django-task
1

示例作业

pip install django-task
2

样本管理命令

pip install django-task
3

延迟任务检索以避免作业与任务竞争条件

提供了一个helper task.get_task_from_id()classmethod来检索任务对象 安全地从任务ID。

任务队列创建一种新的竞争条件。为什么? 因为消息队列很快! 多快? 比数据库快。

<:< /P>

https://speakerdeck.com/siloraptor/django-tasty salard-dos-donts-using-celeriy

类似的通用助手可用于工作派生需求:

pip install django-task
4

如何在同一台计算机上为不同实例分隔作业

在同一台机器上分离不同实例的任务(或者更精确地说 对于同一个redis连接),覆盖每个实例的队列名称;

例如:

pip install django-task
5

然后按如下方式运行Worker:

pip install django-task
6

如何使用cron计划作业

调用管理命令"count_beans",它依次执行所需的作业。

例如:

pip install django-task
7

提供了一个基类taskcommand来简化任何特定 任务相关管理commad;

派生的管理命令只负责:

  • 定义适当的命令行参数
  • 选择特定的任务类和作业功能

例如:

pip install django-task
8

javascript帮助程序

提供了一些实用程序视图,用于与来自javascript的任务进行交互。

任务信息API

检索有关现有任务列表的信息

示例用法:

pip install django-task
9

结果:

INSTALLED_APPS=(...'django_rq','django_task',...)
0

任务添加API

根据指定的参数创建并运行新任务

预期参数:

< Buff行情>
  • 'task-model'="<;应用程序名称>;<;模型名称>;"
  • …任务参数…

返回新任务的ID

TOdo:提供一个实际使用示例

任务运行api

安排指定任务的执行。

返回job.id或抛出错误(400)。

参数:

  • 应用程序标签
  • 型号名称
  • PK
  • 是异步的(0或1,默认值为1)

示例用法:

INSTALLED_APPS=(...'django_rq','django_task',...)
1

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java文件分块,获取长度字节   java嵌入式Tomcat不执行jsf页面   java我的数据库中有2个实体,但hibernate返回其中6个。   java如何基于逗号拆分字符串   java取消已经运行的CompletableFutures的预期模式是什么   java如何在informix中从另一个数据库复制表ddl和数据   为什么图片是黑色的?   java根据字符串数组中的单词筛选列表   Java8的集合。平行流有效吗?   Kotlin中的java静态内部类   java如何在GUI中生成一列字符串   javafx如何正确使用高对比度主题?   带空格的javascript Httpurlconnection参数   java如何设置GridBagLayout的约束   java如何在一个线程可能尚未初始化时关闭另一个线程   java将简单时间格式转换为特殊时间格式(hhmmt)   安卓/java阵列重复过滤器的问题   java在队列的链接实现下,入队和出队是如何工作的   java更新sql外键约束