I use the following options in the task decorator so that the task is requeued when a worker is killed.
acks_late=True
reject_on_worker_lost=True
Is there a way to shorten the wait before the task is requeued?
The following packages are installed:
celery==4.3.0
kombu==4.6.3
amqp==2.3.2
billiard==3.5.0.4
Expected behaviour:
https://docs.celeryproject.org/en/latest/userguide/configuration.html#worker
According to the documentation, the default worker_lost_wait is 10 seconds.
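For reference, here is a minimal sketch of where these settings would live in a Celery configuration module. The module name `celeryconfig` is an assumption, not something from my project, and I have not confirmed that changing `worker_lost_wait` affects the requeue delay. As far as I understand the docs, it only controls how long the worker waits for a missing result from a killed pool process before raising WorkerLostError.

```python
# celeryconfig.py (hypothetical module name, used only for illustration)

# Acknowledge the message after the task has run, not before.
task_acks_late = True

# Reject (and so requeue) the message if the pool process running it is lost.
task_reject_on_worker_lost = True

# Seconds to wait for a missing result before WorkerLostError is raised.
# 10.0 is the documented default; a lower value here is an experiment, not a known fix.
worker_lost_wait = 10.0
```

Such a module would typically be loaded with `app.config_from_object('celeryconfig')`. The first two settings are the global equivalents of the `acks_late` and `reject_on_worker_lost` decorator options used above.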
No. of workers running: 2
concurrency: 1 (so two tasks in parallel)
Task behaviour: Task will run for 40s and fail due to SoftTimeli
task = sample_celery_task.s(x=len(celery_async_result), y=0)
result = task.apply_async(soft_time_limit=40,
                          queue=PIPELINE_QUEUE,
                          retry=True,
                          retry_policy={
                              'max_retries': 1,
                              'interval_start': 0,
                              'interval_step': 0.2,
                              'interval_max': 0.2,
                          })
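import time

@app.task(acks_late=True, reject_on_worker_lost=True)
def sample_celery_task(x, y):
    result = x + y
    # Confirm that both options are actually set on the task.
    print(sample_celery_task.reject_on_worker_lost)
    print(sample_celery_task.acks_late)
    # Sleep past the 40 s soft time limit passed to apply_async.
    time.sleep(60)
    return result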
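A note on the retry_policy passed to apply_async above: as far as I can tell from the Celery docs, it only governs retries when publishing the message to the broker fails, not how quickly a lost task is requeued. The sketch below is a simplified model of the delay schedule those four keys describe. The function name `publish_retry_delays` is my own, and the formula is an approximation of kombu's behaviour rather than its actual code.

```python
def publish_retry_delays(max_retries, interval_start, interval_step, interval_max):
    """Return the approximate sleep (in seconds) before each publish retry.

    Simplified model: the delay starts at interval_start, grows by
    interval_step per retry, and is capped at interval_max.
    """
    return [
        float(min(interval_start + i * interval_step, interval_max))
        for i in range(max_retries)
    ]


# Values from the retry_policy in the question: a single retry, with no delay.
print(publish_retry_delays(1, 0, 0.2, 0.2))
# The same intervals with three retries, to show the cap taking effect.
print(publish_retry_delays(3, 0, 0.2, 0.2))
```

Under this model the policy adds well under a second in total, so it does not look like the source of the delay I am seeing.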
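Result
My assumption was that once the worker is killed, the task would be requeued and picked up by another available worker. Instead, the requeue takes about 1 hour.
Exception raised when the worker is killed: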
[2019-09-05 17:35:30,694: ERROR/ForkPoolWorker-1] Pool process error: BrokenPipeError(32, 'Broken pipe')
Traceback (most recent call last):
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/pool.py", line 362, in workloop
    put((READY, (job, i, result, inqW_fd)))
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/queues.py", line 366, in put
    self.send_payload(ForkingPickler.dumps(obj))
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/queues.py", line 358, in send_payload
    self._writer.send_bytes(value)
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 229, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 455, in _send_bytes
    self._send(header + buf)
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 408, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/pool.py", line 289, in __call__
    sys.exit(self.workloop(pid=pid))
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/pool.py", line 370, in workloop
    put((READY, (job, i, (False, einfo), inqW_fd)))
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/queues.py", line 366, in put
    self.send_payload(ForkingPickler.dumps(obj))
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/queues.py", line 358, in send_payload
    self._writer.send_bytes(value)
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 229, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 455, in _send_bytes
    self._send(header + buf)
  File "/Users/duraipandian/venv/with_line_profiler/lib/python3.6/site-packages/billiard/connection.py", line 408, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe