使用SQLAlchemy作用域的\u会话与Celery

2024-09-30 14:20:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用SQLAlchemy和celeri而不是作为后端(我使用的芹菜后端是RabbitMQ)。在

在阅读这篇question和SQLAlchemy文档后,我想到了这个解决方案:

首先,我实现了一个扩展Celery'Task类的类:

class DBTask(Task):
    _engine = None
    _Session = None

    def __init__(self):
        self.logger = get_task_logger('__name__')

    def after_return(self, *args, **kwargs):
        self.logger.info("DBTask: Removing session. {}".format(self._Session))
        if self._Session is not None:
            self.logger.debug("DBTask: Session removed. {}".format(self._Session))
            self._Session.remove()

    @property
    def engine(self):
        if self._engine is None:
            self.logger.info("DBTask: Creating engine.")
            self.config = GlobalConfigReader().get_config()
            self._engine = create_engine(self.config['postgres_uri'], pool_pre_ping=True)
        return self._engine

    @property
    def Session(self):
        if self._Session is None:
            self._Session = scoped_session(sessionmaker(bind=self.engine))
            self.logger.info("DBTask: Creating new session: {}".format(self._Session))
            Base.metadata.create_all(self.engine)
        return self._Session

在我的一项任务中,我要做的是:

^{pr2}$

似乎每个运行在线程中的Celery任务都会创建一个新的DB连接,并且永远不会关闭它。这是一张普罗米修斯数据库状态图:

prometheus graph

看起来在芹菜开始的时候,它们之间的联系会大大增加,而且它们会保持一定程度的稳定。(下降是因为我重新启动了Postgres)。在

另一件我无法解释的事情是,当after_return被调用时,self_Session总是没有,所以{}永远不会被调用。在

[编辑]: 通过查看所有的日志,我意识到有时{}不是无,所以{}被调用,但不是每次{}。在


Tags: selfinfononeconfigformatreturnifsqlalchemy