Celery task duplication problem


I have 3 Celery beat instances running (each with a different settings.py) on my VPS. The three instances are used by three websites that share the same code. The task basically sends an email to a few hundred registered users (using SendGrid).

My problem is that the task runs 3 times when it is scheduled with an ETA, as shown below.

    sdate = datetime.datetime.strptime(request.POST['schedule_date'], '%d-%m-%Y %H:%M')
    tz = get_current_timezone()
    celery_scheduled_campaign.apply_async(eta=tz.localize(sdate),
                                          kwargs={'schedule_id': schedule.id})

But when using the .delay method, it runs as expected (only once).

    celery_scheduled_campaign.delay(schedule_id=schedule.id)

settings_one.py

...
BROKER_URL = 'redis://localhost:6379/0'
...

settings_two.py

...
BROKER_URL = 'redis://localhost:6379/1'
...

settings_three.py

...
BROKER_URL = 'redis://localhost:6379/2'
...

tasks.py

from celery import task
from bulkmailer import send_email
from models import CampaignSchedule, SendgridEmailQuota
import logging
logger = logging.getLogger("ecm_console")
#import pdb
#import time
#from django.core.mail import EmailMultiAlternatives

@task.task(ignore_result=True)
def celery_sendmail_task(obj,unsubscribe_url,host):
    #time.sleep(10)
    send_email(obj,unsubscribe_url,host)
    obj.status=True
    if obj.campaign_opt=='S':
        obj.campaign_opt='R'
    obj.save()

@task.task(ignore_result=True)
def sendgrid_quota_reset():
    try:
        quota = SendgridEmailQuota.objects.get(pk=1)
        quota.used=0
        quota.save()
        logger.info("Success : sendgrid_quota_reset job ")
    except Exception, e:
        logger.error("Critical Error : sendgrid_quota_reset: {0} ".format(e))

@task.task(ignore_result=True)
def celery_scheduled_campaign(schedule_id):
    try:
        obj = CampaignSchedule.objects.get(pk=schedule_id)
        send_email(obj.campaign, obj.unsub_url, obj.ecm_host)
        obj.campaign.status = True
        obj.campaign.save()
    except Exception, e:
        logger.error("Critical Error : celery_scheduled_campaign: {0} ".format(e))

Commands used to run Celery

python manage.py celery worker -B -c 2 --loglevel=info --settings=ecm.settings

python manage.py celery worker -B -c 2 --loglevel=info --settings=ecm.settings2

python manage.py celery worker -B -c 2 --loglevel=info --settings=ecm.settings3

Versions

celery==3.0.21, django-celery==3.0.21, Python 2.7.3

Edit 1: The Celery log shows that the task keeps being added again automatically in the hours that follow.

[2014-11-24 22:09:32,521: INFO/MainProcess] Celerybeat: Shutting down...
[2014-11-24 22:09:32,557: WARNING/MainProcess] Restoring 1 unacknowledged message(s).
[2014-11-24 22:09:40,495: INFO/Beat] Celerybeat: Starting...
[2014-11-24 22:09:40,540: WARNING/MainProcess] celery@mailer ready.
[2014-11-24 22:09:40,547: INFO/MainProcess] consumer: Connected to redis://localhost:6379/3.
[2014-11-24 22:09:40,614: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]

^^ This is where I added the task from the front end. The tasks below were added automatically.

[2014-11-24 23:09:53,039: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]

A periodic task without an ETA runs normally:

[2014-11-25 00:01:00,044: INFO/Beat] Scheduler: Sending due task ecm_sendgrid_sync (ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync)
[2014-11-25 00:01:00,052: INFO/MainProcess] Got task from broker: ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync[37c94a3a-f6c2-433c-81a3-ae351c7018f8]
[2014-11-25 00:01:02,262: INFO/MainProcess] Success : update job  
[2014-11-25 00:01:02,265: INFO/MainProcess] Task ecm_sendgridapi.tasks.ecm_sendgridapi_dbsync[37c94a3a-f6c2-433c-81a3-ae351c7018f8] succeeded in 2.18759179115s: None

Again, the ETA task is added automatically. Note that the hash (task id) is the same.

[2014-11-25 00:10:12,190: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 01:10:26,029: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 02:10:39,025: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 03:10:50,063: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 04:00:00,007: INFO/Beat] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)
[2014-11-25 04:00:00,064: INFO/MainProcess] Got task from broker: celery.backend_cleanup[35a4db80-008e-49c9-9735-2dc1df5e0ecc] expires:[2014-11-25 16:00:00.008296+04:00]
[2014-11-25 04:00:01,533: INFO/MainProcess] Task celery.backend_cleanup[35a4db80-008e-49c9-9735-2dc1df5e0ecc] succeeded in 1.01458001137s: None
[2014-11-25 04:11:03,062: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 05:11:15,073: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 06:11:26,101: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 07:11:38,324: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]
[2014-11-25 08:11:53,097: INFO/MainProcess] Got task from broker: ecm_core.tasks.celery_scheduled_campaign[f5c82a1d-3996-4266-9023-3f7e07538e84] eta:[2014-11-25 09:00:00+04:00]

This could be a bug in the old version. I also suspect my VPS is low on memory (400+ MB used out of 489 MB).
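For what it's worth, the hourly "Got task from broker" lines above match the behaviour of the Redis transport's visibility timeout, which defaults to one hour: an ETA task that is still waiting inside a worker counts as unacknowledged and is redelivered once the timeout expires. If that is what is happening here, raising the visibility timeout above the farthest scheduled ETA should stop the redeliveries. A minimal sketch for the settings files shown above (the 24-hour value is only an example):

BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 60 * 60 * 24}  # 24 hours; must exceed the longest ETA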


2 Answers

Finally fixed it. I added a lock mechanism to make sure the task gets executed only once. More details here.

tasks.py

# ...
import redis

@task.task(ignore_result=True)
def celery_scheduled_campaign(schedule_id):
    LOCK_EXPIRE = 60 * 30  # Lock expires in 30 minutes
    obj = CampaignSchedule.objects.get(pk=schedule_id)
    # Use the campaign's UUID as the lock key so duplicate deliveries of the
    # same task cannot run the campaign concurrently
    my_lock = redis.Redis().lock(obj.campaign_uuid, timeout=LOCK_EXPIRE)
    if my_lock.acquire(blocking=False) and obj.is_complete == False:
        #...
        # Task to run
        #...
        obj.is_complete = True
        obj.save()  # persist the flag so a later duplicate delivery is skipped
        my_lock.release()
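One caveat with this pattern, not part of the original answer: if the task body raises an exception, the flag is never saved and the lock stays held until the 30-minute timeout expires. A small variation that guarantees the release:

if my_lock.acquire(blocking=False) and not obj.is_complete:
    try:
        # ... task to run ...
        obj.is_complete = True
        obj.save()
    finally:
        my_lock.release()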

models.py

(the models.py snippet was not preserved in the original post)
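Judging from the task above, the schedule model only needs a campaign_uuid to use as the lock key and an is_complete flag. A minimal sketch of what the missing snippet might have contained (field types and defaults are assumptions, not the original code):

import uuid
from django.db import models

class CampaignSchedule(models.Model):
    # ... existing fields (campaign, unsub_url, ecm_host, ...) ...
    campaign_uuid = models.CharField(max_length=36, default=lambda: str(uuid.uuid4()))  # assumed lock key
    is_complete = models.BooleanField(default=False)  # set once the campaign has been sent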

Make sure all three sites are not sending messages to the same port; that results in multiple Celery instances consuming from the same port.
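A quick way to double-check which broker each site's worker is actually talking to (a sketch; run it from each site's Django shell with that site's settings module loaded):

from celery import current_app
# Each of the three sites should print a different Redis URL (.../0, .../1, .../2)
print(current_app.conf.BROKER_URL)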
