我正在尝试使用sqlalchemy信号来更新elasticsearch索引,以反映我数据库中的某些模型。在
我遇到的问题是,为了构建ES文档,我访问了一些模型的关系,这意味着在信号周期中有几个点无法访问这些关系。我是从after_(insert|udpate|delete)
的监听器中的模型实例创建ES文档的,但是这些文档是在连接处于flushing
状态时发出的,此时似乎关系返回{
我修改了它,所以我只保存了要发出的命令的队列,保留了对模型的引用,计划是在提交时发出它们,但是before_commit
信号是在刷新之前发出的,当发出after_commit
时,数据库就不能再访问了,所以关系也是不可访问的。在
在我看来,创建和保存文档的正确时机是在after_flush_postexec
信号中,但我有一种感觉,在这和commit
实际发出之间可能会有一个回滚,然后ES索引将不再反映DB。在
我不知道最好的办法是什么。在
这是我正在处理的代码。ChargeIndexer.upsert
采用一个SQLAlchemy ORM模型,并使用elasticsearch api插入/更新基于它构建的文档,.delete
也执行相同的操作,当然,它会删除它,并且只依赖于模型的id
。在
class ChargeListener(object):
ops = []
@event.listens_for(Charge, 'after_delete', propagate=True)
def after_delete(mapper, connection, target):
ChargeListener.add_delete(target)
@event.listens_for(Charge, 'after_insert', propagate=True)
def after_insert(mapper, connection, target):
ChargeListener.add_insert(target)
@event.listens_for(Charge, 'after_update', propagate=True)
def after_update(mapper, connection, target):
ChargeListener.add_insert(target)
@classmethod
def execute_ops(cls):
charges_indexer = ChargesIndexer()
for op, charge in ChargeListener.ops:
if op == 'insert':
charges_indexer.upsert(charge)
elif op == 'delete':
charges_indexer.delete(charge)
ChargeListener.reset()
@event.listens_for(sess, 'after_flush_postexec')
def after_flush_postexec(session, flush_context):
ChargeListener.execute_ops()
@event.listens_for(sess, 'after_soft_rollback')
def after_soft_rollback(session, previous_transaction):
ChargeListener.reset()
@classmethod
def add_insert(cls, charge):
cls.ops.append(('insert', charge))
@classmethod
def add_delete(cls, charge):
cls.ops.append(('delete', charge))
@classmethod
def reset(cls):
cls.ops = []
目前没有回答
相关问题 更多 >
编程相关推荐