我们使用芹菜作为一种自制的map reduce工具,我们在多个工人上并行运行作业并等待结果。我们使用celeri事件将工作线程上的python日志消息传输回客户端。我们知道这些消息是重要的,但我们可以使用日志收集客户端的消息。问题是,在没有线程的情况下,如何在客户机上注册芹菜事件侦听器?还有,这是我们所做的完全错误吗?这对我们有用,但也许还有更好的方法。在
我在github中创建了一个工作示例:
https://github.com/gijzelaerr/celerylog
下面是我们在做什么的小解释。在
我们在worker上运行这个:
class TaskLogEmitter(logging.Handler):
def __init__(self, celery_app, level=logging.NOTSET):
self.celery_app = celery_app
super(TaskLogEmitter, self).__init__(level=level)
def emit(self, record):
with self.celery_app.events.default_dispatcher() as d:
d.send('task-log', msg=record.getMessage(), levelno=record.levelno,
pathname=record.pathname, lineno=record.lineno,
name=record.name, exc_info=record.exc_info)
@after_setup_logger.connect
@after_setup_task_logger.connect
def setup_task_log_emitter(logger=None, **kwargs):
"""
setup log handler
"""
handler = TaskLogEmitter(celery_app)
logger.addHandler(handler)
这将注册一个python日志处理程序,并为每个日志记录发出一个celry事件。在客户机上,我们有一个偶数监视功能:
^{pr2}$我们从一个线程开始,否则它会阻塞:
def setup_event_listening(celery_app):
thread = threading.Thread(target=monitor_events, args=[celery_app])
thread.daemon = True
thread.start()
目前没有回答
相关问题 更多 >
编程相关推荐