使用事件跟踪查询、对象和 Session 更改¶
SQLAlchemy 具有广泛的事件侦听功能
系统在整个 Core 和 ORM 中使用。 在 ORM 中,有一个
各种事件侦听器钩子,这些钩子记录在 API 中
水平在 ORM 事件。多年来,这个事件集合已经发展壮大,包括许多非常有用的新事件以及一些不再像以前那样相关的旧事件。本节将尝试介绍主要事件钩子以及何时可以使用它们。
执行事件¶
1.4 版本中的新功能: Session
现在具有一个全面的钩子,旨在拦截代表 ORM 发出的所有 SELECT 语句以及批量 UPDATE 和 DELETE 语句。此钩子取代了之前的 QueryEvents.before_compile()
event 以及 QueryEvents.before_compile_update()
QueryEvents.before_compile_delete()
。
Session
具有一个全面的系统,通过该系统,所有查询都通过 Session.execute()
方法调用,其中包括 Query
发出的所有 SELECT 语句以及所有 SELECT 语句
代表列和关系加载器发出的语句,可以
被拦截和修改。 系统利用
SessionEvents.do_orm_execute()
event 钩子以及
ORMExecuteState
对象来表示事件状态。
基本查询拦截¶
SessionEvents.do_orm_execute()
首先对任何类型的
拦截查询,包括
使用 1.x 样式的查询
以及启用 ORM 时
2.0 样式select()
,
update()
或 delete()
结构被传递给
Session.execute() 的 Session.execute()
中。ORMExecuteState
构造提供了允许修改语句、参数和选项的访问器:
Session = sessionmaker(engine)
@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if orm_execute_state.is_select:
# add populate_existing for all SELECT statements
orm_execute_state.update_execution_options(populate_existing=True)
# check if the SELECT is against a certain entity and add an
# ORDER BY if so
col_descriptions = orm_execute_state.statement.column_descriptions
if col_descriptions[0]["entity"] is MyEntity:
orm_execute_state.statement = statement.order_by(MyEntity.name)
上面的示例说明了对 SELECT 语句的一些简单修改。在这个级别上, SessionEvents.do_orm_execute()
事件钩子打算替换以前对 QueryEvents.before_compile()
事件的使用,该事件并没有为各种类型的加载器一致地触发;此外,QueryEvents.before_compile()
仅适用于 1.x 样式
与 Query
一起使用,而不是与 2.0 样式一起使用
Session.execute() 的 Session.execute()
中。
添加全局 WHERE / ON 标准¶
请求最多的查询扩展功能之一是能够将 WHERE 条件添加到所有查询中实体的所有匹配项。这可以通过使用 with_loader_criteria()
查询选项来实现,该选项
可以单独使用,或者非常适合在
SessionEvents.do_orm_execute()
事件:
from sqlalchemy.orm import with_loader_criteria
Session = sessionmaker(engine)
@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if (
orm_execute_state.is_select
and not orm_execute_state.is_column_load
and not orm_execute_state.is_relationship_load
):
orm_execute_state.statement = orm_execute_state.statement.options(
with_loader_criteria(MyEntity.public == True)
)
在上面,所有 SELECT 语句都添加了一个选项,该选项将限制对 MyEntity
的所有查询以筛选 public == True
。该条件将应用于即时查询范围内该类的所有加载。默认情况下,with_loader_criteria()
选项也会自动传播到关系加载器,这将应用于后续的关系加载,包括延迟加载、selectinloads 等。
对于一系列都具有一些公共列结构的类,如果这些类是使用声明性 mixin 组合的,则 mixin 类本身可以与 with_loader_criteria()
结合使用
选项。 Python lambda 将在
针对符合条件的特定实体的查询编译时间。
给定一系列基于名为 HasTimestamp
的 mixin 的类:
import datetime
class HasTimestamp:
timestamp = mapped_column(DateTime, default=datetime.datetime.now)
class SomeEntity(HasTimestamp, Base):
__tablename__ = "some_entity"
id = mapped_column(Integer, primary_key=True)
class SomeOtherEntity(HasTimestamp, Base):
__tablename__ = "some_entity"
id = mapped_column(Integer, primary_key=True)
上述类 SomeEntity
和 SomeOtherEntity
各有一列
timestamp
的默认日期和时间。事件可用于拦截从 HasTimestamp
扩展的所有对象,并过滤其
timestamp
列的日期不超过一个月:
@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if (
orm_execute_state.is_select
and not orm_execute_state.is_column_load
and not orm_execute_state.is_relationship_load
):
one_month_ago = datetime.datetime.today() - datetime.timedelta(months=1)
orm_execute_state.statement = orm_execute_state.statement.options(
with_loader_criteria(
HasTimestamp,
lambda cls: cls.timestamp >= one_month_ago,
include_aliases=True,
)
)
警告
在 调用
with_loader_criteria()
每个唯一类只调用一次。
不应在此 lambda 中调用自定义函数。 看
使用 Lambda 显著提高语句生成速度,以概述“lambda SQL”功能,该功能仅供高级使用。
另请参阅
ORM 查询事件 - 包括上述 with_loader_criteria()
配方的工作示例。
重新执行语句¶
深度炼金术
语句重新执行功能涉及一个稍微复杂的递归序列,旨在解决能够将 SQL 语句的执行重新路由到各种非 SQL 上下文的相当困难的问题。下面链接的 “dogpile caching” 和 “horizontal sharding” 这两个例子应该用作何时适合使用这个相当高级的功能的指南。
ORMExecuteState
能够控制给定语句的执行;这包括完全不调用该语句的能力,允许返回从缓存中检索到的预构建结果集,以及使用不同的状态重复调用同一语句的能力,例如针对多个数据库连接调用它,然后将结果合并到内存中。SQLAlchemy 的示例套件中演示了这两种高级模式,如下所述。
当在 SessionEvents.do_orm_execute()
event hook 中时,
ORMExecuteState.invoke_statement()
方法可用于使用 Session.execute()
的新嵌套调用来调用语句,然后它将抢占当前正在进行的执行的后续处理,而是返回由
内部执行。 到目前为止,为
SessionEvents.do_orm_execute()
此进程中的 hook 也将在此嵌套调用中跳过。
该方法 ORMExecuteState.invoke_statement()
返回一个
Result
对象;然后,此对象具有
被 “冻结” 为可缓存格式,并 “解冻” 为新的
Result
对象,以及将其数据与其他 Result
对象的数据合并。
例如,用于 SessionEvents.do_orm_execute()
实现缓存:
from sqlalchemy.orm import loading
cache = {}
@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if "my_cache_key" in orm_execute_state.execution_options:
cache_key = orm_execute_state.execution_options["my_cache_key"]
if cache_key in cache:
frozen_result = cache[cache_key]
else:
frozen_result = orm_execute_state.invoke_statement().freeze()
cache[cache_key] = frozen_result
return loading.merge_frozen_result(
orm_execute_state.session,
orm_execute_state.statement,
frozen_result,
load=False,
)
有了上面的钩子,使用缓存的示例将如下所示:
stmt = (
select(User).where(User.name == "sandy").execution_options(my_cache_key="key_sandy")
)
result = session.execute(stmt)
在上面,自定义执行选项被传递给
Select.execution_options()
来建立一个 “缓存键”,然后被 SessionEvents.do_orm_execute()
钩子拦截。然后,此缓存键将与缓存中可能存在的 FrozenResult
对象匹配,如果存在,则会重复使用该对象。该配方使用 Result.freeze()
方法“冻结”一个
Result
对象,上面将包含 ORM 结果,这样它就可以存储在缓存中并多次使用。为了从 “frozen” 结果返回实时结果,merge_frozen_result()
函数用于将 result 对象中的 “冻结” 数据合并到
当前会话。
上面的示例在 Dogpile Caching 中作为完整示例实现。
ORMExecuteState.invoke_statement()
该方法也可以调用
多次,将不同的信息传递给
ORMExecuteState.invoke_statement.bind_arguments
参数,这样 Session
就会使用不同的
Engine
对象。 这将返回一个不同的
Result
对象;这些结果可以使用 Result.merge()
方法合并在一起。这是 Horizontal Sharding 扩展所采用的技术;请参阅源代码以熟悉。
持久化事件¶
可能最广泛使用的一系列事件是 “persistence” 事件,它与 flush 进程相对应。flush 是关于对象的待处理更改做出所有决策的地方,然后以 INSERT、UPDATE 和 DELETE 语句的形式发出到数据库。
before_flush()
¶
当应用程序想要确保在进行 flush 时对数据库进行额外的持久性更改时,SessionEvents.before_flush()
钩子是迄今为止最常用的事件。使用 SessionEvents.before_flush()
可对对象进行作以验证其状态,并在持久保存对象和引用之前编写其他对象和引用。在这个事件中,可以安全地作 Session 的状态,也就是说,可以向其附加新对象,可以删除对象,并且可以自由更改对象上的各个属性,当事件钩子完成时,这些更改将被拉入 flush 进程。
典型的 SessionEvents.before_flush()
钩子的任务是扫描集合 Session.new
、Session.dirty
和
Session.deleted
来查找将要发生某些事情的对象。
有关 SessionEvents.before_flush()
的图示,请参阅使用历史记录表进行版本控制等示例和
使用 Temporal Rows 进行版本控制。
after_flush()
¶
在 SessionEvents.after_flush()
钩子为刷新进程发出 SQL 之后,但在刷新的对象的状态发生更改之前调用。也就是说,您仍然可以检查 Session.new
、Session.dirty
和
Session.deleted
集合来查看刚刚刷新的内容,您还可以使用 AttributeState
提供的历史记录跟踪功能来查看刚刚保留的更改。在 SessionEvents.after_flush()
事件中,可以根据观察到的更改内容向数据库发出其他 SQL。
after_flush_postexec()
¶
SessionEvents.after_flush_postexec()
不久后调用
SessionEvents.after_flush()
调用,但在修改对象的状态以考虑刚刚发生的 flush 后调用。Session.new
、Session.dirty
和
Session.deleted
集合在这里通常是完全空的。用于 SessionEvents.after_flush_postexec()
检查身份映射
对于最终确定的对象,并且可能会发出其他 SQL。 在这个钩子中,
能够对对象进行新的更改,这意味着
Session
将再次进入 “dirty” 状态;的机制
如果
有新的更改,则此处的会话将导致它再次刷新
如果刷新是在
Session.commit()
的否则,待处理的更改将捆绑为下一次正常刷新的一部分。当钩子检测到 Session.commit()
中的新更改时,计数器确保 this 中的
regard 在 100 次迭代后停止,如果
SessionEvents.after_flush_postexec()
Hook 每次调用时都会不断添加新的 state 进行 flushed。
Mapper 级别的 Flush 事件¶
除了 flush 级别的 hook 之外,还有一套更细粒度的 hook,因为它们是在每个对象的基础上调用的,并在 flush 进程中根据 INSERT、UPDATE 或 DELETE 进行分解。这些是 mapper 持久性钩子,它们也非常受欢迎,但是需要更谨慎地处理这些事件,因为它们在已经正在进行的 flush 过程的上下文中进行;在这里进行许多作并不安全。
这些事件是:
注意
请务必注意,这些事件仅适用于
session flush作,而不是
ORM 级别的 INSERT/UPDATE/DELETE 功能,如
启用 ORM 的 INSERT、UPDATE 和 DELETE 语句。要拦截 ORM 级别的 DML,请使用
SessionEvents.do_orm_execute()
事件。
每个事件都传递给 Mapper
、映射对象本身以及用于发出 INSERT、UPDATE 或 DELETE 语句的 Connection
。这些事件的吸引力是显而易见的,因为如果应用程序希望将某些活动与使用 INSERT 持久化特定类型的对象的时间绑定,则钩子非常具体;与 SessionEvents.before_flush()
事件不同,无需搜索 Session.new
等集合
为了找到目标。 但是,flush plan which
表示每个 INSERT、UPDATE、DELETE 语句的完整列表
在调用这些事件时,已经决定了要发出,在此阶段不能进行任何更改。因此,对给定对象唯一可能的更改是对象行的本地属性。对对象或其他对象的任何其他更改都将影响 Session 的状态,而 Session
的状态将无法正常运行。
这些映射器级别持久性事件中不支持的作包括:
映射的集合追加、添加、删除、删除、丢弃等。
映射的关系属性 set/del 事件,即someobject.related = someotherobject
传递 Connection
的原因是鼓励
简单的 SQL作直接发生在 Connection
上,例如递增计数器或在日志表中插入额外的行。
还有许多基于对象的作根本不需要在 flush 事件中处理。最常见的替代方法是简单地在其 __init__()
中与对象一起建立附加状态
方法,例如创建要关联的其他对象
新对象。 使用 Simple Validators 中描述的验证器是另一种方法;这些函数可以截获对属性的更改,并在目标对象上建立其他状态更改以响应属性更改。使用这两种方法,对象在进入 flush 步骤之前都处于正确的状态。
对象生命周期事件¶
事件的另一个用例是跟踪对象的生命周期。这是指在 Quickie Object States 介绍中首次引入的状态。
以上所有状态都可以通过 events 进行完全跟踪。每个事件都表示一个不同的状态转换,这意味着起始状态和目标状态都是被跟踪内容的一部分。除了初始 transient 事件外,所有事件都是关于 Session
对象或 class 的,这意味着它们可以与特定的 Session
对象相关联:
from sqlalchemy import event
from sqlalchemy.orm import Session
session = Session()
@event.listens_for(session, "transient_to_pending")
def object_is_pending(session, obj):
print("new pending: %s" % obj)
或者使用 Session
类本身,以及特定的
sessionmaker
,这可能是最有用的形式:
from sqlalchemy import event
from sqlalchemy.orm import sessionmaker
maker = sessionmaker()
@event.listens_for(maker, "transient_to_pending")
def object_is_pending(session, obj):
print("new pending: %s" % obj)
当然,侦听器可以堆叠在一个函数之上,这很常见。例如,要跟踪所有进入持久状态的对象,请执行以下作:
@event.listens_for(maker, "pending_to_persistent")
@event.listens_for(maker, "deleted_to_persistent")
@event.listens_for(maker, "detached_to_persistent")
@event.listens_for(maker, "loaded_as_persistent")
def detect_all_persistent(session, instance):
print("object is now persistent: %s" % instance)
瞬态¶
所有映射对象在首次构造时都以 transient 开始。在此状态下,对象单独存在,不与任何 Session
关联。对于此初始状态,没有特定的 “transition” 事件,因为没有 Session
,但是如果有一个
想要在创建任何瞬态对象时进行拦截,则
InstanceEvents.init()
方法可能是最好的事件。此事件应用于特定类或超类。例如,要拦截特定声明性基的所有新对象:
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import event
class Base(DeclarativeBase):
pass
@event.listens_for(Base, "init", propagate=True)
def intercept_init(instance, args, kwargs):
print("new transient: %s" % instance)
瞬态到待处理¶
当 transient 对象首次通过 Session.add()
或 Session.add_all()
与 Session
关联时,它变为 pending
方法。 对象也可能由于显式添加的引用对象的 “级联” 而成为 Session
的一部分。可以使用以下 SessionEvents.transient_to_pending()
事件来检测 transient to pending 转换:
@event.listens_for(sessionmaker, "transient_to_pending")
def intercept_transient_to_pending(session, object_):
print("transient to pending: %s" % object_)
待处理到持久化¶
待处理对象在刷新
proceeds,并针对实例执行 INSERT 语句。 对象
现在有了一个身份密钥。 将待处理跟踪到持续,并使用
SessionEvents.pending_to_persistent()
事件:
@event.listens_for(sessionmaker, "pending_to_persistent")
def intercept_pending_to_persistent(session, object_):
print("pending to persistent: %s" % object_)
待处理到瞬态¶
如果
Session.rollback()
方法在待处理对象被刷新之前调用,或者如果调用 Session.expunge()
方法
对于刷新对象之前的对象。 跟踪待处理到瞬态,使用
SessionEvents.pending_to_transient()
事件:
@event.listens_for(sessionmaker, "pending_to_transient")
def intercept_pending_to_transient(session, object_):
print("transient to pending: %s" % object_)
加载为 Persistent¶
对象可以直接出现在 Session
的持久化
state 的值。 跟踪此状态转换
与加载对象时的跟踪对象同义,并且是同义词
使用 InstanceEvents.load()
实例级事件。 但是,
SessionEvents.loaded_as_persistent()
event 作为以 session 为中心的钩子提供,用于在对象通过以下特定途径进入持久状态时对其进行拦截:
@event.listens_for(sessionmaker, "loaded_as_persistent")
def intercept_loaded_as_persistent(session, object_):
print("object loaded into persistent state: %s" % object_)
持久到瞬态¶
如果
Session.rollback()
方法为对象首次添加为 pending 的事务调用。在 ROLLBACK 的情况下,使此对象持久化的 INSERT 语句将回滚,并且该对象将从 Session
中逐出以再次变为瞬态对象。使用 SessionEvents.persistent_to_transient()
事件钩子:
@event.listens_for(sessionmaker, "persistent_to_transient")
def intercept_persistent_to_transient(session, object_):
print("persistent to transient: %s" % object_)
持久化到已删除¶
在 flush 过程中从数据库中删除标记为删除的对象时,持久性对象将进入 deleted 状态。请注意,这与 Session.delete()
method 调用目标对象。 Session.delete()
method 仅将对象标记为删除;在刷新继续之前,不会发出实际的 DELETE 语句。在刷新之后,目标对象会出现 “deleted” 状态。
在 “deleted” 状态下,对象仅与 Session
进行轻微关联。它不存在于身份映射中,也不存在于引用它等待删除的时间的 Session.deleted
集合中。
对象可以在提交事务时从 “deleted” 状态进入 detached 状态,或者如果事务回滚,则返回持久状态。
跟踪永久到已删除的过渡
SessionEvents.persistent_to_deleted()
:
@event.listens_for(sessionmaker, "persistent_to_deleted")
def intercept_persistent_to_deleted(session, object_):
print("object was DELETEd, is now in deleted state: %s" % object_)
已删除到分离¶
提交会话的事务时,已删除的对象将分离。调用 Session.commit()
方法后,数据库事务是最终的,Session
现在完全丢弃已删除的对象并删除与它的所有关联。使用以下方式 SessionEvents.deleted_to_detached()
跟踪已删除到分离的过渡 :
@event.listens_for(sessionmaker, "deleted_to_detached")
def intercept_deleted_to_detached(session, object_):
print("deleted to detached: %s" % object_)
注意
当对象处于 deleted 状态时,InstanceState.deleted
属性(可使用 inspect(object).deleted
访问)返回 True。但是,当对象分离时,InstanceState.deleted
将再次返回 False。要检测对象是否已删除,无论它是否已分离,请使用 InstanceState.was_deleted
访问。
持久化到分离化¶
当对象通过 Session.expunge()
与 Session
取消关联时,持久化对象将变为分离状态。
Session.expunge_all()
或 Session.close()
方法。
注意
如果对象拥有
Session
被应用程序取消引用,并因垃圾回收而被丢弃。在这种情况下,不会发出任何事件。
使用
SessionEvents.persistent_to_detached()
事件:
@event.listens_for(sessionmaker, "persistent_to_detached")
def intercept_persistent_to_detached(session, object_):
print("object became detached: %s" % object_)
分离为 Persistent¶
当分离的对象使用 Session.add()
或等效方法与 session 重新关联时,它将成为持久的。 跟踪
对象从 Detached 移回 persistent
SessionEvents.detached_to_persistent()
事件:
@event.listens_for(sessionmaker, "detached_to_persistent")
def intercept_detached_to_persistent(session, object_):
print("object became persistent again: %s" % object_)
已删除为持久化¶
已删除的对象可以恢复为持久
回滚 DELETEd 的事务时的状态
使用 Session.rollback()
方法。 跟踪已删除的对象
使用
SessionEvents.deleted_to_persistent()
事件:
@event.listens_for(sessionmaker, "deleted_to_persistent")
def intercept_deleted_to_persistent(session, object_):
print("deleted to persistent: %s" % object_)
交易事件¶
事务事件允许在 Session
级别发生事务边界时以及
Session
更改 Connection
上的事务状态
对象。
SessionEvents.after_transaction_create()
、SessionEvents.after_transaction_end()
- 这些事件以不特定于单个数据库连接。 这些事件是 旨在帮助集成交易跟踪系统,例如
当
应用程序需要将某些外部范围与Session
的事务范围保持一致时,请使用这些事件。这些钩子反映了Session
的“嵌套”事务行为,因为它们跟踪逻辑“子事务”以及“嵌套”(例如 SAVEPOINT)事务。SessionEvents.before_commit()
,SessionEvents.after_commit()
,SessionEvents.after_begin()
、SessionEvents.after_rollback()
,SessionEvents.after_soft_rollback()
- 这些事件允许从数据库连接的角度跟踪事务事件。特别是 SessionEvents.after_begin()
是每个连接的事件;维护多个 connection 的Session
将在当前事务中使用这些连接时为每个 connection 单独发出此事件。然后,回滚和提交事件是指 DBAPI 连接本身直接收到回滚或提交指令的时间。
属性更改事件¶
属性更改事件允许拦截对象上的特定属性何时被修改。这些事件包括 AttributeEvents.set()、
AttributeEvents.append()
和 AttributeEvents.remove() 的 AttributeEvents.remove()
进行访问。这些事件非常有用,特别是对于每个对象的验证作;但是,使用 “validator” 钩子通常要方便得多,它在幕后使用这些钩子;有关此内容的背景,请参阅 Simple Validators 。属性事件也是反向引用机制的后面。说明属性事件用法的示例位于 属性检测 中。