使用celry事件传输python日志记录

2024-10-04 07:34:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我们使用芹菜作为一种自制的map reduce工具,我们在多个工人上并行运行作业并等待结果。我们使用celeri事件将工作线程上的python日志消息传输回客户端。我们知道这些消息是重要的,但我们可以使用日志收集客户端的消息。问题是,在没有线程的情况下,如何在客户机上注册芹菜事件侦听器?还有,这是我们所做的完全错误吗?这对我们有用,但也许还有更好的方法。在

我在github中创建了一个工作示例:

https://github.com/gijzelaerr/celerylog

下面是我们在做什么的小解释。在

我们在worker上运行这个:

class TaskLogEmitter(logging.Handler):
    def __init__(self, celery_app, level=logging.NOTSET):
        self.celery_app = celery_app
        super(TaskLogEmitter, self).__init__(level=level)

    def emit(self, record):
        with self.celery_app.events.default_dispatcher() as d:
            d.send('task-log', msg=record.getMessage(), levelno=record.levelno,
                   pathname=record.pathname, lineno=record.lineno,
                   name=record.name,  exc_info=record.exc_info)

@after_setup_logger.connect
@after_setup_task_logger.connect
def setup_task_log_emitter(logger=None, **kwargs):
    """
    setup log handler
    """
    handler = TaskLogEmitter(celery_app)
    logger.addHandler(handler)

这将注册一个python日志处理程序,并为每个日志记录发出一个celry事件。在客户机上,我们有一个偶数监视功能:

^{pr2}$

我们从一个线程开始,否则它会阻塞:

def setup_event_listening(celery_app):
    thread = threading.Thread(target=monitor_events, args=[celery_app])
    thread.daemon = True
    thread.start()

Tags: selflogapp消息taskdefsetup事件