事务和连接管理


管理交易


在 1.4 版本发生变更: 会话事务管理已修订,使其更清晰、更易于使用。特别是,它现在具有 “autobegin”作功能,这意味着可以控制事务开始的点,而无需使用传统的 “autocommit” 模式。


Session 跟踪单个 “虚拟” 事务的状态 时,使用名为 SessionTransaction 的 SessionTransaction 中。 然后,此对象使用底层的 会话到的一个或多个引擎 对象,以便使用 Connection 对象


这个 “虚拟” 事务是在需要时自动创建的,或者可以使用 Session.begin() 方法启动。在创建 Session 对象以及维护 SessionTransaction 的范围方面,都尽可能地支持 Python 上下文管理器的使用。


下面,假设我们从 Session 开始:

from sqlalchemy.orm import Session

session = Session(engine)


我们现在可以使用上下文管理器在划分的事务中运行作:

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised


在上述上下文的末尾,假设没有引发异常,则任何 待处理对象将被刷新到数据库和数据库事务 将被提交。如果在上述块中引发了异常,则 事务将被回滚。 在这两种情况下,上述 退出区块后的 Session 已准备好在后续交易中使用。


Session.begin() 方法是可选的,而 Session 也可以用于 Commit-as-you-go 方法,它将根据需要自动开始事务;这些只需要提交或回滚:

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # commits

# will automatically begin again
result = session.execute(text("< some select statement >"))
session.add_all([more_objects, ...])
session.commit()  # commits

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()  # rolls back still_another_object


Session 本身具有一个 Session.close() 方法。 如果 Session 是在尚未提交或回滚的事务中开始的,则此方法将取消(即回滚)该事务,并清除 Session 对象状态中包含的所有对象。如果会话 的使用方式使得对 Session.commit() 的调用 或 Session.rollback() 不能保证(例如,不在上下文管理器或类似范围内),则可以使用 close 方法来确保释放所有资源:

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()


最后,会话构造/关闭过程本身可以通过上下文管理器运行。这是确保 Session 对象的使用范围在固定块内的最佳方法。首先通过 Session 构造函数进行说明:

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # commits

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # commits

    result = session.execute(text("<some SELECT statement>"))

# remaining transactional state from the .execute() call is
# discarded


同样,可以以相同的方式使用 sessionmaker

Session = sessionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits

# closes the Session


SessionMaker 本身包含一个 sessionMaker.begin() 方法允许两个作同时进行:

with Session.begin() as session:
    session.add(some_object)


使用 SAVEPOINT


如果底层引擎支持,则可以使用 Session.begin_nested() 来描述 SAVEPOINT 事务 方法:

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested()  # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2

# commits u1 and u2


每次调用 Session.begin_nested() 时,都会向当前数据库事务范围内的数据库发出一个新的 “BEGIN SAVEPOINT” 命令(如果尚未进行,则启动一个),并返回一个 SessionTransaction 类型的对象,该对象表示此 SAVEPOINT 的句柄。当调用此对象上的 .commit() 方法时,“RELEASE SAVEPOINT”将发送到数据库,如果改为使用 .rollback() 方法,则发出 “ROLLBACK TO SAVEPOINT”。 附件 数据库事务仍在进行中。


Session.begin_nested() 通常用作上下文管理器,可以在其中捕获特定的每个实例错误,并结合为事务状态的该部分发出的回滚,而无需回滚整个事务,如下例所示:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()


当上下文管理器由 Session.begin_nested() 完成,它会 “提交” Savepoint, 这包括刷新所有 Pending state 的通常行为。 什么时候 引发错误,保存点回滚,并且 已更改对象的 Session local 已过期。


此模式非常适合使用 PostgreSQL 和捕获 IntegrityError 以检测重复行等情况;PostgreSQL 通常会在引发此类错误时中止整个事务,但是当使用 SAVEPOINT 时,将维护外部事务。在下面的示例中,数据列表被持久化到数据库中,偶尔会跳过 “duplicate primary key” 记录,而不会回滚整个作:

from sqlalchemy import exc

with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")


Session.begin_nested() 被调用时,Session 首先将所有当前待处理的状态刷新到数据库中;无论 Session.autoflush 参数的值如何,这都会无条件地发生,该参数通常用于禁用自动刷新。此行为的基本原理是,当此嵌套事务发生回滚时,Session 可能会使在 SAVEPOINT,而 确保在刷新这些过期对象时, SAVEPOINT 开始之前的对象图将可用 从数据库重新加载。


在现代版本的 SQLAlchemy 中,当由 Session.begin_nested() 回滚,自创建 SAVEPOINT 以来修改的内存中对象状态将过期,但自 SAVEPOINT 开始以来未更改的其他对象状态将保持不变。这样,后续作就可以继续使用其他方面不受影响的数据,而无需从数据库中刷新数据。


另请参阅


Connection.begin_nested() - 核心 SAVEPOINT API


会话级 vs. 引擎级事务控制


核心_会期。 ORM 中的 Session 具有等效的事务语义,无论是在 sessionmakerEngine 的层面,还是在 SessionConnection 的层面。以下部分根据以下方案详细介绍了这些方案:

ORM                                           Core
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:


随用随付


SessionConnection 功能 Connection.commit()Connection.rollback() 方法。 使用 SQLAlchemy 2.0 风格的作,这些方法会影响 在所有情况下最外层的事务。对于 Session,假定 Session.autobegin 保留其默认值 True


发动机

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    conn.commit()


场次

Session = sessionmaker(engine)

with Session() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    session.commit()


开始一次


sessionmakerEngine 都具有 Engine.begin() 方法,该方法都将获取一个用于执行 SQL 语句的新对象(SessionConnection),然后返回一个上下文管理器,该管理器将维护该对象的 begin/commit/rollback 上下文。


发动机:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
# commits and closes automatically


会期:

Session = sessionmaker(engine)

with Session.begin() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
# commits and closes automatically


嵌套事务


通过 Session.begin_nested()Connection.begin_nested() 方法,返回的事务对象必须用于提交或回滚 SAVEPOINT。调用 Session.commit()Connection.commit() 方法将始终提交最外层的事务;这是特定于 SQLAlchemy 2.0 的行为,与 1.x 系列相反。


发动机:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    savepoint.commit()  # or rollback

# commits automatically


会期:

Session = sessionmaker(engine)

with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    savepoint.commit()  # or rollback
# commits automatically


显式开始


Session 具有“autobegin”行为,这意味着一旦作开始发生,它就会确保 SessionTransaction 用于跟踪正在进行的作。 此交易已完成 当 Session.commit() 被调用时。


通常,尤其是在框架集成中,需要控制 “begin”作发生的点。 为了适应这种情况, Session 使用“autobegin”策略,这样 Session.begin() 方法可以直接调用 尚未开始事务的会话

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"
    session.commit()
except:
    session.rollback()
    raise


使用上下文管理器可以更惯用地调用上述模式:

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"


Session.begin() 方法和会话的 “autobegin” 过程使用相同的步骤序列来开始事务。这包括在 SessionEvents.after_transaction_create() 事件发生时调用事件;框架使用这个钩子来集成他们自己的事务流程和 ORM Session 的事务流程。


启用两阶段提交


对于支持两阶段作的后端(目前是 MySQL 和 PostgreSQL),可以指示会话使用两阶段提交语义。这将协调跨数据库提交事务,以便在所有数据库中提交或回滚事务。你也可以 Session.prepare() 会话,用于与非 SQLAlchemy 管理的事务进行交互。要使用两阶段事务,请在会话上设置标志 twophase=True

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()


设置事务隔离级别 / DBAPI AUTOCOMMIT


大多数 DBAPI 都支持可配置事务隔离级别的概念。传统上,这些级别是 “READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ” 和 “SERIALIZABLE” 四个级别。这些通常在 DBAPI 连接开始新事务之前应用于 DBAPI 连接,请注意,大多数 DBAPI 将在首次发出 SQL 语句时隐式地开始此事务。


支持隔离级别的 DBAPI 通常也支持 true 的概念 “autocommit”,这意味着 DBAPI 连接本身将被放入 非事务性 autocommit 模式。 这通常意味着典型的 不再自动向数据库发出 “BEGIN” 的 DBAPI 行为 发生,但它也可能包含其他指令。 使用此模式时, DBAPI 在任何情况下都不使用事务。像 .begin()、.commit().rollback() 这样的 SQLAlchemy 方法以静默方式传递。


SQLAlchemy 的方言支持在每个引擎上设置的隔离模式 或每个 Connection 的 Package,在 create_engine() 级别以及 Connection.execution_options() 水平。


当使用 ORM Session 时,它充当引擎和 connections,但不直接公开事务隔离。 所以在 为了影响事务隔离级别,我们需要对 发动机连接(视情况而定)。


另请参阅


设置事务隔离级别,包括 DBAPI 自动提交 - 请务必查看隔离级别在 SQLAlchemy Connection 对象级别的工作原理。


为 Sessionmaker / 引擎范围设置隔离


设置具有特定 隔离级别,第一种技术是 可以根据特定的隔离级别构建引擎 在所有情况下,它都用作 会话和/或 sessionmaker

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="REPEATABLE READ",
)

Session = sessionmaker(eng)


如果同时有两个具有不同隔离级别的引擎,另一个选项很有用,是使用 Engine.execution_options() 方法,该方法将生成原始 Engine 的浅表副本,该副本与父引擎共享相同的连接池。当作将分为 “transactional” 和 “autocommit”作时,这通常是更可取的:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)


在上面,“eng” 和 “autocommit_engine” 共享相同的方言和连接池。但是,当从 autocommit_engine 获取连接时,将在连接上设置 “AUTOCOMMIT” 模式。 两人 SessionMaker 对象 “transactional_session” 和 “autocommit_session” 然后在使用数据库连接时继承这些特征。


autocommit_session仍然具有事务语义, 包括那个 Session.commit()Session.rollback() 仍然认为自己是 “committing” 和 “rolling back” 对象,但是事务将静默地不存在。因此,它通常是 尽管不是严格要求的,但具有 AUTOCOMMIT 隔离的 Session 是 以只读方式使用,即:

with autocommit_session() as session:
    some_objects = session.execute(text("<statement>"))
    some_other_objects = session.execute(text("<statement>"))

# closes connection


为单个会话设置隔离


当我们创建一个新的 Session 时,无论是直接使用构造函数还是调用 sessionmaker 生成的可调用对象,我们都可以直接传递 bind 参数,覆盖预先存在的 bind。例如,我们可以从默认的 Session 创建 sessionmaker 并传递一个用于 autocommit 的引擎集:

plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# will normally use plain_engine
Session = sessionmaker(plain_engine)

# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
    ...


对于 Sessionsessionmaker 配置了多个 “binds” 的情况,我们可以重新指定 binding 参数,或者如果我们想只替换特定的绑定,那么我们 可以使用 Session.bind_mapper()Session.bind_table() 方法:

with Session() as session:
    session.bind_mapper(User, autocommit_engine)


为单个事务设置隔离


关于隔离级别的一个关键警告是,不能在事务已经开始的 Connection 上安全地修改该设置。数据库无法更改正在进行的事务的隔离级别,并且某些 DBAPI 和 SQLAlchemy 方言在这方面的行为不一致。


因此,最好使用前面的 Session 绑定到具有所需隔离级别的引擎。 然而,孤立 level 可能会受到使用 Session.connection() 方法:

from sqlalchemy.orm import Session

# assume session just constructed
sess = Session(bind=engine)

# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

# ... work with session in SERIALIZABLE isolation level...

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.


在上面,我们首先使用构造函数sessionmaker 创建器。然后,我们通过调用 Session.connection() 显式设置数据库级事务的开始,它提供了将在数据库级事务开始之前传递给连接的执行选项。事务将继续使用此选定的隔离级别。事务完成后,在将连接返回到连接池之前,连接上的隔离级别将重置为其默认值。


Session.begin() 方法也可用于开始 会话级事务;调用 Session.connection() 可用于设置每个连接事务的隔离级别:

sess = Session(bind=engine)

with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

    # ... work with session in SERIALIZABLE isolation level...

# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.


使用事件跟踪事务状态


请参阅 Transaction Events 部分,了解 session 事务状态更改的可用事件钩子的概述。


将 Session 加入外部事务(例如测试套件)


如果正在使用的 Connection 已经处于事务状态(即已建立 Transaction),则 Session 可以 只需绑定 Session 到该 Connection 的 Session。通常的基本原理是一个测试套件,它允许 ORM 代码自由地与 Session 一起工作,包括调用 Session.commit() 的能力,之后整个数据库交互都会回滚。


在 2.0 版更改: “join into an external transaction” 配方在 2.0 中再次进行了新改进;不再需要用于“重置”嵌套事务的事件处理程序。


该配方的工作原理是在 transaction 和可选的 SAVEPOINT,然后将其传递给 Session 作为 “bind”;这 Session.join_transaction_mode 参数通过设置 “create_savepoint” 传递,这表示新的 SAVEPOINT 应该是 created 的 Session,这将使外部事务保持其传递时的相同状态。


当测试关闭时,外部事务将回滚,以便恢复整个测试中的任何数据更改:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine("postgresql+psycopg2://...")


class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()

        # bind an individual Session to the connection, selecting
        # "create_savepoint" join_transaction_mode
        self.session = Session(
            bind=self.connection, join_transaction_mode="create_savepoint"
        )

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()


上述配方是 SQLAlchemy 自己的 CI 的一部分,以确保它保持按预期工作。