SQLAlchemy从相关表中删除质量

2024-06-17 14:03:29 发布

您现在位置:Python中文网/ 问答频道 /正文

几天前我发布了一个关于Python脚本占用大量内存的问题Understanding Python memory usage

经过一点尝试和错误,我已经设法将内存使用量的增长定位到一个执行数据库清理的函数。我的简化模式如下(orm):

class A(Base):
    id = Column(Integer, primary_key=True)
    foo = Column(String)
    bar = Column(DateTime)
    replication_confirmed = Column(Boolean, default=False)

class B(Base):
    id = Column(Integer, primary_key=True)
    xyzzy = Column(Integer)
    barf = Column(String)
    replication_confirmed = Column(Boolean, default=False)
    aref = Column(Integer, ForeignKey("A.id"))

为了简化这个过程,这个脚本充当了计算机之间的复制服务器。另一个程序将RabbitMQ消息发送给这个,请求更新,并从json消息中的A和B获得更新和新行。实际上还有其他几个进程修改了A和B,每次更新某个内容时,它们都会将replication confirmed更改为False,并且每次将数据发送到“main”服务器时,该标志都会更改为True

为了避免这个纯过渡数据库中的混乱,我时不时地进行清理,这就是内存增长的地方。我要做的是删除两个表中具有replication_confirmed = True的所有行,同时保持完整性,因为表之间的复制可能不同步

这是我的删除代码(简化,它有错误处理等)。它可以工作,但我假设它现在从上次清理以来大量增长的表中加载大量内容到内存中,这会导致进程请求大量内存,然后再也不会释放这些内存。由于有许多进程操作相同的表,因此我使用with_for_update()来锁定受影响的行,并防止编写器进程在执行清理时修改这些表

_todc = session.query(A).filter(A.replication_confirmed == True).\
           with_for_update().all()
_aids = [i.id for i in _todc]

_todb = session.query(B).filter(B.aref in_(_aids), 
                                B.replication_confirmed == True).all()
for _tb in _todb:
    _session.delete(_tb)
_remaining = session.query(B).all()
_remaining_ids = [i.id for i in _remaining]
_adelete = session.query(A).filter(A.replication_confirmed == True, 
                                   A.id.notin_(_remaining_ids)).all()
for _ta in _adelete:
    session.delete(_ta)
session.commit()

这一切都可行,但一定有更好的办法。由于复制过程的性质,表在复制方面可能不完全同步。B中的行可能已被复制,而A中的引用行仍处于挂起状态。反之亦然

哪种方法更有效?现在它可以读取从受影响的表到内存的所有内容,但我可能可以在数据库级别完成这一切,而无需在内存中处理数据。我只是不知道怎么做

有什么想法吗?这是python2.7、Sqlalchemy和postgresql9.5


Tags: 内存inid数据库truefor进程session
1条回答
网友
1楼 · 发布于 2024-06-17 14:03:29

如果我正确理解了您的意思,那么2次带连接的删除应该可以:

In [11]: session.query(B).\
    ...:     filter(B.replication_confirmed,
    ...:            session.query(A).
    ...:                filter(A.replication_confirmed,
    ...:                       A.id == B.aref).
    ...:                exists()).\
    ...:     delete(synchronize_session=False)

之后是

In [12]: session.query(A).\
    ...:     filter(A.replication_confirmed,
    ...:            ~session.query(B).
    ...:                filter(B.aref == A.id).
    ...:                exists()).\
    ...:     delete(synchronize_session=False)

尽管此处A中的行尚未锁定,但根据您的隔离级别,这两个查询之间可能会发生变化

相关问题 更多 >