当工人被杀时,芹菜中的默认等待时间是多少?

2024-10-04 05:24:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我在任务装饰器中使用下面的选项来重新排列任务,当工作人员被杀死时。你知道吗

acks_late=True

reject_on_worker_lost=True

有没有办法缩短重新排队的等待时间?你知道吗

已安装以下软件包。

celery==4.3.0

kombu==4.6.3

amqp==2.3.2

billiard==3.5.0.4

期望值:

https://docs.celeryproject.org/en/latest/userguide/configuration.html#worker

根据文档,默认的worker\u lost\u wait为10秒。你知道吗

No. of workers running: 2

concurrency: 1 (so two tasks in parallel)

Task behaviour: Task will run for 40s and fail due to SoftTimeli

  • 正在使用apply\u async创建任务。你知道吗
task = sample_celery_task.s(x=len(celery_async_result), y=0)
result = task.apply_async(soft_time_limit=40,
                                  queue=PIPELINE_QUEUE,
                                  retry=True,
                                  retry_policy={
                                      'max_retries': 1,
                                      'interval_start': 0,
                                      'interval_step': 0.2,
                                      'interval_max': 0.2,
                                  })
  • 任务
@app.task(acks_late=True, reject_on_worker_lost=True)
def sample_celery_task(x, y):
    result = x + y
    print(sample_celery_task.reject_on_worker_lost)
    print(sample_celery_task.acks_late)
    time.sleep(60)
    return result

结果

我的假设是,一旦工作线程被杀死,任务将重新排队并由其他可用的工作线程运行。但重新排队大约1小时。你知道吗

杀死工人时出现异常

[2019-09-05 17:35:30,694: ERROR/ForkPoolWorker-1] Pool process error: BrokenPipeError(32, 'Broken pipe') Traceback (most recent call last): File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/pool.py", line 362, in workloop put((READY, (job, i, result, inqW_fd))) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/queues.py", line 366, in put self.send_payload(ForkingPickler.dumps(obj)) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/queues.py", line 358, in send_payload self._writer.send_bytes(value) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 229, in send_bytes self._send_bytes(m[offset:offset + size]) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 455, in _send_bytes self._send(header + buf) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 408, in _send n = write(self._handle, buf) BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/pool.py", line 289, in call sys.exit(self.workloop(pid=pid)) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/pool.py", line 370, in workloop put((READY, (job, i, (False, einfo), inqW_fd))) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/queues.py", line 366, in put self.send_payload(ForkingPickler.dumps(obj)) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/queues.py", line 358, in send_payload self._writer.send_bytes(value) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 229, in send_bytes self._send_bytes(m[offset:offset + size]) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 455, in _send_bytes self._send(header + buf) File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 408, in _send n = write(self._handle, buf) BrokenPipeError: [Errno 32] Broken pipe


Tags: inpyselfsendvenvlibpackageswith