我的应用程序正在使用范围会话和SQLALchemy的声明式风格。它是一个web应用程序,许多DB插入都是由任务调度程序Celery
执行的。
通常,当决定插入一个对象时,我的代码可能会沿着以下几行执行操作:
from schema import Session
from schema.models import Bike
pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
Session.add(new_bike)
Session.commit()
这里的问题是,因为很多工作都是由异步工作人员完成的,所以一个工作人员可能在插入带有id=123
的Bike
的过程中完成了一半,而另一个工作人员正在检查它的存在性。在这种情况下,第二个worker将尝试插入具有相同主键的行,SQLAlchemy将引发一个IntegrityError
。
我一辈子都找不到解决这个问题的好办法,除了把Session.commit()
换成:
'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())
def commit(ignore=False):
try:
Session.commit()
except IntegrityError as e:
reason = e.message
logger.warning(reason)
if not ignore:
raise e
if "Duplicate entry" in reason:
logger.info("%s already in table." % e.params[0])
Session.rollback()
然后在任何地方我都有Session.commit
我现在有schema.commit(ignore=True)
我不介意行不再被插入。
在我看来,这似乎很脆弱,因为字符串检查。正如仅供参考,当IntegrityError
升高时,它看起来是这样的:
(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")
当然,我插入的主键是Duplicate entry is a cool thing
,我想我可能会漏掉IntegrityError
,这并不是因为主键重复。
有没有更好的方法可以保持我使用的干净的SQLAlchemy方法(而不是开始用字符串写出语句等等…)
Db是MySQL(尽管对于单元测试,我喜欢使用SQLite,并且不想用任何新方法阻碍这种能力)。
干杯!
如果使用
session.merge(bike)
而不是session.add(bike)
,则不会生成主键错误。将根据需要检索并更新或创建bike
。我假设你的主键在某种程度上是自然的,这就是为什么你不能依赖普通的自动增量技术。所以假设这个问题实际上是需要插入的某个唯一列中的一个,这是比较常见的。
如果希望“尝试插入,失败时部分回滚”,可以使用保存点,对于SQLAlchemy,保存点是begin_nested()。下一个rollback()或commit()只作用于该保存点,而不是作用于更大范围的事件。
然而,总的来说,这里的模式只是一个真正应该避免的模式。你真正想在这里做的是三件事之一。一。不要运行处理需要插入的相同密钥的并发作业。2。以某种方式在与和3一起工作的并发密钥上同步作业。使用一些公共服务生成此特定类型的新记录,这些记录由作业共享(或确保在作业运行之前都已设置好)。
如果你仔细想想,2在任何情况下都是高度孤立的。开始两个postgres会话。会议1:
会议2:
您将看到的是,会话2块,因为PK#1的行被锁定。我不确定MySQL是否足够聪明,但这是正确的行为。如果您试图插入另一个PK:
一切进展顺利,没有阻碍。
关键是,如果您正在进行这种PK/UQ争用,那么您的芹菜任务无论如何都要序列化自己,或者至少应该序列化。
您应该以同样的方式处理每一个
IntegrityError
:回滚事务,然后有选择地重试。有些数据库甚至不允许您在一个IntegrityError
之后做更多的事情。您还可以在两个冲突事务开始时获取表上的锁,或者在数据库允许的情况下获取更细粒度的锁。使用
with
语句显式开始事务,并自动提交(或回滚任何异常):相关问题 更多 >
编程相关推荐