内存高效的内置SqlAlchemy迭代器/生成器?

2024-10-06 12:35:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个~10M的MySQL记录表,我使用SqlAlchemy与之接口。我发现,对这个表的大型子集的查询将消耗太多内存,即使我认为我使用的是一个内置的生成器,它可以智能地获取数据集的一小块大小:

for thing in session.query(Things):
    analyze(thing)

为了避免这种情况,我发现我必须构建自己的迭代器,它可以分块执行:

lastThingID = None
while True:
    things = query.filter(Thing.id < lastThingID).limit(querySize).all()
    if not rows or len(rows) == 0: 
        break
    for thing in things:
        lastThingID = row.id
        analyze(thing)

这是正常的还是SA内置发电机有问题?

this question的回答似乎表明内存消耗不是预期的。


Tags: 内存inidforsqlalchemymysqlquery内置
3条回答

大多数DBAPI实现在获取行时对行进行完全缓冲,因此通常在SQLAlchemy ORM获得一个结果之前,整个结果集就在内存中。

但是,Query的工作方式是,在返回对象之前,默认情况下它会完全加载给定的结果集。这里的基本原理涉及的查询不仅仅是简单的SELECT语句。例如,在与可能在一个结果集中多次返回同一对象标识的其他表的联接(与紧急加载相同)中,需要将完整的行集合放在内存中,以便可以返回正确的结果,否则可能只填充部分集合。

因此Query提供了一个选项,可以通过^{}更改此行为。此调用将导致Query成批生成行,并在其中指定批大小。正如docs所说,只有在您没有进行任何类型的集合加载的情况下,这才是合适的,所以基本上是在您真正知道自己在做什么的情况下。另外,如果底层的DBAPI预缓冲了行,那么仍然会有内存开销,因此该方法的伸缩性比不使用它稍微好一些。

我几乎从不使用yield_per();相反,我使用的是您建议的使用窗口函数的更好版本的极限方法。LIMIT和OFFSET有一个很大的问题,即非常大的OFFSET值会导致查询变得越来越慢,因为OFFSET为N会导致查询在N行中分页,这就像每次读取越来越多的行时,执行相同的查询五十次而不是一次。使用窗口函数方法,我预先获取一组“窗口”值,这些值引用要选择的表块。然后,我发出单独的SELECT语句,每次都从这些窗口中提取一个。

窗口函数方法是on the wiki,我使用它非常成功。

另请注意:并非所有数据库都支持窗口函数;您需要Postgresql、Oracle或SQL Server。IMHO至少使用Postgresql是绝对值得的-如果您使用的是关系数据库,那么您最好使用最好的。

我不是数据库专家,但当使用SQLAlchemy作为一个简单的Python抽象层(即,不使用ORM查询对象)时,我已经想出了一个令人满意的解决方案,可以查询一个300万行的表,而不需要爆炸式的内存使用。。。

下面是一个虚拟示例:

from sqlalchemy import create_engine, select

conn = create_engine("DB URL...").connect()
q = select([huge_table])

proxy = conn.execution_options(stream_results=True).execute(q)

然后,我使用SQLAlchemyfetchmany()方法在无限循环中迭代结果:

while 'batch not empty':  # equivalent of 'while True', but clearer
    batch = proxy.fetchmany(100000)  # 100,000 rows at a time

    if not batch:
        break

    for row in batch:
        # Do your stuff here...

proxy.close()

这种方法允许我进行所有类型的数据聚合,而不会产生任何危险的内存开销。

NOTE这个stream_results可以与Postgres和pyscopg2适配器一起工作,但是我想它不能与任何DBAPI一起工作,也不能与任何数据库驱动程序一起工作。。。

在这个blog post中有一个有趣的用例启发了我的上述方法。

我一直在研究使用SQLAlchemy的高效遍历/分页,并希望更新这个答案。

我认为可以使用slice调用来适当地限制查询的范围,并且可以有效地重用它。

示例:

window_size = 10  # or whatever limit you like
window_idx = 0
while True:
    start,stop = window_size*window_idx, window_size*(window_idx+1)
    things = query.slice(start, stop).all()
    if things is None:
        break
    for thing in things:
        analyze(thing)
    if len(things) < window_size:
        break
    window_idx += 1

相关问题 更多 >