事务和连接管理¶
管理交易¶
在 1.4 版本发生变更: 会话事务管理已修订,使其更清晰、更易于使用。特别是,它现在具有 “autobegin”作功能,这意味着可以控制事务开始的点,而无需使用传统的 “autocommit” 模式。
Session
跟踪单个 “虚拟” 事务的状态
时,使用名为
SessionTransaction 的 SessionTransaction
中。 然后,此对象使用底层的
会话
到的一个或多个引擎
对象,以便使用
Connection 对象。
这个 “虚拟” 事务是在需要时自动创建的,或者可以使用 Session.begin()
方法启动。在创建 Session
对象以及维护 SessionTransaction
的范围方面,都尽可能地支持 Python 上下文管理器的使用。
下面,假设我们从 Session
开始:
from sqlalchemy.orm import Session
session = Session(engine)
我们现在可以使用上下文管理器在划分的事务中运行作:
with session.begin():
session.add(some_object())
session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised
在上述上下文的末尾,假设没有引发异常,则任何
待处理对象将被刷新到数据库和数据库事务
将被提交。如果在上述块中引发了异常,则
事务将被回滚。 在这两种情况下,上述
退出区块后的 Session
已准备好在后续交易中使用。
Session.begin()
方法是可选的,而
Session
也可以用于 Commit-as-you-go 方法,它将根据需要自动开始事务;这些只需要提交或回滚:
session = Session(engine)
session.add(some_object())
session.add(some_other_object())
session.commit() # commits
# will automatically begin again
result = session.execute(text("< some select statement >"))
session.add_all([more_objects, ...])
session.commit() # commits
session.add(still_another_object)
session.flush() # flush still_another_object
session.rollback() # rolls back still_another_object
Session
本身具有一个 Session.close()
方法。 如果 Session
是在尚未提交或回滚的事务中开始的,则此方法将取消(即回滚)该事务,并清除 Session
对象状态中包含的所有对象。如果会话
的使用方式使得对 Session.commit()
的调用
或 Session.rollback()
不能保证(例如,不在上下文管理器或类似范围内),则可以使用 close
方法来确保释放所有资源:
# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()
最后,会话构造/关闭过程本身可以通过上下文管理器运行。这是确保 Session
对象的使用范围在固定块内的最佳方法。首先通过 Session
构造函数进行说明:
with Session(engine) as session:
session.add(some_object())
session.add(some_other_object())
session.commit() # commits
session.add(still_another_object)
session.flush() # flush still_another_object
session.commit() # commits
result = session.execute(text("<some SELECT statement>"))
# remaining transactional state from the .execute() call is
# discarded
同样,可以以相同的方式使用 sessionmaker
:
Session = sessionmaker(engine)
with Session() as session:
with session.begin():
session.add(some_object)
# commits
# closes the Session
SessionMaker
本身包含一个 sessionMaker.begin()
方法允许两个作同时进行:
with Session.begin() as session:
session.add(some_object)
使用 SAVEPOINT¶
如果底层引擎支持,则可以使用 Session.begin_nested()
来描述 SAVEPOINT 事务
方法:
Session = sessionmaker()
with Session.begin() as session:
session.add(u1)
session.add(u2)
nested = session.begin_nested() # establish a savepoint
session.add(u3)
nested.rollback() # rolls back u3, keeps u1 and u2
# commits u1 and u2
每次调用 Session.begin_nested()
时,都会向当前数据库事务范围内的数据库发出一个新的 “BEGIN SAVEPOINT” 命令(如果尚未进行,则启动一个),并返回一个 SessionTransaction
类型的对象,该对象表示此 SAVEPOINT 的句柄。当调用此对象上的 .commit()
方法时,“RELEASE SAVEPOINT”将发送到数据库,如果改为使用 .rollback()
方法,则发出 “ROLLBACK TO SAVEPOINT”。 附件
数据库事务仍在进行中。
Session.begin_nested()
通常用作上下文管理器,可以在其中捕获特定的每个实例错误,并结合为事务状态的该部分发出的回滚,而无需回滚整个事务,如下例所示:
for record in records:
try:
with session.begin_nested():
session.merge(record)
except:
print("Skipped record %s" % record)
session.commit()
当上下文管理器由 Session.begin_nested()
完成,它会 “提交” Savepoint,
这包括刷新所有 Pending state 的通常行为。 什么时候
引发错误,保存点回滚,并且
已更改对象的 Session
local 已过期。
此模式非常适合使用 PostgreSQL 和捕获 IntegrityError
以检测重复行等情况;PostgreSQL 通常会在引发此类错误时中止整个事务,但是当使用 SAVEPOINT 时,将维护外部事务。在下面的示例中,数据列表被持久化到数据库中,偶尔会跳过 “duplicate primary key” 记录,而不会回滚整个作:
from sqlalchemy import exc
with session.begin():
for record in records:
try:
with session.begin_nested():
obj = SomeRecord(id=record["identifier"], name=record["name"])
session.add(obj)
except exc.IntegrityError:
print(f"Skipped record {record} - row already exists")
当 Session.begin_nested()
被调用时,Session
首先将所有当前待处理的状态刷新到数据库中;无论 Session.autoflush
参数的值如何,这都会无条件地发生,该参数通常用于禁用自动刷新。此行为的基本原理是,当此嵌套事务发生回滚时,Session
可能会使在
SAVEPOINT,而
确保在刷新这些过期对象时,
SAVEPOINT 开始之前的对象图将可用
从数据库重新加载。
在现代版本的 SQLAlchemy 中,当由
Session.begin_nested()
回滚,自创建 SAVEPOINT 以来修改的内存中对象状态将过期,但自 SAVEPOINT 开始以来未更改的其他对象状态将保持不变。这样,后续作就可以继续使用其他方面不受影响的数据,而无需从数据库中刷新数据。
另请参阅
Connection.begin_nested()
- 核心 SAVEPOINT API
会话级 vs. 引擎级事务控制¶
核心和
_会期。
ORM 中的 Session 具有等效的事务语义,无论是在 sessionmaker
与 Engine
的层面,还是在 Session
与 Connection
的层面。以下部分根据以下方案详细介绍了这些方案:
ORM Core
----------------------------------------- -----------------------------------
sessionmaker Engine
Session Connection
sessionmaker.begin() Engine.begin()
some_session.commit() some_connection.commit()
with some_sessionmaker() as session: with some_engine.connect() as conn:
with some_sessionmaker.begin() as session: with some_engine.begin() as conn:
with some_session.begin_nested() as sp: with some_connection.begin_nested() as sp:
随用随付¶
Session
和 Connection
功能
Connection.commit()
和 Connection.rollback()
方法。 使用 SQLAlchemy 2.0 风格的作,这些方法会影响
在所有情况下最外层的事务。对于 Session
,假定 Session.autobegin
保留其默认值 True
。
发动机
:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.connect() as conn:
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
conn.commit()
场次
:
Session = sessionmaker(engine)
with Session() as session:
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
session.commit()
开始一次¶
sessionmaker
和 Engine
都具有
Engine.begin()
方法,该方法都将获取一个用于执行 SQL 语句的新对象(Session
和
Connection
),然后返回一个上下文管理器,该管理器将维护该对象的 begin/commit/rollback 上下文。
发动机:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
# commits and closes automatically
会期:
Session = sessionmaker(engine)
with Session.begin() as session:
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
# commits and closes automatically
嵌套事务¶
通过 Session.begin_nested()
或
Connection.begin_nested()
方法,返回的事务对象必须用于提交或回滚 SAVEPOINT。调用 Session.commit()
或 Connection.commit()
方法将始终提交最外层的事务;这是特定于 SQLAlchemy 2.0 的行为,与 1.x 系列相反。
发动机:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
savepoint = conn.begin_nested()
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
savepoint.commit() # or rollback
# commits automatically
会期:
Session = sessionmaker(engine)
with Session.begin() as session:
savepoint = session.begin_nested()
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
savepoint.commit() # or rollback
# commits automatically
显式开始¶
Session
具有“autobegin”行为,这意味着一旦作开始发生,它就会确保 SessionTransaction
用于跟踪正在进行的作。 此交易已完成
当 Session.commit()
被调用时。
通常,尤其是在框架集成中,需要控制
“begin”作发生的点。 为了适应这种情况,
Session
使用“autobegin”策略,这样
Session.begin()
方法可以直接调用
尚未开始事务的会话
:
Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
item1 = session.get(Item, 1)
item2 = session.get(Item, 2)
item1.foo = "bar"
item2.bar = "foo"
session.commit()
except:
session.rollback()
raise
使用上下文管理器可以更惯用地调用上述模式:
Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
item1 = session.get(Item, 1)
item2 = session.get(Item, 2)
item1.foo = "bar"
item2.bar = "foo"
Session.begin()
方法和会话的 “autobegin” 过程使用相同的步骤序列来开始事务。这包括在 SessionEvents.after_transaction_create()
事件发生时调用事件;框架使用这个钩子来集成他们自己的事务流程和 ORM Session
的事务流程。
启用两阶段提交¶
对于支持两阶段作的后端(目前是 MySQL 和 PostgreSQL),可以指示会话使用两阶段提交语义。这将协调跨数据库提交事务,以便在所有数据库中提交或回滚事务。你也可以 Session.prepare()
会话,用于与非 SQLAlchemy 管理的事务进行交互。要使用两阶段事务,请在会话上设置标志 twophase=True
:
engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")
Session = sessionmaker(twophase=True)
# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})
session = Session()
# .... work with accounts and users
# commit. session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()
设置事务隔离级别 / DBAPI AUTOCOMMIT¶
大多数 DBAPI 都支持可配置事务隔离级别的概念。传统上,这些级别是 “READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ” 和 “SERIALIZABLE” 四个级别。这些通常在 DBAPI 连接开始新事务之前应用于 DBAPI 连接,请注意,大多数 DBAPI 将在首次发出 SQL 语句时隐式地开始此事务。
支持隔离级别的 DBAPI 通常也支持 true 的概念
“autocommit”,这意味着 DBAPI 连接本身将被放入
非事务性 autocommit 模式。 这通常意味着典型的
不再自动向数据库发出 “BEGIN” 的 DBAPI 行为
发生,但它也可能包含其他指令。 使用此模式时,
DBAPI 在任何情况下都不使用事务。像 .begin()、
.commit()
和 .rollback()
这样的 SQLAlchemy 方法以静默方式传递。
SQLAlchemy 的方言支持在每个引擎
上设置的隔离模式
或每个 Connection
的 Package,在
create_engine()
级别以及 Connection.execution_options()
水平。
当使用 ORM Session
时,它充当引擎和
connections,但不直接公开事务隔离。 所以在
为了影响事务隔离级别,我们需要对
发动机
或连接
(视情况而定)。
另请参阅
设置事务隔离级别,包括 DBAPI 自动提交 - 请务必查看隔离级别在 SQLAlchemy Connection
对象级别的工作原理。
为 Sessionmaker / 引擎范围设置隔离¶
要设置具有特定
隔离级别,第一种技术是
可以根据
特定的隔离级别构建引擎
在所有情况下,它都用作
会话
和/或 sessionmaker
:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine(
"postgresql+psycopg2://scott:tiger@localhost/test",
isolation_level="REPEATABLE READ",
)
Session = sessionmaker(eng)
如果同时有两个具有不同隔离级别的引擎,另一个选项很有用,是使用 Engine.execution_options()
方法,该方法将生成原始 Engine
的浅表副本,该副本与父引擎共享相同的连接池。当作将分为 “transactional” 和 “autocommit”作时,这通常是更可取的:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)
在上面,“eng
” 和 “autocommit_engine”
共享相同的方言和连接池。但是,当从 autocommit_engine
获取连接时,将在连接上设置 “AUTOCOMMIT” 模式。 两人
SessionMaker
对象 “transactional_session
” 和 “autocommit_session”
然后在使用数据库连接时继承这些特征。
“autocommit_session
”仍然具有事务语义,
包括那个
Session.commit()
和 Session.rollback()
仍然认为自己是 “committing” 和 “rolling back” 对象,但是事务将静默地不存在。因此,它通常是
尽管不是严格要求的,但具有 AUTOCOMMIT 隔离的 Session 是
以只读方式使用,即:
with autocommit_session() as session:
some_objects = session.execute(text("<statement>"))
some_other_objects = session.execute(text("<statement>"))
# closes connection
为单个会话设置隔离¶
当我们创建一个新的 Session
时,无论是直接使用构造函数还是调用 sessionmaker
生成的可调用对象,我们都可以直接传递 bind
参数,覆盖预先存在的 bind。例如,我们可以从默认的 Session
创建
sessionmaker
并传递一个用于 autocommit 的引擎集:
plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")
# will normally use plain_engine
Session = sessionmaker(plain_engine)
# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
# work with session
...
对于 Session
或 sessionmaker
配置了多个 “binds” 的情况,我们可以重新指定 binding
参数,或者如果我们想只替换特定的绑定,那么我们
可以使用 Session.bind_mapper()
或 Session.bind_table()
方法:
with Session() as session:
session.bind_mapper(User, autocommit_engine)
为单个事务设置隔离¶
关于隔离级别的一个关键警告是,不能在事务已经开始的 Connection
上安全地修改该设置。数据库无法更改正在进行的事务的隔离级别,并且某些 DBAPI 和 SQLAlchemy 方言在这方面的行为不一致。
因此,最好使用前面的 Session
绑定到具有所需隔离级别的引擎。 然而,孤立
level 可能会受到使用
Session.connection()
方法:
from sqlalchemy.orm import Session
# assume session just constructed
sess = Session(bind=engine)
# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# ... work with session in SERIALIZABLE isolation level...
# commit transaction. the connection is released
# and reverted to its previous isolation level.
sess.commit()
# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.
在上面,我们首先使用构造函数或
sessionmaker
创建器。然后,我们通过调用 Session.connection()
显式设置数据库级事务的开始,它提供了将在数据库级事务开始之前传递给连接的执行选项。事务将继续使用此选定的隔离级别。事务完成后,在将连接返回到连接池之前,连接上的隔离级别将重置为其默认值。
Session.begin()
方法也可用于开始
会话
级事务;调用
Session.connection()
可用于设置每个连接事务的隔离级别:
sess = Session(bind=engine)
with sess.begin():
# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a
# real database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# ... work with session in SERIALIZABLE isolation level...
# outside the block, the transaction has been committed. the connection is
# released and reverted to its previous isolation level.
使用事件跟踪事务状态¶
请参阅 Transaction Events 部分,了解 session 事务状态更改的可用事件钩子的概述。
将 Session 加入外部事务(例如测试套件)¶
如果正在使用的 Connection
已经处于事务状态(即已建立 Transaction
),则 Session
可以
只需绑定
Session
到该 Connection 的 Session
。通常的基本原理是一个测试套件,它允许 ORM 代码自由地与 Session
一起工作,包括调用 Session.commit()
的能力,之后整个数据库交互都会回滚。
在 2.0 版更改: “join into an external transaction” 配方在 2.0 中再次进行了新改进;不再需要用于“重置”嵌套事务的事件处理程序。
该配方的工作原理是在
transaction 和可选的 SAVEPOINT,然后将其传递给
Session
作为 “bind”;这
Session.join_transaction_mode
参数通过设置 “create_savepoint”
传递,这表示新的 SAVEPOINT 应该是
created 的
Session
,这将使外部事务保持其传递时的相同状态。
当测试关闭时,外部事务将回滚,以便恢复整个测试中的任何数据更改:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase
# global application scope. create Session class, engine
Session = sessionmaker()
engine = create_engine("postgresql+psycopg2://...")
class SomeTest(TestCase):
def setUp(self):
# connect to the database
self.connection = engine.connect()
# begin a non-ORM transaction
self.trans = self.connection.begin()
# bind an individual Session to the connection, selecting
# "create_savepoint" join_transaction_mode
self.session = Session(
bind=self.connection, join_transaction_mode="create_savepoint"
)
def test_something(self):
# use the session in tests.
self.session.add(Foo())
self.session.commit()
def test_something_with_rollbacks(self):
self.session.add(Bar())
self.session.flush()
self.session.rollback()
self.session.add(Foo())
self.session.commit()
def tearDown(self):
self.session.close()
# rollback - everything that happened with the
# Session above (including calls to commit())
# is rolled back.
self.trans.rollback()
# return connection to the Engine
self.connection.close()
上述配方是 SQLAlchemy 自己的 CI 的一部分,以确保它保持按预期工作。