使用事件跟踪查询、对象和 Session 更改


SQLAlchemy 具有广泛的事件侦听功能 系统在整个 Core 和 ORM 中使用。 在 ORM 中,有一个 各种事件侦听器钩子,这些钩子记录在 API 中 水平在 ORM 事件。多年来,这个事件集合已经发展壮大,包括许多非常有用的新事件以及一些不再像以前那样相关的旧事件。本节将尝试介绍主要事件钩子以及何时可以使用它们。


执行事件


1.4 版本中的新功能: Session 现在具有一个全面的钩子,旨在拦截代表 ORM 发出的所有 SELECT 语句以及批量 UPDATE 和 DELETE 语句。此钩子取代了之前的 QueryEvents.before_compile() event 以及 QueryEvents.before_compile_update() QueryEvents.before_compile_delete()


Session 具有一个全面的系统,通过该系统,所有查询都通过 Session.execute() 方法调用,其中包括 Query 发出的所有 SELECT 语句以及所有 SELECT 语句 代表列和关系加载器发出的语句,可以 被拦截和修改。 系统利用 SessionEvents.do_orm_execute() event 钩子以及 ORMExecuteState 对象来表示事件状态。


基本查询拦截


SessionEvents.do_orm_execute() 首先对任何类型的 拦截查询,包括 使用 1.x 样式查询以及启用 ORM 时 2.0 样式select()update()delete() 结构被传递给 Session.execute() 的 Session.execute() 中。ORMExecuteState 构造提供了允许修改语句、参数和选项的访问器:

Session = sessionmaker(engine)


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if orm_execute_state.is_select:
        # add populate_existing for all SELECT statements

        orm_execute_state.update_execution_options(populate_existing=True)

        # check if the SELECT is against a certain entity and add an
        # ORDER BY if so
        col_descriptions = orm_execute_state.statement.column_descriptions

        if col_descriptions[0]["entity"] is MyEntity:
            orm_execute_state.statement = statement.order_by(MyEntity.name)


上面的示例说明了对 SELECT 语句的一些简单修改。在这个级别上, SessionEvents.do_orm_execute() 事件钩子打算替换以前对 QueryEvents.before_compile() 事件的使用,该事件并没有为各种类型的加载器一致地触发;此外,QueryEvents.before_compile() 仅适用于 1.x 样式Query 一起使用,而不是与 2.0 样式一起使用 Session.execute() 的 Session.execute() 中。


添加全局 WHERE / ON 标准


请求最多的查询扩展功能之一是能够将 WHERE 条件添加到所有查询中实体的所有匹配项。这可以通过使用 with_loader_criteria() 查询选项来实现,该选项 可以单独使用,或者非常适合在 SessionEvents.do_orm_execute() 事件:

from sqlalchemy.orm import with_loader_criteria

Session = sessionmaker(engine)


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if (
        orm_execute_state.is_select
        and not orm_execute_state.is_column_load
        and not orm_execute_state.is_relationship_load
    ):
        orm_execute_state.statement = orm_execute_state.statement.options(
            with_loader_criteria(MyEntity.public == True)
        )


在上面,所有 SELECT 语句都添加了一个选项,该选项将限制对 MyEntity 的所有查询以筛选 public == True。该条件将应用于即时查询范围内该类的所有加载。默认情况下,with_loader_criteria() 选项也会自动传播到关系加载器,这将应用于后续的关系加载,包括延迟加载、selectinloads 等。


对于一系列都具有一些公共列结构的类,如果这些类是使用声明性 mixin 组合的,则 mixin 类本身可以与 with_loader_criteria() 结合使用 选项。 Python lambda 将在 针对符合条件的特定实体的查询编译时间。 给定一系列基于名为 HasTimestamp 的 mixin 的类:

import datetime


class HasTimestamp:
    timestamp = mapped_column(DateTime, default=datetime.datetime.now)


class SomeEntity(HasTimestamp, Base):
    __tablename__ = "some_entity"
    id = mapped_column(Integer, primary_key=True)


class SomeOtherEntity(HasTimestamp, Base):
    __tablename__ = "some_entity"
    id = mapped_column(Integer, primary_key=True)


上述类 SomeEntitySomeOtherEntity 各有一列 timestamp 的默认日期和时间。事件可用于拦截从 HasTimestamp 扩展的所有对象,并过滤其 timestamp 列的日期不超过一个月:

@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if (
        orm_execute_state.is_select
        and not orm_execute_state.is_column_load
        and not orm_execute_state.is_relationship_load
    ):
        one_month_ago = datetime.datetime.today() - datetime.timedelta(months=1)

        orm_execute_state.statement = orm_execute_state.statement.options(
            with_loader_criteria(
                HasTimestamp,
                lambda cls: cls.timestamp >= one_month_ago,
                include_aliases=True,
            )
        )


警告


在 调用 with_loader_criteria() 每个唯一类只调用一次。 不应在此 lambda 中调用自定义函数。 看 使用 Lambda 显著提高语句生成速度,以概述“lambda SQL”功能,该功能仅供高级使用。


另请参阅


ORM 查询事件 - 包括上述 with_loader_criteria() 配方的工作示例。


重新执行语句


深度炼金术


语句重新执行功能涉及一个稍微复杂的递归序列,旨在解决能够将 SQL 语句的执行重新路由到各种非 SQL 上下文的相当困难的问题。下面链接的 “dogpile caching” 和 “horizontal sharding” 这两个例子应该用作何时适合使用这个相当高级的功能的指南。


ORMExecuteState 能够控制给定语句的执行;这包括完全不调用该语句的能力,允许返回从缓存中检索到的预构建结果集,以及使用不同的状态重复调用同一语句的能力,例如针对多个数据库连接调用它,然后将结果合并到内存中。SQLAlchemy 的示例套件中演示了这两种高级模式,如下所述。


当在 SessionEvents.do_orm_execute() event hook 中时, ORMExecuteState.invoke_statement() 方法可用于使用 Session.execute() 的新嵌套调用来调用语句,然后它将抢占当前正在进行的执行的后续处理,而是返回由 内部执行。 到目前为止,为 SessionEvents.do_orm_execute() 此进程中的 hook 也将在此嵌套调用中跳过。


该方法 ORMExecuteState.invoke_statement() 返回一个 Result 对象;然后,此对象具有 被 “冻结” 为可缓存格式,并 “解冻” 为新的 Result 对象,以及将其数据与其他 Result 对象的数据合并。


例如,用于 SessionEvents.do_orm_execute() 实现缓存:

from sqlalchemy.orm import loading

cache = {}


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if "my_cache_key" in orm_execute_state.execution_options:
        cache_key = orm_execute_state.execution_options["my_cache_key"]

        if cache_key in cache:
            frozen_result = cache[cache_key]
        else:
            frozen_result = orm_execute_state.invoke_statement().freeze()
            cache[cache_key] = frozen_result

        return loading.merge_frozen_result(
            orm_execute_state.session,
            orm_execute_state.statement,
            frozen_result,
            load=False,
        )


有了上面的钩子,使用缓存的示例将如下所示:

stmt = (
    select(User).where(User.name == "sandy").execution_options(my_cache_key="key_sandy")
)

result = session.execute(stmt)


在上面,自定义执行选项被传递给 Select.execution_options() 来建立一个 “缓存键”,然后被 SessionEvents.do_orm_execute() 钩子拦截。然后,此缓存键将与缓存中可能存在的 FrozenResult 对象匹配,如果存在,则会重复使用该对象。该配方使用 Result.freeze() 方法“冻结”一个 Result 对象,上面将包含 ORM 结果,这样它就可以存储在缓存中并多次使用。为了从 “frozen” 结果返回实时结果,merge_frozen_result() 函数用于将 result 对象中的 “冻结” 数据合并到 当前会话。


上面的示例在 Dogpile Caching 中作为完整示例实现。


ORMExecuteState.invoke_statement() 该方法也可以调用 多次,将不同的信息传递给 ORMExecuteState.invoke_statement.bind_arguments 参数,这样 Session 就会使用不同的 Engine 对象。 这将返回一个不同的 Result 对象;这些结果可以使用 Result.merge() 方法合并在一起。这是 Horizontal Sharding 扩展所采用的技术;请参阅源代码以熟悉。


持久化事件


可能最广泛使用的一系列事件是 “persistence” 事件,它与 flush 进程相对应。flush 是关于对象的待处理更改做出所有决策的地方,然后以 INSERT、UPDATE 和 DELETE 语句的形式发出到数据库。


before_flush()


当应用程序想要确保在进行 flush 时对数据库进行额外的持久性更改时,SessionEvents.before_flush() 钩子是迄今为止最常用的事件。使用 SessionEvents.before_flush() 可对对象进行作以验证其状态,并在持久保存对象和引用之前编写其他对象和引用。在这个事件中,可以安全地作 Session 的状态,也就是说,可以向其附加新对象,可以删除对象,并且可以自由更改对象上的各个属性,当事件钩子完成时,这些更改将被拉入 flush 进程。


典型的 SessionEvents.before_flush() 钩子的任务是扫描集合 Session.newSession.dirtySession.deleted 来查找将要发生某些事情的对象。


有关 SessionEvents.before_flush() 的图示,请参阅使用历史记录表进行版本控制等示例和 使用 Temporal Rows 进行版本控制


after_flush()


在 SessionEvents.after_flush() 钩子为刷新进程发出 SQL 之后,但在刷新的对象的状态发生更改之前调用。也就是说,您仍然可以检查 Session.newSession.dirtySession.deleted 集合来查看刚刚刷新的内容,您还可以使用 AttributeState 提供的历史记录跟踪功能来查看刚刚保留的更改。在 SessionEvents.after_flush() 事件中,可以根据观察到的更改内容向数据库发出其他 SQL。


after_flush_postexec()


SessionEvents.after_flush_postexec() 不久后调用 SessionEvents.after_flush() 调用,但在修改对象的状态以考虑刚刚发生的 flush 调用。Session.newSession.dirtySession.deleted 集合在这里通常是完全空的。用于 SessionEvents.after_flush_postexec() 检查身份映射 对于最终确定的对象,并且可能会发出其他 SQL。 在这个钩子中, 能够对对象进行新的更改,这意味着 Session 将再次进入 “dirty” 状态;的机制 如果有新的更改,则此处的会话将导致它再次刷新 如果刷新是在 Session.commit()的否则,待处理的更改将捆绑为下一次正常刷新的一部分。当钩子检测到 Session.commit() 中的新更改时,计数器确保 this 中的 regard 在 100 次迭代后停止,如果 SessionEvents.after_flush_postexec() Hook 每次调用时都会不断添加新的 state 进行 flushed。


Mapper 级别的 Flush 事件


除了 flush 级别的 hook 之外,还有一套更细粒度的 hook,因为它们是在每个对象的基础上调用的,并在 flush 进程中根据 INSERT、UPDATE 或 DELETE 进行分解。这些是 mapper 持久性钩子,它们也非常受欢迎,但是需要更谨慎地处理这些事件,因为它们在已经正在进行的 flush 过程的上下文中进行;在这里进行许多作并不安全。


这些事件是:


注意


请务必注意,这些事件仅适用于 session flush作而不是 ORM 级别的 INSERT/UPDATE/DELETE 功能,如 启用 ORM 的 INSERT、UPDATE 和 DELETE 语句。要拦截 ORM 级别的 DML,请使用 SessionEvents.do_orm_execute() 事件。


每个事件都传递给 Mapper、映射对象本身以及用于发出 INSERT、UPDATE 或 DELETE 语句的 Connection。这些事件的吸引力是显而易见的,因为如果应用程序希望将某些活动与使用 INSERT 持久化特定类型的对象的时间绑定,则钩子非常具体;与 SessionEvents.before_flush() 事件不同,无需搜索 Session.new 等集合 为了找到目标。 但是,flush plan which 表示每个 INSERT、UPDATE、DELETE 语句的完整列表 在调用这些事件时,已经决定了要发出,在此阶段不能进行任何更改。因此,对给定对象唯一可能的更改是对象行的本地属性。对对象或其他对象的任何其他更改都将影响 Session 的状态,而 Session 的状态将无法正常运行。


这些映射器级别持久性事件中不支持的作包括:


  • Session.add()


  • 会话.delete()


  • 映射的集合追加、添加、删除、删除、丢弃等。


  • 映射的关系属性 set/del 事件,即 someobject.related = someotherobject


传递 Connection 的原因是鼓励 简单的 SQL作直接发生在 Connection 上,例如递增计数器或在日志表中插入额外的行。


还有许多基于对象的作根本不需要在 flush 事件中处理。最常见的替代方法是简单地在其 __init__() 中与对象一起建立附加状态 方法,例如创建要关联的其他对象 新对象。 使用 Simple Validators 中描述的验证器是另一种方法;这些函数可以截获对属性的更改,并在目标对象上建立其他状态更改以响应属性更改。使用这两种方法,对象在进入 flush 步骤之前都处于正确的状态。


对象生命周期事件


事件的另一个用例是跟踪对象的生命周期。这是指在 Quickie Object States 介绍中首次引入的状态。


以上所有状态都可以通过 events 进行完全跟踪。每个事件都表示一个不同的状态转换,这意味着起始状态和目标状态都是被跟踪内容的一部分。除了初始 transient 事件外,所有事件都是关于 Session 对象或 class 的,这意味着它们可以与特定的 Session 对象相关联:

from sqlalchemy import event
from sqlalchemy.orm import Session

session = Session()


@event.listens_for(session, "transient_to_pending")
def object_is_pending(session, obj):
    print("new pending: %s" % obj)


或者使用 Session 类本身,以及特定的 sessionmaker,这可能是最有用的形式:

from sqlalchemy import event
from sqlalchemy.orm import sessionmaker

maker = sessionmaker()


@event.listens_for(maker, "transient_to_pending")
def object_is_pending(session, obj):
    print("new pending: %s" % obj)


当然,侦听器可以堆叠在一个函数之上,这很常见。例如,要跟踪所有进入持久状态的对象,请执行以下作:

@event.listens_for(maker, "pending_to_persistent")
@event.listens_for(maker, "deleted_to_persistent")
@event.listens_for(maker, "detached_to_persistent")
@event.listens_for(maker, "loaded_as_persistent")
def detect_all_persistent(session, instance):
    print("object is now persistent: %s" % instance)


瞬态


所有映射对象在首次构造时都以 transient 开始。在此状态下,对象单独存在,不与任何 Session 关联。对于此初始状态,没有特定的 “transition” 事件,因为没有 Session,但是如果有一个 想要在创建任何瞬态对象时进行拦截,则 InstanceEvents.init() 方法可能是最好的事件。此事件应用于特定类或超类。例如,要拦截特定声明性基的所有新对象:

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import event


class Base(DeclarativeBase):
    pass


@event.listens_for(Base, "init", propagate=True)
def intercept_init(instance, args, kwargs):
    print("new transient: %s" % instance)


瞬态到待处理


当 transient 对象首次通过 Session.add()Session.add_all()Session 关联时,它变为 pending 方法。 对象也可能由于显式添加的引用对象的 “级联” 而成为 Session 的一部分。可以使用以下 SessionEvents.transient_to_pending() 事件来检测 transient to pending 转换:

@event.listens_for(sessionmaker, "transient_to_pending")
def intercept_transient_to_pending(session, object_):
    print("transient to pending: %s" % object_)


待处理到持久化¶


待处理对象在刷新 proceeds,并针对实例执行 INSERT 语句。 对象 现在有了一个身份密钥。 将待处理跟踪到持续,并使用 SessionEvents.pending_to_persistent() 事件:

@event.listens_for(sessionmaker, "pending_to_persistent")
def intercept_pending_to_persistent(session, object_):
    print("pending to persistent: %s" % object_)


待处理到瞬态


如果 Session.rollback() 方法在待处理对象被刷新之前调用,或者如果调用 Session.expunge() 方法 对于刷新对象之前的对象。 跟踪待处理到瞬态,使用 SessionEvents.pending_to_transient() 事件:

@event.listens_for(sessionmaker, "pending_to_transient")
def intercept_pending_to_transient(session, object_):
    print("transient to pending: %s" % object_)


加载为 Persistent


对象可以直接出现在 Session持久化 state 的值。 跟踪此状态转换 与加载对象时的跟踪对象同义,并且是同义词 使用 InstanceEvents.load() 实例级事件。 但是, SessionEvents.loaded_as_persistent() event 作为以 session 为中心的钩子提供,用于在对象通过以下特定途径进入持久状态时对其进行拦截:

@event.listens_for(sessionmaker, "loaded_as_persistent")
def intercept_loaded_as_persistent(session, object_):
    print("object loaded into persistent state: %s" % object_)


持久到瞬态


如果 Session.rollback() 方法为对象首次添加为 pending 的事务调用。在 ROLLBACK 的情况下,使此对象持久化的 INSERT 语句将回滚,并且该对象将从 Session 中逐出以再次变为瞬态对象。使用 SessionEvents.persistent_to_transient() 事件钩子:

@event.listens_for(sessionmaker, "persistent_to_transient")
def intercept_persistent_to_transient(session, object_):
    print("persistent to transient: %s" % object_)


持久化到已删除


在 flush 过程中从数据库中删除标记为删除的对象时,持久性对象将进入 deleted 状态。请注意,这与 Session.delete() method 调用目标对象。 Session.delete() method 仅将对象标记为删除;在刷新继续之前,不会发出实际的 DELETE 语句。在刷新之后,目标对象会出现 “deleted” 状态。


在 “deleted” 状态下,对象仅与 Session 进行轻微关联。它不存在于身份映射中,也不存在于引用它等待删除的时间的 Session.deleted 集合中。


对象可以在提交事务时从 “deleted” 状态进入 detached 状态,或者如果事务回滚,则返回持久状态。


跟踪永久到已删除的过渡 SessionEvents.persistent_to_deleted()

@event.listens_for(sessionmaker, "persistent_to_deleted")
def intercept_persistent_to_deleted(session, object_):
    print("object was DELETEd, is now in deleted state: %s" % object_)


已删除到分离


提交会话的事务时,已删除的对象将分离。调用 Session.commit() 方法后,数据库事务是最终的,Session 现在完全丢弃已删除的对象并删除与它的所有关联。使用以下方式 SessionEvents.deleted_to_detached() 跟踪已删除到分离的过渡 :

@event.listens_for(sessionmaker, "deleted_to_detached")
def intercept_deleted_to_detached(session, object_):
    print("deleted to detached: %s" % object_)


注意


当对象处于 deleted 状态时,InstanceState.deleted 属性(可使用 inspect(object).deleted 访问)返回 True。但是,当对象分离时,InstanceState.deleted 将再次返回 False。要检测对象是否已删除,无论它是否已分离,请使用 InstanceState.was_deleted 访问。


持久化到分离化¶


当对象通过 Session.expunge()Session 取消关联时,持久化对象将变为分离状态。 Session.expunge_all()Session.close() 方法。


注意


如果对象拥有 Session 被应用程序取消引用,并因垃圾回收而被丢弃。在这种情况下,不会发出任何事件


使用 SessionEvents.persistent_to_detached() 事件:

@event.listens_for(sessionmaker, "persistent_to_detached")
def intercept_persistent_to_detached(session, object_):
    print("object became detached: %s" % object_)


分离为 Persistent


当分离的对象使用 Session.add() 或等效方法与 session 重新关联时,它将成为持久的。 跟踪 对象从 Detached 移回 persistent SessionEvents.detached_to_persistent() 事件:

@event.listens_for(sessionmaker, "detached_to_persistent")
def intercept_detached_to_persistent(session, object_):
    print("object became persistent again: %s" % object_)


已删除为持久化¶


已删除的对象可以恢复为持久 回滚 DELETEd 的事务时的状态 使用 Session.rollback() 方法。 跟踪已删除的对象 使用 SessionEvents.deleted_to_persistent() 事件:

@event.listens_for(sessionmaker, "deleted_to_persistent")
def intercept_deleted_to_persistent(session, object_):
    print("deleted to persistent: %s" % object_)


交易事件


事务事件允许在 Session 级别发生事务边界时以及 Session 更改 Connection 上的事务状态 对象。


属性更改事件


属性更改事件允许拦截对象上的特定属性何时被修改。这些事件包括 AttributeEvents.set()、 AttributeEvents.append()AttributeEvents.remove() 的 AttributeEvents.remove() 进行访问。这些事件非常有用,特别是对于每个对象的验证作;但是,使用 “validator” 钩子通常要方便得多,它在幕后使用这些钩子;有关此内容的背景,请参阅 Simple Validators 。属性事件也是反向引用机制的后面。说明属性事件用法的示例位于 属性检测 中。