异步 I/O (asyncio)


支持 Python asyncio。包括对 Core 和 ORM 使用的支持,使用与 asyncio 兼容的方言。


在 1.4 版本加入.


警告


请阅读 Asyncio 平台安装说明(包括 Apple M1),了解许多平台(包括 Apple M1 架构)的重要平台安装说明。


另请参阅


Core 和 ORM 的异步 IO 支持 - 初始功能公告


Asyncio 集成 - 示例脚本,说明 asyncio 扩展中使用 Core 和 ORM 的工作示例。


Asyncio 平台安装说明(包括 Apple M1)


asyncio 扩展只需要 Python 3。它还取决于 greenlet 库。默认情况下,此依赖项安装在常见的计算机平台上,包括:

x86_64 aarch64 ppc64le amd64 win32


对于上述平台,众所周知,greenlet 会提供预构建的 wheel 文件。对于其他平台,默认情况下不安装 greenlet; Greenlet 的当前文件列表可以在 Greenlet - 下载文件。请注意,省略了许多架构,包括 Apple M1


在确保存在 greenlet 依赖项的同时安装 SQLAlchemy 无论使用哪个平台, [asyncio]setuptools 额外 可以安装 如下所示,其中还将包括指示 pip 安装 Greenlet

pip install sqlalchemy[asyncio]


请注意,在没有预建 wheel 文件的平台上安装 greenlet 意味着 greenlet 将从源代码构建,这需要 Python 的开发库也存在。


概要 - 核心


对于 Core 使用,create_async_engine() 函数会创建一个 AsyncEngine 实例,然后提供传统 Engine API 的异步版本。 这 AsyncEngine 通过其 AsyncEngine.connect()AsyncEngine.begin() 提供 AsyncConnection 方法,它们都提供异步上下文管理器。 这 然后,AsyncConnection 可以使用 AsyncConnection.execute() 方法传递一个缓冲的 ResultAsyncConnection.stream() 方法提供流式处理服务器端 AsyncResult

>>> import asyncio

>>> from sqlalchemy import Column
>>> from sqlalchemy import MetaData
>>> from sqlalchemy import select
>>> from sqlalchemy import String
>>> from sqlalchemy import Table
>>> from sqlalchemy.ext.asyncio import create_async_engine

>>> meta = MetaData()
>>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True))


>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(meta.drop_all)
...         await conn.run_sync(meta.create_all)
...
...         await conn.execute(
...             t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
...         )
...
...     async with engine.connect() as conn:
...         # select a Result, which will be delivered with buffered
...         # results
...         result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
...
...         print(result.fetchall())
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()


>>> asyncio.run(async_main())
BEGIN (implicit) ... CREATE TABLE t1 ( name VARCHAR(50) NOT NULL, PRIMARY KEY (name) ) ... INSERT INTO t1 (name) VALUES (?) [...] [('some name 1',), ('some name 2',)] COMMIT BEGIN (implicit) SELECT t1.name FROM t1 WHERE t1.name = ? [...] ('some name 1',) [('some name 1',)] ROLLBACK


上面,AsyncConnection.run_sync() 方法可用于调用不包含可等待钩子的特殊 DDL 函数,例如 MetaData.create_all()。


提示


范围,该范围将脱离上下文并被垃圾回收,如 async_main 函数。这可确保连接池保持打开的任何连接都将在可等待的上下文中正确处理。与使用阻塞 IO 时不同,SQLAlchemy 无法在 __del__ 或 weakref 终结器,因为没有机会调用 await。 当引擎超出范围时未能显式释放引擎 可能会导致向 Standard Out 发出类似于 RuntimeError: Event loop is closed 在垃圾回收中。


AsyncConnection 还通过 AsyncConnection.stream() 方法提供了一个“流式”API,该方法返回一个 AsyncResult 对象。此结果对象使用服务器端游标并提供 async/await API,例如异步迭代器:

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))

    async for row in async_result:
        print("row: %s" % (row,))


概要 - ORM


使用 2.0 样式的查询,AsyncSession 类提供了完整的 ORM 功能。


在默认使用模式下,必须特别注意避免懒惰 加载或其他涉及 ORM 关系和列属性的过期属性访问;下一节 使用 AsyncSession 时防止隐式 IO 对此进行了详细介绍。


警告


AsyncSession 的单个实例对于 在多个并发任务中使用。 查看各部分 将 AsyncSession 与并发任务一起使用会话是否线程安全? 在并发任务中共享 AsyncSession 是否安全?作为背景。


下面的示例说明了一个完整的示例,包括 mapper 和 session 配置:

>>> from __future__ import annotations

>>> import asyncio
>>> import datetime
>>> from typing import List

>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy import func
>>> from sqlalchemy import select
>>> from sqlalchemy.ext.asyncio import AsyncAttrs
>>> from sqlalchemy.ext.asyncio import async_sessionmaker
>>> from sqlalchemy.ext.asyncio import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlalchemy.orm import DeclarativeBase
>>> from sqlalchemy.orm import Mapped
>>> from sqlalchemy.orm import mapped_column
>>> from sqlalchemy.orm import relationship
>>> from sqlalchemy.orm import selectinload


>>> class Base(AsyncAttrs, DeclarativeBase):
...     pass

>>> class B(Base):
...     __tablename__ = "b"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
...     data: Mapped[str]

>>> class A(Base):
...     __tablename__ = "a"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     data: Mapped[str]
...     create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
...     bs: Mapped[List[B]] = relationship()

>>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
...     async with async_session() as session:
...         async with session.begin():
...             session.add_all(
...                 [
...                     A(bs=[B(data="b1"), B(data="b2")], data="a1"),
...                     A(bs=[], data="a2"),
...                     A(bs=[B(data="b3"), B(data="b4")], data="a3"),
...                 ]
...             )


>>> async def select_and_update_objects(
...     async_session: async_sessionmaker[AsyncSession],
... ) -> None:
...     async with async_session() as session:
...         stmt = select(A).order_by(A.id).options(selectinload(A.bs))
...
...         result = await session.execute(stmt)
...
...         for a in result.scalars():
...             print(a, a.data)
...             print(f"created at: {a.create_date}")
...             for b in a.bs:
...                 print(b, b.data)
...
...         result = await session.execute(select(A).order_by(A.id).limit(1))
...
...         a1 = result.scalars().one()
...
...         a1.data = "new data"
...
...         await session.commit()
...
...         # access attribute subsequent to commit; this is what
...         # expire_on_commit=False allows
...         print(a1.data)
...
...         # alternatively, AsyncAttrs may be used to access any attribute
...         # as an awaitable (new in 2.0.13)
...         for b1 in await a1.awaitable_attrs.bs:
...             print(b1, b1.data)


>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     # async_sessionmaker: a factory for new AsyncSession objects.
...     # expire_on_commit - don't expire objects after transaction commit
...     async_session = async_sessionmaker(engine, expire_on_commit=False)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(Base.metadata.create_all)
...
...     await insert_objects(async_session)
...     await select_and_update_objects(async_session)
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()


>>> asyncio.run(async_main())
BEGIN (implicit) ... CREATE TABLE a ( id INTEGER NOT NULL, data VARCHAR NOT NULL, create_date DATETIME DEFAULT (CURRENT_TIMESTAMP) NOT NULL, PRIMARY KEY (id) ) ... CREATE TABLE b ( id INTEGER NOT NULL, a_id INTEGER NOT NULL, data VARCHAR NOT NULL, PRIMARY KEY (id), FOREIGN KEY(a_id) REFERENCES a (id) ) ... COMMIT BEGIN (implicit) INSERT INTO a (data) VALUES (?) RETURNING id, create_date [...] ('a1',) ... INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id [...] (1, 'b2') ... COMMIT BEGIN (implicit) SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id [...] () SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data FROM b WHERE b.a_id IN (?, ?, ?) [...] (1, 2, 3) <A object at ...> a1 created at: ... <B object at ...> b1 <B object at ...> b2 <A object at ...> a2 created at: ... <A object at ...> a3 created at: ... <B object at ...> b3 <B object at ...> b4 SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id LIMIT ? OFFSET ? [...] (1, 0) UPDATE a SET data=? WHERE a.id = ? [...] ('new data', 1) COMMIT new data <B object at ...> b1 <B object at ...> b2


在上面的示例中,AsyncSession 使用可选的 async_sessionmaker 帮助程序进行实例化,该帮助程序为具有固定参数集的新 AsyncSession 对象提供了一个工厂,此处包括将其与针对特定数据库 URL 的 AsyncEngine 相关联。然后它被传递给其他方法,它可以在 Python 异步上下文管理器中使用(即 async with: 语句),以便它自动关闭 区块的结尾;这相当于调用 AsyncSession.close() 方法。


将 AsyncSession 与并发任务一起使用


AsyncSession 对象是一个可变的有状态对象 ,它表示正在进行的单个有状态数据库事务。通过 asyncio 使用并发任务,例如使用 asyncio.gather() 等 API,应该为每个个体使用单独的AsyncSession 任务


请参阅 Session 是否线程安全? 在并发任务中共享 AsyncSession 是否安全?,了解 SessionAsyncSession 的一般说明,以及它们应如何与并发工作负载一起使用。


使用 AsyncSession 时防止隐式 IO


使用传统的 asyncio 时,应用程序需要避免可能发生 IO-on-attribute 访问的任何点。下面列出了可用于帮助实现此目的的技术,其中许多技术在前面的示例中进行了说明。


  • 延迟加载关系、延迟列或表达式或在过期方案中访问的属性可以利用 AsyncAttrs 混合。当这个 mixin 被添加到一个特定的类中,或者更普遍地被添加到 Declarative Base 超类中时,提供了一个访问器AsyncAttrs.awaitable_attrs 它将任何属性作为 awaitable 提供:

    from __future__ import annotations
    
    from typing import List
    
    from sqlalchemy.ext.asyncio import AsyncAttrs
    from sqlalchemy.orm import DeclarativeBase
    from sqlalchemy.orm import Mapped
    from sqlalchemy.orm import relationship
    
    
    class Base(AsyncAttrs, DeclarativeBase):
        pass
    
    
    class A(Base):
        __tablename__ = "a"
    
        # ... rest of mapping ...
    
        bs: Mapped[List[B]] = relationship()
    
    
    class B(Base):
        __tablename__ = "b"
    
        # ... rest of mapping ...


    在不使用预先加载时,在新加载的 A 实例上访问 A.bs 集合通常会使用延迟加载,为了成功,通常会向数据库发出 IO,这在 asyncio 下会失败,因为不允许隐式 IO。要在 asyncio 下直接访问此属性而无需任何先前的加载作,可以通过指示 AsyncAttrs.awaitable_attrs 来将该属性作为 awaitable 访问 前缀:

    a1 = (await session.scalars(select(A))).one()
    for b1 in await a1.awaitable_attrs.bs:
        print(b1)


    AsyncAttrs mixin 在 internal 方法,该方法也被 AsyncSession.run_sync() 方法。


    2.0.13 新版功能.


    另请参阅


    异步属性


  • 通过使用 SQLAlchemy 2.0 的使用此功能,永远不会从中读取集合,只会读取集合 使用显式 SQL 调用进行查询。 查看示例 async_orm_writeonly.py Asyncio 集成部分中,了解与 asyncio 一起使用的只写集合的示例。


    当使用只写集合时,程序的行为很简单,很容易预测集合。但是,缺点是没有任何内置系统可以一次加载许多这些集合,而是需要手动执行。因此,下面的许多项目符号都解决了在 asyncio 中使用传统的 lazy-loaded 关系时的特定技术,这需要更加小心。


  • 如果不使用 AsyncAttrs,则可以使用 lazy=“raise” 声明关系,这样默认情况下它们就不会尝试发出 SQL。为了加载集合,将改用预先加载


  • 最有用的预先加载策略是 selectInload() 急切加载器,在前面的示例中使用了它 A.bs,以便在 await session.execute() 调用:

    stmt = select(A).options(selectinload(A.bs))

  • 在构造新对象时,集合总是被分配一个默认的 空集合,例如上面示例中的 List:

    A(bs=[], data="a2")


    这允许在刷新 A 对象时,上述 A 对象上的 .bs 集合存在且可读;否则,当 A 被刷新时,.bs 将被卸载,并在访问时引发错误。


  • AsyncSession 是使用 Session.expire_on_commit设置为 False,以便我们可以访问 属性 AsyncSession.commit(),就像我们访问属性的末尾的那一行一样:

    # create AsyncSession with expire_on_commit=False
    async_session = AsyncSession(engine, expire_on_commit=False)
    
    # sessionmaker version
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    
    async with async_session() as session:
        result = await session.execute(select(A).order_by(A.id))
    
        a1 = result.scalars().first()
    
        # commit would normally expire all attributes
        await session.commit()
    
        # access attribute subsequent to commit; this is what
        # expire_on_commit=False allows
        print(a1.data)


其他指南包括:


  • 中描述的 “dynamic” relationship loader 策略在 默认情况下,动态关系加载器与 asyncio 方法不兼容。 只有在 AsyncSession.run_sync() 方法,如 在 asyncio 下运行同步方法和函数,或者使用其 .statement 属性来获取普通 select:

    user = await session.get(User, 42)
    addresses = (await session.scalars(user.addresses.statement)).all()
    stmt = user.addresses.statement.where(Address.email_address.startswith("patrick"))
    addresses_filter = (await session.scalars(stmt)).all()


    SQLAlchemy 版本 2.0 中引入的只写技术与 asyncio 完全兼容,应该首选。


    另请参阅


    “Dynamic” 关系加载器被 “Write Only” 取代 - 关于迁移到 2.0 样式的说明


  • 如果将 asyncio 与不支持 RETURNING 的数据库一起使用,例如 MySQL 8 中,服务器默认值(例如生成的时间戳)不会为 在新刷新的对象上可用,除非 Mapper.eager_defaults 选项。在 SQLAlchemy 2.0 中,此行为会自动应用于 PostgreSQL、SQLite 和 MariaDB 等后端,这些后端在 INSERTed 行时使用 RETURNING 获取新值。


在 asyncio 下运行同步方法和函数


深度炼金术


这种方法本质上是公开了 SQLAlchemy 首先能够提供 asyncio 接口的机制。虽然这样做没有技术问题,但总的来说,这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些中心理念,即任何可能导致 IO 被调用的编程语句都必须有一个 await 调用,以免程序没有明确地明确说明 IO 可能发生的每一行。这种方法并没有改变这个一般的想法,只是它允许在函数调用的范围内将一系列同步 IO 指令从此规则中免除,基本上捆绑到一个 awaitable 中。


作为在 asyncio 事件循环中集成传统 SQLAlchemy“延迟加载”的另一种方法,一种称为 提供了 AsyncSession.run_sync() ,它将在 greenlet 中运行任何 Python 函数,其中传统的同步编程概念在到达数据库驱动程序时将被转换为使用 await。这里的一个假设方法是面向 asyncio 的应用程序可以将与数据库相关的方法打包成使用 AsyncSession.run_sync() 调用的函数。


更改上面的例子,如果我们没有使用 selectinload() 对于 A.bs 集合,我们可以在一个单独的函数中完成对这些属性访问的处理:

import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine


def fetch_and_update_objects(session):
    """run traditional sync-style ORM code in a function that will be
    invoked within an awaitable.

    """

    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.

    stmt = select(A)

    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)

        # lazy loads
        for b1 in a1.bs:
            print(b1)

    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()

    a1.data = "new data"


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        await session.run_sync(fetch_and_update_objects)

        await session.commit()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())


上述在 “sync” 运行程序中运行某些函数的方法与在基于事件的编程库(如 gevent)上运行 SQLAlchemy 应用程序的应用程序有一些相似之处。区别如下:


  1. 与使用 gevent 时不同,我们可以继续使用标准的 Python asyncio 事件循环或任何自定义事件循环,而无需集成到 gevent 事件循环中。


  2. 没有任何 “monkeypatching”。上面的示例使用了真正的 asyncio 驱动程序,底层 SQLAlchemy 连接池也使用了 Python 内置的 asyncio。池化连接排队。


  3. 该程序可以在 async/await 代码和包含使用 sync 代码的函数之间自由切换,而几乎没有性能损失。没有使用“线程执行程序”或任何其他 Waiter 或同步。


  4. 底层网络驱动程序也使用纯 Python asyncio 概念,没有像 geventeventlet 这样的第三方网络库 提供正在使用中。


通过 asyncio 扩展使用事件


SQLAlchemy 事件系统不是由 asyncio 扩展直接公开的,这意味着还没有 SQLAlchemy 事件处理程序的“异步”版本。


但是,由于 asyncio 扩展围绕着通常的同步 SQLAlchemy API,因此常规的“同步”样式事件处理程序可以免费使用,就像不使用 asyncio 时一样。


如下所述,在给定面向 asyncio 的 API 的情况下,有两种当前策略可以注册事件:


当在 asyncio 上下文中的事件处理程序中工作时,像 Connection 这样的对象继续以它们通常的 “同步” 方式工作,而不需要 awaitasync 使用;当 asyncio 数据库适配器最终收到消息时,调用样式将透明地调整回 asyncio 调用样式。对于传递 DBAPI 级别连接的事件,例如 PoolEvents.connect(),该对象是符合 pep-249 的“连接”对象,它将使同步样式的调用适应 asyncio 驱动程序。


使用异步引擎 / 会话 / Sessionmakers 的事件监听器示例


与面向异步的 API 构造关联的同步样式事件处理程序的一些示例如下所示:


  • AsyncEngine 上的核心事件


    在此示例中,我们将访问 AsyncEngine.sync_engine 属性作为 ConnectionEventsPoolEvents

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy import text
    from sqlalchemy.engine import Engine
    from sqlalchemy.ext.asyncio import create_async_engine
    
    engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
    
    
    # connect event on instance of Engine
    @event.listens_for(engine.sync_engine, "connect")
    def my_on_connect(dbapi_con, connection_record):
        print("New DBAPI connection:", dbapi_con)
        cursor = dbapi_con.cursor()
    
        # sync style API use for adapted DBAPI connection / cursor
        cursor.execute("select 'execute from event'")
        print(cursor.fetchone()[0])
    
    
    # before_execute event on all Engine instances
    @event.listens_for(Engine, "before_execute")
    def my_before_execute(
        conn,
        clauseelement,
        multiparams,
        params,
        execution_options,
    ):
        print("before execute!")
    
    
    async def go():
        async with engine.connect() as conn:
            await conn.execute(text("select 1"))
        await engine.dispose()
    
    
    asyncio.run(go())


    输出:

    New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>>
    execute from event
    before execute!

  • AsyncSession 上的 ORM 事件


    在此示例中,我们将 AsyncSession.sync_session 作为 SessionEvents 的目标进行访问:

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.orm import Session
    
    engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
    
    session = AsyncSession(engine)
    
    
    # before_commit event on instance of Session
    @event.listens_for(session.sync_session, "before_commit")
    def my_before_commit(session):
        print("before commit!")
    
        # sync style API use on Session
        connection = session.connection()
    
        # sync style API use on Connection
        result = connection.execute(text("select 'execute from event'"))
        print(result.first())
    
    
    # after_commit event on all Session instances
    @event.listens_for(Session, "after_commit")
    def my_after_commit(session):
        print("after commit!")
    
    
    async def go():
        await session.execute(text("select 1"))
        await session.commit()
    
        await session.close()
        await engine.dispose()
    
    
    asyncio.run(go())


    输出:

    before commit!
    execute from event
    after commit!

  • async_sessionmaker 上的 ORM 事件


    对于此用例,我们将 sessionmaker 作为事件目标,然后使用参数 async_sessionmaker.sync_session_class 将其分配给async_sessionmaker

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy.ext.asyncio import async_sessionmaker
    from sqlalchemy.orm import sessionmaker
    
    sync_maker = sessionmaker()
    maker = async_sessionmaker(sync_session_class=sync_maker)
    
    
    @event.listens_for(sync_maker, "before_commit")
    def before_commit(session):
        print("before commit")
    
    
    async def main():
        async_session = maker()
    
        await async_session.commit()
    
    
    asyncio.run(main())


    输出:

    before commit


在连接池和其他事件中使用仅限 awaitable-only 的驱动程序方法


如上一节所述,事件处理程序(例如面向 PoolEvents 事件处理程序的事件处理程序)接收同步样式的“DBAPI”连接,该连接是由 SQLAlchemy asyncio 方言提供的包装器对象,用于将底层 asyncio“驱动程序”连接调整为 SQLAlchemy 内部可以使用的连接。当此类事件处理程序的用户定义实现需要直接使用最终的 “driver” 连接时,会出现一个特殊的用例,在该驱动程序连接上使用仅限 awaitable 的方法。一个这样的例子是 asyncpg 驱动程序提供的 .set_type_codec() 方法。


为了适应此用例,SQLAlchemy 的 AdaptedConnection class 提供了一个方法 AdaptedConnection.run_async() ,该方法允许在事件处理程序或其他 SQLAlchemy 内部的 “同步” 上下文中调用可等待的函数。此方法直接类似于 AsyncConnection.run_sync() 方法,该方法允许 sync-style 方法在 async 下运行。


应该向 AdaptedConnection.run_async() 传递一个函数,该函数将接受最内层的 “driver” 连接作为单个参数,并返回一个将由 AdaptedConnection.run_async() 调用的 awaitable 方法。 给定的函数本身不需要声明为 async;它是 Python lambda: 是完全可以的,因为返回 awaitable 值将在返回后调用:

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(...)


@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, *args):
    dbapi_connection.run_async(
        lambda connection: connection.set_type_codec(
            "MyCustomType",
            encoder,
            decoder,  # ...
        )
    )


在上面,传递给 register_custom_types 事件处理程序的对象是 AdaptedConnection 的实例,它为底层仅异步驱动程序级连接对象提供类似于 DBAPI 的接口。然后,AdaptedConnection.run_async() 方法提供对可等待环境的访问,在该环境中,可以作用于基础驱动程序级别连接。


在 1.4.30 版本加入.


使用多个 asyncio 事件循环


使用多个事件循环的应用程序,例如在将 asyncio 与多线程相结合的不常见情况下,在使用默认池实现时,不应与不同的事件循环共享同一个 AsyncEngine


如果 AsyncEngine 从一个事件循环传递到另一个事件循环,则应先调用 AsyncEngine.dispose() 方法,然后再将其用于新的事件循环。否则可能会导致 RuntimeError 沿着 Task <Task pending ...> got Future attached to a different loop


如果必须在不同的循环之间共享相同的引擎,则应将其配置为使用 NullPool 禁用池,从而防止引擎多次使用任何连接:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/dbname",
    poolclass=NullPool,
)


使用 asyncio 作用域的会话


线程 SQLAlchemy 中使用的“作用域会话”模式与 scoped_session 对象在 asyncio 中也可用,使用名为 async_scoped_session 的改编版本。


提示


SQLAlchemy 通常不推荐 “scoped” 模式 对于新的开发,因为它依赖于可变的全局状态,该状态也必须 在线程或任务中的工作完成时显式拆除。 特别是在使用 asyncio 时,最好将 AsyncSession 直接传递给需要它的可等待函数。


使用 async_scoped_session 时,因为没有 “thread-local” 概念,则必须将 “scopefunc” 参数提供给 构造函数。以下示例说明了如何使用 asyncio.current_task() 函数来实现此目的:

from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
)

async_session_factory = async_sessionmaker(
    some_async_engine,
    expire_on_commit=False,
)
AsyncScopedSession = async_scoped_session(
    async_session_factory,
    scopefunc=current_task,
)
some_async_session = AsyncScopedSession()


警告


async_scoped_session 使用的 “scopefunc” 在任务中调用任意次数,每次访问底层 AsyncSession 时调用一次。因此,该函数应该是幂等的轻量级的,并且不应该尝试创建或更改任何状态,例如建立回调等。


警告


对作用域中的 “key” 使用 current_task() 需要从最外层的可等待对象中调用 async_scoped_session.remove() 方法,以确保在任务完成时从注册表中删除该 key,否则任务句柄和 AsyncSession 将保留在内存中,本质上会造成内存泄漏。请参阅以下示例,其中说明了 async_scoped_session.remove() 的正确用法。


async_scoped_session 包括代理 的行为类似于 scoped_session,这意味着它可以直接被视为 AsyncSession,请记住,通常的 await 关键字是必需的,包括 async_scoped_session.remove() 方法:

async def some_function(some_async_session, some_object):
    # use the AsyncSession directly
    some_async_session.add(some_object)

    # use the AsyncSession via the context-local proxy
    await AsyncScopedSession.commit()

    # "remove" the current proxied AsyncSession for the local context
    await AsyncScopedSession.remove()


在 1.4.19 版本加入.


使用 Inspector 检查 schema 对象


SQLAlchemy 尚未提供 asyncio 版本的 Inspector(在 Fine Grained Reflection with Inspector 中引入),但是可以通过利用 AsyncConnection.run_sync()AsyncConnection 的 AsyncConnection 中:

import asyncio

from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")


def use_inspector(conn):
    inspector = inspect(conn)
    # use the inspector
    print(inspector.get_view_names())
    # return any value to the caller
    return inspector.get_table_names()


async def async_main():
    async with engine.connect() as conn:
        tables = await conn.run_sync(use_inspector)


asyncio.run(async_main())


引擎 API 文档


对象名称

描述


async_engine_from_config(configuration[, prefix], **kwargs)


使用配置字典创建新的 AsyncEngine 实例。


异步连接


Connection 的 asyncio 代理。


异步引擎


Engine 的 asyncio 代理。


异步事务


Transaction 的 asyncio 代理。


create_async_engine(url, **kw)


创建新的异步引擎实例。


create_async_pool_from_url(url, **kwargs)


创建新的异步引擎实例。


函数 sqlalchemy.ext.asyncio 中。create_async_engineurl:strURL, **kw Any AsyncEngine


创建新的异步引擎实例。


传递给 create_async_engine() 的参数与传递给 create_engine() 函数的参数基本相同。指定的方言必须是与 asyncio 兼容的方言,例如 asyncpg


在 1.4 版本加入.


参数

async_creator


一个 async callable,它返回一个驱动程序级的 asyncio 连接。如果给定,则该函数不应采用任何参数,并从底层 asyncio 数据库驱动程序返回新的 asyncio 连接;该连接将被包装在适当的结构中,以便与 AsyncEngine 一起使用。请注意,URL 中指定的参数在此处不适用,creator 函数应使用自己的连接参数。


此参数是 asyncio 等效的 create_engine.creator 参数的 create_engine() 函数。


2.0.16 新版功能.


函数 sqlalchemy.ext.asyncio 中。async_engine_from_configconfiguration Dict[str Any]prefix str = 'sqlalchemy.'**kwargs Any AsyncEngine


使用配置字典创建新的 AsyncEngine 实例。


此函数类似于 SQLAlchemy Core 中的 engine_from_config() 函数,不同之处在于请求的方言必须是与 asyncio 兼容的方言,例如 asyncpg。该函数的参数签名与 engine_from_config() 的参数签名相同。


在 1.4.29 版本加入.


函数 sqlalchemy.ext.asyncio 中。create_async_pool_from_urlurl:strURL, **kwargs Any 矿池


创建新的异步引擎实例。


传递给 create_async_pool_from_url() 的参数与传递给 create_pool_from_url() 函数的参数基本相同。指定的方言必须是与 asyncio 兼容的方言,例如 asyncpg


在 2.0.10 版本加入.


sqlalchemy.ext.asyncio 中。异步引擎


Engine 的 asyncio 代理。


AsyncEngine 是使用 create_async_engine() 函数:

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")


在 1.4 版本加入.


类签名


sqlalchemy.ext.asyncio.AsyncEngine sqlalchemy.ext.asyncio.base.ProxyComparablesqlalchemy.ext.asyncio.AsyncConnectable


方法 sqlalchemy.ext.asyncio.AsyncEngine. begin AsyncIterator[AsyncConnection]


返回一个上下文管理器,输入该上下文管理器后将传递一个 AsyncConnection 替换为 AsyncTransaction 已建立。


例如:

async with async_engine.begin() as conn:
    await conn.execute(
        text("insert into table (x, y, z) values (1, 2, 3)")
    )
    await conn.execute(text("my_special_procedure(5)"))

方法 sqlalchemy.ext.asyncio.AsyncEngine. clear_compiled_cache


清除与方言关联的已编译缓存。


代表 AsyncEngine 类代理 Engine 类。


仅适用于通过 create_engine.query_cache_size parameter 建立的内置缓存。 它不会影响通过 Connection.execution_options.compiled_cache 参数。


在 1.4 版本加入.


方法 sqlalchemy.ext.asyncio.AsyncEngine. connect AsyncConnection


返回 AsyncConnection 对象。


AsyncConnection 作为异步上下文管理器输入时,它将从底层连接池中获取数据库连接:

async with async_engine.connect() as conn:
    result = await conn.execute(select(user_table))


AsyncConnection 也可以通过调用其 AsyncConnection.start() 在上下文管理器之外启动 方法。


attribute dialect(属性 sqlalchemy.ext.asyncio.AsyncEngine. 方言)¶


代表 AsyncEngine 类的 Engine.dialect 属性的代理。


方法 sqlalchemy.ext.asyncio.AsyncEngine. async disposeclose bool = True None


释放 this 使用的连接池 AsyncEngine 的 AsyncEngine 中。


参数


关闭


如果保留为默认值 True,则具有完全关闭当前签入的所有 数据库连接。 仍处于签出状态的连接 不会关闭,但它们将不再与此引擎相关联, 因此,当它们单独关闭时,最终 与他们关联的游泳池将被垃圾回收,如果在入住时尚未关闭,它们将被完全关闭。


如果设置为 False,则取消引用前一个连接池,否则不会以任何方式触及。


另请参阅


Engine.dispose()


属性 sqlalchemy.ext.asyncio.AsyncEngine. 驱动


方言的驱动程序名称 由此引擎使用。


代表 AsyncEngine 类代理 Engine 类。


attribute sqlalchemy.ext.asyncio.AsyncEngine. echo 属性¶


如果为 True,则为此元素启用日志输出。


代表 AsyncEngine 类代理 Engine 类。


这具有为此元素的类和对象引用的命名空间设置 Python 日志记录级别的效果。布尔值 True 表示将为记录器设置 loglevel logging.INFO,而字符串值 debug 会将 loglevel 设置为 伐木。DEBUG的 DEBUG 中。


属性 sqlalchemy.ext.asyncio.AsyncEngine. 引擎


返回此 Engine


代表 AsyncEngine 类代理 Engine 类。


用于接受 Connection / Engine 对象。


方法 sqlalchemy.ext.asyncio.AsyncEngine. execution_options**opt Any AsyncEngine


返回一个新的 AsyncEngine,它将提供 AsyncConnection 对象。


Engine.execution_options() 代理。有关详细信息,请参阅该方法。


方法 sqlalchemy.ext.asyncio.AsyncEngine. get_execution_options _ExecuteOptions


获取将在执行过程中生效的非 SQL 选项。


代表 AsyncEngine 类代理 Engine 类。


属性 sqlalchemy.ext.asyncio.AsyncEngine. 名称


方言的字符串名称 由此引擎使用。


代表 AsyncEngine 类代理 Engine 类。


属性 sqlalchemy.ext.asyncio.AsyncEngine.


代表 AsyncEngine 类的 Engine.pool 属性的代理。


方法 sqlalchemy.ext.asyncio.AsyncEngine. async raw_connection PoolProxiedConnection


从连接池中返回 “原始” DBAPI 连接。


属性 sqlalchemy.ext.asyncio.AsyncEngine. sync_engine: 引擎


引用 sync-style Engine this AsyncEngine 代理请求。


此实例可用作事件目标。


method sqlalchemy.ext.asyncio.AsyncEngine. update_execution_options**opt Any


更新此 Engine 的默认 execution_options 字典。


代表 AsyncEngine 类代理 Engine 类。


opt 中给定的键/值将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过 execution_options 参数发送到 create_engine()


属性 sqlalchemy.ext.asyncio.AsyncEngine. url


代表 AsyncEngine 类的 Engine.url 属性的代理。


sqlalchemy.ext.asyncio 中。异步连接


Connection 的 asyncio 代理。


AsyncConnection 是使用 异步引擎.connect() AsyncEngine 的方法:

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

async with engine.connect() as conn:
    result = await conn.execute(select(table))


在 1.4 版本加入.


类签名


sqlalchemy.ext.asyncio.AsyncConnection sqlalchemy.ext.asyncio.base.ProxyComparablesqlalchemy.ext.asyncio.base.StartableContextsqlalchemy.ext.asyncio.AsyncConnectable


方法 sqlalchemy.ext.asyncio.AsyncConnection. async aclose None


AsyncConnection.close() 的同义词。


AsyncConnection.aclose() 名称专门用于支持 Python 标准库 @contextlib.aclosing Context Manager 函数。


2.0.20 版本的新Function。


方法 sqlalchemy.ext.asyncio.AsyncConnection. begin AsyncTransaction


在 autostart 发生之前开始事务。


方法 sqlalchemy.ext.asyncio.AsyncConnection. begin_nested AsyncTransaction


开始嵌套事务并返回事务句柄。


method sqlalchemy.ext.asyncio.AsyncConnection. async close None


关闭此 AsyncConnection


如果有一个事务,这还具有回滚事务的效果。


Attribute sqlalchemy.ext.asyncio.AsyncConnection. closed 属性¶


如果此连接已关闭,则返回 True。


代表 AsyncConnection 类为 Connection 类代理。


方法 sqlalchemy.ext.asyncio.AsyncConnection. async commit None


提交当前正在进行的事务。


如果已启动事务,则此方法提交当前事务。如果未启动事务,则该方法无效,假设连接处于非无效状态。


事务会自动在 Connection 上开始 每当首次执行语句时,或者当 Connection.begin() 方法。


属性 sqlalchemy.ext.asyncio.AsyncConnection. 连接


未为 async 实现;叫 AsyncConnection.get_raw_connection()


属性 sqlalchemy.ext.asyncio.AsyncConnection. default_isolation_level


与 使用中的方言


代表 AsyncConnection 类为 Connection 类代理。


此值独立于 Connection.execution_options.isolation_level Engine.execution_options.isolation_level 执行选项,并由 Dialect 在创建第一个连接时确定,方法是在发出任何其他命令之前对当前隔离级别的数据库执行 SQL 查询。


调用此访问器不会调用任何新的 SQL 查询。


另请参阅


Connection.get_isolation_level() - 查看当前实际隔离级别


create_engine.isolation_level - 按引擎隔离级别设置


Connection.execution_options.isolation_level - 按连接隔离级别设置


attribute dialect(属性 sqlalchemy.ext.asyncio.AsyncConnection. 方言)¶


代表 AsyncConnection 类的 Connection.dialect 属性的代理。


方法 sqlalchemy.ext.asyncio.AsyncConnection. async exec_driver_sqlstatement str, parameters:_DBAPIAnyExecuteParamsNone=None, execution_options:CoreExecuteOptionsParameterNone=None) CursorResult[Any]


执行驱动程序级 SQL 字符串并返回缓冲 结果


method sqlalchemy.ext.asyncio.AsyncConnection. async executestatement Executable, parameters:_CoreAnyExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None) CursorResult[Any]


执行 SQL 语句结构并返回缓冲的 结果


参数

  • 对象


    要执行的语句。这始终是一个对象,它同时位于 ClauseElement可执行层次结构,包括:


  • parameters —— 将绑定到语句中的参数。这可以是参数名称到值的字典,也可以是字典的可变序列(例如列表)。当传递字典列表时,底层语句执行将使用 DBAPI cursor.executemany() 方法。当传递单个字典时,DBAPI cursor.execute() 方法。


  • execution_options – 可选的执行选项字典,将与语句执行相关联。此词典可以提供 接受的选项的子集 Connection.execution_options()


结果


Result 对象。


方法 sqlalchemy.ext.asyncio.AsyncConnection. async execution_options**opt Any AsyncConnection


为连接设置非 SQL 选项,这些选项在执行过程中生效。


这将返回此 AsyncConnection 对象,并添加了新选项。


有关此方法的完整详细信息,请参阅 Connection.execution_options()


方法 sqlalchemy.ext.asyncio.AsyncConnection. get_nested_transaction→AsyncTransactionNone


返回表示当前嵌套 (savepoint) 事务(如果有)的 AsyncTransaction


这利用了底层同步连接的 Connection.get_nested_transaction() 方法获取当前 Transaction,然后在新的 AsyncTransaction 对象。


1.4.0b2 版本的新Function。


方法 sqlalchemy.ext.asyncio.AsyncConnection. async get_raw_connection PoolProxiedConnection


返回此正在使用的共用 DBAPI 级连接 AsyncConnection 的 AsyncConnection 中。


这是一个 SQLAlchemy 连接池代理连接 该 API 具有属性 _ConnectionFairy.driver_connection ,指的是 实际驾驶员连接。其 _ConnectionFairy.dbapi_connection 而是引用使驱动程序连接适应 DBAPI 协议的 AdaptedConnection 实例。


方法 sqlalchemy.ext.asyncio.AsyncConnection. get_transaction→AsyncTransactionNone


返回表示当前事务的 AsyncTransaction(如果有)。


这利用了底层同步连接的 Connection.get_transaction() 方法获取当前 Transaction,然后在新的 AsyncTransaction 对象。


1.4.0b2 版本的新Function。


方法 sqlalchemy.ext.asyncio.AsyncConnection. in_nested_transaction bool


如果事务正在进行,则返回 True。


1.4.0b2 版本的新Function。


方法 sqlalchemy.ext.asyncio.AsyncConnection. in_transaction bool


如果事务正在进行,则返回 True。


属性 sqlalchemy.ext.asyncio.AsyncConnection. 信息


返回底层 ConnectionConnection.info 字典。


此字典可自由写入,以便用户定义的状态与数据库连接相关联。


仅当 AsyncConnection 当前已连接时,此属性才可用。如果 AsyncConnection.closed 属性为 True,则访问此属性将引发 ResourceClosedError 的 ResourceClosedError 错误。


1.4.0b2 版本的新Function。


method sqlalchemy.ext.asyncio.AsyncConnection. async invalidateexception:BaseExceptionNone=None None


使与此 Connection 关联的基础 DBAPI 连接失效。


有关此方法的完整详细信息,请参阅方法 Connection.invalidate()


attribute invalidated(属性 sqlalchemy.ext.asyncio.AsyncConnection. 无效)¶


如果此连接无效,则返回 True。


代表 AsyncConnection 类为 Connection 类代理。


但是,这并不表示连接是否在池级别失效


方法 sqlalchemy.ext.asyncio.AsyncConnection. async rollback None


回滚当前正在进行的事务。


如果已启动事务,则此方法回滚当前事务。如果未启动事务,则该方法无效。如果事务已启动并且连接处于无效状态,则使用此方法清除事务。


事务会自动在 Connection 上开始 每当首次执行语句时,或者当 Connection.begin() 方法。


方法 sqlalchemy.ext.asyncio.AsyncConnection. async run_syncfn: ~typing.Callable[[~typing.连接[~sqlalchemy.engine.base.Connection, ~_P]], ~sqlalchemy.ext.asyncio.engine._T], *arg: ~typing.~_P, **kw: ~typing.~_P _T


调用给定的 synchronous (i. not async) 可调用对象,传递一个 synchronous 样式的 Connection 作为第一个参数。


此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。


例如:

def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str:
    """A synchronous function that does not require awaiting

    :param conn: a Core SQLAlchemy Connection, used synchronously

    :return: an optional return value is supported

    """
    conn.execute(some_table.insert().values(int_col=arg1, str_col=arg2))
    return "success"


async def do_something_async(async_engine: AsyncEngine) -> None:
    """an async function that uses awaiting"""

    async with async_engine.begin() as async_conn:
        # run do_something_with_core() with a sync-style
        # Connection, proxied into an awaitable
        return_code = await async_conn.run_sync(
            do_something_with_core, 5, "strval"
        )
        print(return_code)


此方法通过在专门检测的 greenlet 中运行给定的可调用对象来维护 asyncio 事件循环,直到数据库连接。


AsyncConnection.run_sync() 最基本的用途是调用 MetaData.create_all() 等方法,给定需要提供给 MetaData.create_all() 作为连接 对象:

# run metadata.create_all(conn) with a sync-style Connection,
# proxied into an awaitable
with async_engine.begin() as conn:
    await conn.run_sync(metadata.create_all)


注意


提供的可调用对象在 asyncio 事件循环中内联调用,并将在传统的 IO 调用上阻塞。此可调用对象中的 IO 应仅调用 SQLAlchemy 的 asyncio 数据库 API,该 API 将适当适应 greenlet 上下文。


method sqlalchemy.ext.asyncio.AsyncConnection. async scalarstatement Executable, parameters:_CoreSingleExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None)→ Any(语句: 可执行, parameters:=None


执行 SQL 语句构造并返回标量对象。


此方法是调用 Result.scalar() 方法。 Connection.execute() 方法。参数是等效的。


结果


一个标量 Python 值,表示返回的第一行的第一列。


method sqlalchemy.ext.asyncio.AsyncConnection. async scalarsstatement Executable, parameters:_CoreAnyExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None ScalarResult[Any]


执行 SQL 语句构造并返回标量对象。


此方法是调用 Result.scalars() 方法。 Connection.execute() 方法。参数是等效的。


结果


ScalarResult 对象。


在 1.4.24 版本加入.


方法 sqlalchemy.ext.asyncio.AsyncConnection. async startis_ctxmanager: bool = False AsyncConnection


在使用 Python with: 块之外启动此 AsyncConnection 对象的上下文。


method sqlalchemy.ext.asyncio.AsyncConnection. streamstatement Executable, parameters:_CoreAnyExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None AsyncIterator[AsyncResult[Any]]


执行一条语句并返回一个可等待的,从而产生一个 AsyncResult 对象。


例如:

result = await conn.stream(stmt)
async for row in result:
    print(f"{row}")


AsyncConnection.stream() 方法支持对 AsyncResult 对象,如下所示:

async with conn.stream(stmt) as result:
    async for row in result:
        print(f"{row}")


在上述模式中,AsyncResult.close() 方法是无条件调用的,即使迭代器被异常抛出中断。但是,上下文管理器使用仍然是可选的,并且可以使用 fn()async调用该函数:或 await fn() 样式。


2.0.0b3 版本中的新功能:添加了上下文管理器支持


结果


一个 awaitable 对象,它将产生一个 AsyncResult 对象。


方法 sqlalchemy.ext.asyncio.AsyncConnection. stream_scalarsstatement Executable, parameters:_CoreSingleExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None AsyncIterator[AsyncScalarResult[任意]]


执行一条语句并返回一个可等待的,从而产生一个 AsyncScalarResult 对象。


例如:

result = await conn.stream_scalars(stmt)
async for scalar in result:
    print(f"{scalar}")


此方法是调用 AsyncResult.scalars() 方法。 Connection.stream() 方法。参数是等效的。


AsyncConnection.stream_scalars() 方法支持对 AsyncScalarResult 对象,如下所示:

async with conn.stream_scalars(stmt) as result:
    async for scalar in result:
        print(f"{scalar}")


在上述模式中,即使迭代器被异常抛出中断,也会无条件地调用 AsyncScalarResult.close() 方法。但是,上下文管理器使用仍然是可选的,并且可以使用 fn()async调用该函数:或 await fn() 样式。


2.0.0b3 版本中的新功能:添加了上下文管理器支持


结果


一个 awaitable 对象,它将产生一个 AsyncScalarResult 对象。


在 1.4.24 版本加入.


属性 sqlalchemy.ext.asyncio.AsyncConnection. sync_connection:ConnectionNone


对 sync-style Connection this 的引用 AsyncConnection 代理请求。


此实例可用作事件目标。


属性 sqlalchemy.ext.asyncio.AsyncConnection. sync_engine: 引擎


引用 sync-style Engine this AsyncConnection 通过其底层 连接


此实例可用作事件目标。


sqlalchemy.ext.asyncio 中。异步交易


Transaction 的 asyncio 代理。


类签名


sqlalchemy.ext.asyncio.AsyncTransaction sqlalchemy.ext.asyncio.base.ProxyComparablesqlalchemy.ext.asyncio.base.StartableContext


method sqlalchemy.ext.asyncio.AsyncTransaction. async close None


关闭此 AsyncTransaction


如果此事务是 begin/commit 嵌套中的基本事务,则该事务将 rollback() 。否则,该方法返回。


这用于取消 Transaction 而不影响封闭事务的范围。


方法 sqlalchemy.ext.asyncio.AsyncTransaction. async commit None


提交此 AsyncTransaction


方法 sqlalchemy.ext.asyncio.AsyncTransaction. async rollback None


回滚此 AsyncTransaction


方法 sqlalchemy.ext.asyncio.AsyncTransaction. async startis_ctxmanager: bool = False AsyncTransaction


在使用 Python 的 with: 块之外启动此 AsyncTransaction 对象的上下文。


结果集 API 文档


AsyncResult 对象是 Result 对象。 仅当使用 AsyncConnection.stream()AsyncSession.stream() 方法,该方法返回位于活动数据库之上的 Result 对象 光标。


对象名称

描述


AsyncMappingResult (异步映射结果)


AsyncResult 的包装器,它返回字典值而不是 Row 值。


异步结果


一个围绕 Result 对象的 asyncio 包装器。


异步标量结果


返回标量值而不是 Row 值的 AsyncResult 的包装器。

AsyncTupleResult


一个类型化为返回纯 Python 元组而不是行的 AsyncResult


sqlalchemy.ext.asyncio 中。异步结果


一个围绕 Result 对象的 asyncio 包装器。


AsyncResult 仅适用于 使用服务器端游标。 它仅从 AsyncConnection.stream()AsyncSession.stream() 方法。


注意


Result 一样,此对象用于 AsyncSession.execute() 返回的 ORM 结果,它可以单独或在类似 Tuples 的行中生成 ORM 映射对象的实例。请注意,这些结果对象不会像旧版 Query 对象那样自动删除重复的实例或行。对于 Python 中实例或行的重复数据删除,请使用 AsyncResult.unique() 修饰符方法。


在 1.4 版本加入.


类签名


sqlalchemy.ext.asyncio.AsyncResult sqlalchemy.engine._WithKeyssqlalchemy.ext.asyncio.AsyncCommon


method sqlalchemy.ext.asyncio.AsyncResult. async all Sequence[Row[_TP]]


返回列表中的所有行。


调用后关闭结果集。后续调用将返回一个空列表。


结果


Row 对象的列表。


method sqlalchemy.ext.asyncio.AsyncResult. async close None


继承自AsyncCommonAsyncCommon.close() 方法


关闭此结果。


Attribute sqlalchemy.ext.asyncio.AsyncResult. closed 属性¶


继承自AsyncCommonAsyncCommon.closed属性


代理基础 result 对象的 .closed 属性(如果有),否则会引发 AttributeError


2.0.0b3 版本的新Function。


method sqlalchemy.ext.asyncio.AsyncResult. columns*col_expressions _KeyIndexType Self


建立应在每行中返回的列。


有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 Result.columns()


方法 sqlalchemy.ext.asyncio.AsyncResult. async fetchall Sequence[Row[_TP]]


AsyncResult.all() 方法的同义词。


2.0 版的新Function。


method sqlalchemy.ext.asyncio.AsyncResult. async fetchmanysize:intNone=None sequence[Row[_TP]


获取许多行。


当所有行都用完时,返回一个空列表。


提供此方法是为了向后兼容 SQLAlchemy 1.x.x。


要按组获取行,请使用 AsyncResult.partitions() 方法。


结果


Row 对象的列表。


method sqlalchemy.ext.asyncio.AsyncResult. async fetchone→Row[_TP]None¶


获取一行。


当所有行都用完时,返回 None。


提供此方法是为了向后兼容 SQLAlchemy 1.x.x。


要仅获取结果的第一行,请使用 AsyncResult.first() 方法。要遍历所有行,请直接遍历 AsyncResult 对象。


结果


Row 对象(如果未应用过滤器),如果没有剩余行,则为 None


method sqlalchemy.ext.asyncio.AsyncResult. async first→Row[_TP]None


获取第一行,如果不存在行,则为 None


关闭结果集并丢弃剩余行。


注意


默认情况下,此方法返回一行,例如 tuple。 要只返回一个标量值,即第一个 列,请使用 AsyncResult.scalar() 方法,或者将 AsyncResult.scalars()AsyncResult.first() 的 AsyncResult.first() 中。


此外,与传统 ORM 的行为相比 Query.first() 方法,则对 调用的 SQL 查询来生成此 异步结果;对于在生成行之前在内存中缓冲结果的 DBAPI 驱动程序,所有行都将发送到 Python 进程,并且除第一行之外的所有行都将被丢弃。


结果


一个 Row 对象,如果没有剩余行,则为 None。


方法 sqlalchemy.ext.asyncio.AsyncResult. async freeze FrozenResult[_TP]


返回一个可调用对象,该对象将生成此 AsyncResult 调用时。


返回的可调用对象是 FrozenResult 的 Froz。


这用于结果集缓存。当结果未使用时,必须在结果上调用该方法,并且调用该方法将完全使用结果。当 FrozenResult 从缓存中检索,则可以调用任意次数,其中 它每次都会针对其存储的行集生成一个新的 Result 对象。


另请参阅


Re-Executing Statements - 在 ORM 中实现结果集缓存的示例用法。


方法 sqlalchemy.ext.asyncio.AsyncResult. keys RMKeyView


继承自 sqlalchemy.engine._WithKeys.keys sqlalchemy.engine._WithKeys 的方法


返回一个可迭代视图,该视图产生将由每个 Row 表示的字符串键。


键可以表示 core 语句返回的列的标签或 orm 执行返回的 orm 类的名称。


还可以使用 Python 测试视图的密钥包含 in 运算符,该运算符将测试视图中表示的字符串键以及备用键(如 Column 对象)。


在 1.4 版本发生变更: 返回一个键视图对象,而不是一个普通的列表。


方法 sqlalchemy.ext.asyncio.AsyncResult. mappings AsyncMappingResult


将 mappings 筛选器应用于返回的行,返回 AsyncMappingResult 的 AsyncMappingResult 中。


应用此过滤器后,fetching rows 将返回 RowMapping 对象而不是 Row 对象。


结果


引用底层 Result 对象的新 AsyncMappingResult 筛选对象。


method sqlalchemy.ext.asyncio.AsyncResult. async one Row[_TP]


只返回一行或引发异常。


如果结果不返回任何行,则引发 NoResultFound,如果返回多行,则引发 MultipleResultsFound


注意


默认情况下,此方法返回一行,例如 tuple。 要只返回一个标量值,即第一个 列,请使用 AsyncResult.scalar_one() 方法,或者 combine AsyncResult.scalars()AsyncResult.one() 的 AsyncResult.one() 中。


在 1.4 版本加入.


结果


第一


提升


MultipleResultsFoundNoResultFound


method sqlalchemy.ext.asyncio.AsyncResult. async one_or_none→Row[_TP]None


最多返回一个结果或引发异常。


如果结果没有行,则返回 None。引发 MultipleResultsFound 如果返回多行。


在 1.4 版本加入.


结果


第一None(无行)如果没有可用行。


提升


找到多个结果MultipleResultsFound


方法 sqlalchemy.ext.asyncio.AsyncResult. async partitionssize:intNone=None AsyncIterator[Sequence[Row[_TP]]


遍历给定大小的行的子列表。


返回一个异步迭代器:

async def scroll_results(connection):
    result = await connection.stream(select(users_table))

    async for partition in result.partitions(100):
        print("list of rows: %s" % partition)


有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 Result.partitions()


方法 sqlalchemy.ext.asyncio.AsyncResult. async scalar Any


获取第一行的第一列,然后关闭结果集。


如果没有要提取的行,则返回 None


不执行验证来测试是否保留其他行。


调用该方法后,对象完全关闭,例如 CursorResult.close() 方法。


结果


Python 标量值,如果没有剩余行,则为 None


method sqlalchemy.ext.asyncio.AsyncResult. async scalar_one Any


只返回一个标量结果或引发异常。


这相当于先调用 AsyncResult.scalars(),然后再调用 AsyncScalarResult.one()。


方法 sqlalchemy.ext.asyncio.AsyncResult. async scalar_one_or_none→AnyNone


只返回一个标量结果或 None


这相当于调用 AsyncResult.scalars(),然后 AsyncScalarResult.one_or_none() 调用 .


方法 sqlalchemy.ext.asyncio.AsyncResult. scalarsindex _KeyIndexType = 0 AsyncScalarResult[Any]


返回一个 AsyncScalarResult 筛选对象,该对象将返回单个元素而不是 Row 对象。


有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 Result.scalars()。


参数


index—— 整数或行键,表示要从每行中获取的列,默认为 0 表示第一列。


结果


引用此 AsyncResult 对象的新 AsyncScalarResult 筛选对象。


属性 sqlalchemy.ext.asyncio.AsyncResult. t


将 “typed tuple” 键入筛选器应用于返回的行。


AsyncResult.t 属性是调用 AsyncResult.tuples() 方法的同义词。


2.0 版的新Function。


方法 sqlalchemy.ext.asyncio.AsyncResult. tuples AsyncTupleResult[_TP]


将 “typed tuple” 键入筛选器应用于返回的行。


此方法在运行时返回相同的 AsyncResult 对象,但批注为返回 AsyncTupleResult 对象,它将向 PEP 484 指示纯类型 返回 Tuples 实例而不是 rows。这允许元组解包和__getitem__ Row 访问 对象,对于调用了 本身包括键入信息。


2.0 版的新Function。


结果


键入时的 AsyncTupleResult 类型。


另请参阅


AsyncResult.t - 较短的同义词


Row.t - 版本


method sqlalchemy.ext.asyncio.AsyncResult. uniquestrategy:_UniqueFilterTypeNone=None Self( 自身


将唯一筛选应用于此返回的对象 AsyncResult 的 AsyncResult 中。


有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 Result.unique()


方法 sqlalchemy.ext.asyncio.AsyncResult. yield_pernum int Self


配置行获取策略以一次获取 num 行。


FilterResult.yield_per() 方法是 Result.yield_per() 方法的传递。有关使用说明,请参阅该方法的文档。


1.4.40 版本中的新功能: - 添加了 FilterResult.yield_per() ,以便该方法可用于所有结果集实现


sqlalchemy.ext.asyncio 中。AsyncScalarResult(异步标量结果)¶


返回标量值而不是 Row 值的 AsyncResult 的包装器。


AsyncScalarResult 对象是通过调用 AsyncResult.scalars() 方法。


有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 ScalarResult 对象。


在 1.4 版本加入.


类签名


sqlalchemy.ext.asyncio.AsyncScalarResult sqlalchemy.ext.asyncio.AsyncCommon


method sqlalchemy.ext.asyncio.AsyncScalarResult. async all 序列[_R]


返回列表中的所有标量值。


等效于 AsyncResult.all(),但返回的是标量值,而不是 Row 对象。


method sqlalchemy.ext.asyncio.AsyncScalarResult. async close None


继承自AsyncCommonAsyncCommon.close() 方法


关闭此结果。


Attribute sqlalchemy.ext.asyncio.AsyncScalarResult. closed 属性¶


继承自AsyncCommonAsyncCommon.closed属性


代理基础 result 对象的 .closed 属性(如果有),否则会引发 AttributeError


2.0.0b3 版本的新Function。


method sqlalchemy.ext.asyncio.AsyncScalarResult. async fetchall 序列[_R]


AsyncScalarResult.all() 方法的同义词。


method sqlalchemy.ext.asyncio.AsyncScalarResult. async fetchmanysize:intNone=None Sequence[_R]


获取多个对象。


等效于 AsyncResult.fetchmany(),但返回的是标量值,而不是 Row 对象。


方法 sqlalchemy.ext.asyncio.AsyncScalarResult. async first→_RNone


获取第一个对象,如果不存在对象,则为 None


等效于 AsyncResult.first(),但返回的是标量值,而不是 Row 对象。


方法 sqlalchemy.ext.asyncio.AsyncScalarResult. async one _R


只返回一个对象或引发异常。


等效于 AsyncResult.one(),但返回的是标量值,而不是 Row 对象。


方法 sqlalchemy.ext.asyncio.AsyncScalarResult. async one_or_none→_RNone¶


最多返回一个对象或引发异常。


等效于 AsyncResult.one_or_none() 不同之处在于返回标量值,而不是 Row 对象。


方法 sqlalchemy.ext.asyncio.AsyncScalarResult. async partitionssize:intNone=None AsyncIterator[Sequence[_R]


遍历给定大小的元素的子列表。


等效于 AsyncResult.partitions(),但返回的是标量值,而不是 Row 对象。


method sqlalchemy.ext.asyncio.AsyncScalarResult. uniquestrategy:_UniqueFilterTypeNone=None Self( 自身


将唯一筛选应用于此返回的对象 AsyncScalarResult 的 AsyncScalarResult 中。


有关使用详情,请参阅 AsyncResult.unique()。


方法 sqlalchemy.ext.asyncio.AsyncScalarResult. yield_pernum int Self


配置行获取策略以一次获取 num 行。


FilterResult.yield_per() 方法是 Result.yield_per() 方法的传递。有关使用说明,请参阅该方法的文档。


1.4.40 版本中的新功能: - 添加了 FilterResult.yield_per() ,以便该方法可用于所有结果集实现


sqlalchemy.ext.asyncio 中。AsyncMappingResult(异步映射结果)¶


AsyncResult 的包装器,它返回字典值而不是 Row 值。


AsyncMappingResult 对象是通过调用 AsyncResult.mappings() 方法。


有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 MappingResult 对象。


在 1.4 版本加入.


类签名


sqlalchemy.ext.asyncio.AsyncMappingResult sqlalchemy.engine._WithKeyssqlalchemy.ext.asyncio.AsyncCommon


method sqlalchemy.ext.asyncio.AsyncMappingResult. async all Sequence[RowMapping]


返回列表中的所有行。


等效于 AsyncResult.all(),不同之处在于 RowMapping 值,而不是 Row 对象。


method sqlalchemy.ext.asyncio.AsyncMappingResult. async close None


继承自AsyncCommonAsyncCommon.close() 方法


关闭此结果。


Attribute sqlalchemy.ext.asyncio.AsyncMappingResult. closed 属性¶


继承自AsyncCommonAsyncCommon.closed属性


代理基础 result 对象的 .closed 属性(如果有),否则会引发 AttributeError


2.0.0b3 版本的新Function。


method sqlalchemy.ext.asyncio.AsyncMappingResult. columns*col_expressions _KeyIndexType Self


建立应在每行中返回的列。


方法 sqlalchemy.ext.asyncio.AsyncMappingResult. async fetchall Sequence[RowMapping]


AsyncMappingResult.all() 方法的同义词。


方法 sqlalchemy.ext.asyncio.AsyncMappingResult. async fetchmanysize:intNone=None Sequence[RowMapping]


获取许多行。


等效于 AsyncResult.fetchmany(),不同之处在于 RowMapping 值,而不是 Row 对象。


方法 sqlalchemy.ext.asyncio.AsyncMappingResult. async fetchone→RowMappingNone


获取一个对象。


等效于 AsyncResult.fetchone(),不同之处在于 RowMapping 值,而不是 Row 对象。


方法 sqlalchemy.ext.asyncio.AsyncMappingResult. async first→RowMappingNone


获取第一个对象,如果不存在对象,则为 None


等效于 AsyncResult.first(),不同之处在于 RowMapping 值,而不是 Row 对象。


方法 sqlalchemy.ext.asyncio.AsyncMappingResult. keys RMKeyView


继承自 sqlalchemy.engine._WithKeys.keys sqlalchemy.engine._WithKeys 的方法


返回一个可迭代视图,该视图产生将由每个 Row 表示的字符串键。


键可以表示 core 语句返回的列的标签或 orm 执行返回的 orm 类的名称。


还可以使用 Python 测试视图的密钥包含 in 运算符,该运算符将测试视图中表示的字符串键以及备用键(如 Column 对象)。


在 1.4 版本发生变更: 返回一个键视图对象,而不是一个普通的列表。


method sqlalchemy.ext.asyncio.AsyncMappingResult. async one RowMapping


只返回一个对象或引发异常。


等效于 AsyncResult.one(),不同之处在于 RowMapping 值,而不是 Row 对象。


method sqlalchemy.ext.asyncio.AsyncMappingResult. async one_or_none→RowMappingNone


最多返回一个对象或引发异常。


等效于 AsyncResult.one_or_none() 不同之处在于 RowMapping 值,而不是 Row 对象。


方法 sqlalchemy.ext.asyncio.AsyncMappingResult. async partitionssize:intNone=None AsyncIterator[Sequence[RowMapping]]


遍历给定大小的元素的子列表。


等效于 AsyncResult.partitions(),不同之处在于 RowMapping 值,而不是 Row 对象。


method sqlalchemy.ext.asyncio.AsyncMappingResult. uniquestrategy:_UniqueFilterTypeNone=None Self( 自身


将唯一筛选应用于此返回的对象 AsyncMappingResult 的 AsyncMappingResult 中。


有关使用详情,请参阅 AsyncResult.unique()。


方法 sqlalchemy.ext.asyncio.AsyncMappingResult. yield_pernum int Self


配置行获取策略以一次获取 num 行。


FilterResult.yield_per() 方法是 Result.yield_per() 方法的传递。有关使用说明,请参阅该方法的文档。


1.4.40 版本中的新功能: - 添加了 FilterResult.yield_per() ,以便该方法可用于所有结果集实现


sqlalchemy.ext.asyncio 中。AsyncTupleResult(异步元组结果)¶


一个类型化为返回纯 Python 元组而不是行的 AsyncResult


由于 Row 在各个方面都已经像一个元组,所以这个类是一个只打字的类,所以在运行时仍然使用常规的 AsyncResult


类签名


sqlalchemy.ext.asyncio.AsyncTupleResult sqlalchemy.ext.asyncio.AsyncCommonsqlalchemy.util.langhelpers.TypingOnly


ORM Session API 文档


对象名称

描述


async_object_session(实例)


返回给定实例所属的 AsyncSession

async_scoped_session


提供 AsyncSession 对象的范围管理。


async_session(会话)


返回代理给定 Session 对象(如果有)。

async_sessionmaker


可配置的 AsyncSession 工厂。


异步属性


Mixin 类,该类为所有属性提供可等待的访问器。


异步会话


Asyncio 版本的 Session 的 Session。

AsyncSessionTransaction


ORM SessionTransaction 对象的包装器。


close_all_sessions()


关闭所有 AsyncSession 会话。


函数 sqlalchemy.ext.asyncio 中。async_object_sessioninstance object→AsyncSessionNone


返回给定实例所属的 AsyncSession


此函数使用 sync-API 函数 object_session 检索 Session 时,该 Session 引用给定的实例,并从那里将其链接到原始实例 AsyncSession 的 AsyncSession 中。


如果 AsyncSession 已被垃圾回收,则返回值为 None


此功能也可从 InstanceState.async_session 访问器。


参数


instance —— 一个 ORM 映射的实例


结果


一个 AsyncSession 对象,或者 None


在 1.4.18 版本加入.


函数 sqlalchemy.ext.asyncio 中。async_sessionsession Session→AsyncSessionNone


返回代理给定 Session 对象(如果有)。


参数


session—— 一个 Session 实例。


结果


一个 AsyncSession 实例,或者 None


在 1.4.18 版本加入.


函数 async sqlalchemy.ext.asyncio 中。close_all_sessions


关闭所有 AsyncSession 会话。


2.0.23 新版功能.


另请参阅


close_all_sessions()


sqlalchemy.ext.asyncio 中。async_sessionmaker


可配置的 AsyncSession 工厂。


async_sessionmaker Factory 的工作方式与 sessionmaker 工厂,用于生成新的 AsyncSession 对象时,创建它们 Given 在此处建立的配置参数。


例如:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker


async def run_some_sql(
    async_session: async_sessionmaker[AsyncSession],
) -> None:
    async with async_session() as session:
        session.add(SomeObject(data="object"))
        session.add(SomeOtherObject(name="other object"))
        await session.commit()


async def main() -> None:
    # an AsyncEngine, which the AsyncSession will use for connection
    # resources
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/"
    )

    # create a reusable factory for new AsyncSession instances
    async_session = async_sessionmaker(engine)

    await run_some_sql(async_session)

    await engine.dispose()


async_sessionmaker非常有用,以便程序的不同部分可以使用预先建立的固定配置创建新的 AsyncSession 对象。请注意,AsyncSession 对象也可以在不使用时直接实例化 async_sessionmaker


2.0 新版功能: async_sessionmaker 提供了 sessionmaker 类,该类专用于 AsyncSession 对象,包括 pep-484 类型支持。


另请参阅


概要 - ORM - 显示示例使用


sessionmaker - 概述


SessionMaker 架构


Opening and Closing a Session - 有关使用 sessionmaker 创建会话的介绍性文本。


类签名


sqlalchemy.ext.asyncio.async_sessionmaker 键入。通用


方法 sqlalchemy.ext.asyncio.async_sessionmaker. __call__**local_kw: Any _AS


使用此async_sessionmaker中建立的配置生成新的 AsyncSession 对象。


在 Python 中,当 __call__ 方法以与函数相同的方式被 “调用” 时,会在对象上调用该方法:

AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False)
session = AsyncSession()  # invokes sessionmaker.__call__()

method sqlalchemy.ext.asyncio.async_sessionmaker. __init__bind: Optional[_AsyncSessionBind] = None*class_: type[_AS] = <class 'sqlalchemy.ext.asyncio.session.AsyncSession'>autoflush: bool = Trueexpire_on_commit: bool = Trueinfo: Optional[_InfoType] = None**kw: Any


构建新async_sessionmaker


这里的所有参数,除了 class_ 都对应于 Session 直接接受的参数。请参阅 AsyncSession.__init__() 文档字符串以获取有关参数的更多详细信息。


方法 sqlalchemy.ext.asyncio.async_sessionmaker. begin _AsyncSessionContextManager[_AS]


生成一个上下文管理器,它既提供了一个新的 AsyncSession 以及提交的事务。


例如:

async def main():
    Session = async_sessionmaker(some_engine)

    async with Session.begin() as session:
        session.add(some_object)

    # commits transaction, closes session

method sqlalchemy.ext.asyncio.async_sessionmaker. configure**new_kw Any None(无)¶


(重新)配置此async_sessionmaker的参数。


例如:

AsyncSession = async_sessionmaker(some_engine)

AsyncSession.configure(bind=create_async_engine("sqlite+aiosqlite://"))

sqlalchemy.ext.asyncio 中。async_scoped_session


提供 AsyncSession 对象的范围管理。


有关使用详情,请参阅使用 asyncio 范围的会话部分。


在 1.4.19 版本加入.


类签名


sqlalchemy.ext.asyncio.async_scoped_session 键入。通用


方法 sqlalchemy.ext.asyncio.async_scoped_session. __call__**kw Any _AS


返回当前的 AsyncSession,使用 scoped_session.session_factory if 不存在来创建它。


参数


kw – 关键字参数将传递给 scoped_session.session_factory callable,如果现有的 AsyncSession 不存在。 如果 存在 AsyncSession 和 keyword 参数, InvalidRequestError 引发。


方法 sqlalchemy.ext.asyncio.async_scoped_session. __init__session_factory async_sessionmaker[_AS]scopefunc Callable[[]Any]


构建新async_scoped_session


参数

  • session_factory– 用于创建新 AsyncSession 的工厂 实例。这通常(但不一定)是一个实例 async_sessionmaker


  • scopefunc—— 定义当前范围的函数。函数,如 asyncio.current_task 在这里可能有用。


方法 sqlalchemy.ext.asyncio.async_scoped_session. async aclose None


AsyncSession.close() 的同义词。


代表 async_scoped_session 类代理 AsyncSession 类。


AsyncSession.aclose() 名称专门用于支持 Python 标准库 @contextlib.aclosing Context Manager 函数。


2.0.20 版本的新Function。


method sqlalchemy.ext.asyncio.async_scoped_session. addinstance object_warn: bool = True


将对象放入此 Session 中。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


在传递给 Session.add() 方法将移动到 pending 状态,直到下一次 flush,此时它们将进入 persistent 状态。


在传递给 Session.add() 方法将移动到持久化 state 直接。


如果 Session 使用的事务回滚, 对象,这些对象在传递给 Session.add() 将移回 transient 状态,并且将不再存在于此 会话


方法 sqlalchemy.ext.asyncio.async_scoped_session. add_allinstances Iterable[object] None


将给定的实例集合添加到此 Session


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


有关一般行为描述,请参阅 Session.add() 的文档。


属性 sqlalchemy.ext.asyncio.async_scoped_session. 自动刷新


代表 AsyncSession 类的 Session.autoflush 属性的代理。


代表 async_scoped_session 类代理 AsyncSession 类。


方法 sqlalchemy.ext.asyncio.async_scoped_session. begin AsyncSessionTransaction


返回 AsyncSessionTransaction 对象。


代表 async_scoped_session 类代理 AsyncSession 类。


底层 Session 将在 AsyncSessionTransaction object 的输入值:

async with async_session.begin():
    ...  # ORM transaction is begun


请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是按需开始的。但是,begin 块是异步的,以适应 SessionEvents.after_transaction_create() 事件钩子。


有关 ORM begin 的一般描述,请参阅 Session.begin() 的 Session.begin() 中。


方法 sqlalchemy.ext.asyncio.async_scoped_session. begin_nested AsyncSessionTransaction


返回一个 AsyncSessionTransaction 对象,该对象将开始一个 “嵌套” 事务,例如 SAVEPOINT。


代表 async_scoped_session 类代理 AsyncSession 类。


行为与 AsyncSession.begin() 相同。


有关 ORM begin 嵌套的一般描述,请参阅 Session.begin_nested() 中。


另请参阅


可序列化隔离/保存点/事务性 DDL(asyncio 版本)- SQLite asyncio 驱动程序需要特殊解决方法,以便 SAVEPOINT 正常工作。


属性 sqlalchemy.ext.asyncio.async_scoped_session. 绑定


代表 async_scoped_session 类的 AsyncSession.bind 属性的代理。


method sqlalchemy.ext.asyncio.async_scoped_session. async close None


关闭此使用的事务资源和 ORM 对象 AsyncSession 的 AsyncSession 中。


代表 async_scoped_session 类代理 AsyncSession 类。


另请参阅


Session.close() - “close” 的主要文档


结束语 - 有关语义的详细信息 AsyncSession.close()AsyncSession.reset() 的 AsyncSession.reset() 中。


async classmethod sqlalchemy.ext.asyncio.async_scoped_session. close_all None(无)¶


关闭所有 AsyncSession 会话。


代表 async_scoped_session 类代理 AsyncSession 类。


2.0 版后已移除: AsyncSession.close_all() 方法已弃用,并将在未来发行版中删除。请参阅 close_all_sessions()。


方法 sqlalchemy.ext.asyncio.async_scoped_session. async commit None


提交当前正在进行的事务。


代表 async_scoped_session 类代理 AsyncSession 类。


另请参阅


Session.commit() - “提交” 的主要文档


method sqlalchemy.ext.asyncio.async_scoped_session. configure**kwargs Any None


重新配置此 scoped_session


请参阅 sessionmaker.configure()。


方法 sqlalchemy.ext.asyncio.async_scoped_session. async connectionbind_arguments:_BindArgumentsNone=None, execution_options:CoreExecuteOptionsParameterNone=None, **kw Any AsyncConnection


返回与此 Session 对象的事务状态对应的 AsyncConnection 对象。


代表 async_scoped_session 类代理 AsyncSession 类。


此方法还可用于为当前事务使用的数据库连接建立执行选项。


1.4.24 版本中的新功能: 添加了传递给底层 Session.connection() 方法的 **kw 参数。


另请参阅


Session.connection() - “连接” 的主要文档


method sqlalchemy.ext.asyncio.async_scoped_session. async deleteinstance object


将实例标记为已删除。


代表 async_scoped_session 类代理 AsyncSession 类。


数据库删除作发生在 flush() 上


由于此作可能需要沿未加载的关系级联,因此可以等待允许这些查询发生。


另请参阅


Session.delete() - 删除的主要文档


已删除属性 sqlalchemy.ext.asyncio.async_scoped_session.


Session 中标记为 'deleted' 的所有实例的集合


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


attribute dirty(脏 sqlalchemy.ext.asyncio.async_scoped_session. 属性)¶


被视为 dirty 的所有持久性实例的集合。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


例如:

some_mapped_object in session.dirty


当实例被修改但未被删除时,它们被视为脏实例。


请注意,这种 “肮脏 ”的计算是 “乐观的”;大多数属性设置或集合修改作会将实例标记为 'dirty' 并将其放置在此集中,即使属性的值没有净变化。在 flush 时,将每个属性的值与其之前保存的值进行比较,如果没有净变化,则不会发生 SQL作(这是一个成本更高的作,因此仅在 flush 时完成)。


要检查实例是否对其属性进行了可作的净更改,请使用 Session.is_modified() 方法。


method sqlalchemy.ext.asyncio.async_scoped_session. async executestatement Executable, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw any result[any]


执行一条语句并返回一个缓冲的 Result 对象。


代表 async_scoped_session 类代理 AsyncSession 类。


另请参阅


Session.execute() - 执行的主要文档


method sqlalchemy.ext.asyncio.async_scoped_session. expireinstance object, attribute_names:Iterable[str]None=None) None(无)¶


使实例上的属性过期。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


将实例的属性标记为过期。当过期的 属性,则会向 Session 对象的当前事务上下文,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与之前在同一事务中读取的值相同的值,而不管该事务之外的数据库状态如何变化。


要同时使 Session 中的所有对象过期,请使用 Session.expire_all()。


Session 对象的默认行为是每当 Session.rollback()Session.commit() 方法,以便可以为新事务加载新状态。因此,调用 Session.expire() 仅对当前事务中发出非 ORM SQL 语句的特定情况有意义。


参数

  • instance– 要刷新的实例。


  • attribute_names – 可选的字符串属性名称列表,指示要过期的属性子集。


方法 sqlalchemy.ext.asyncio.async_scoped_session. expire_all


使此 Session 中的所有持久性实例过期。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


下次访问持久化实例上的任何属性时, 将使用 Session 对象的当前事务上下文,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与之前在同一事务中读取的值相同的值,而不管该事务之外的数据库状态如何变化。


要使单个对象和这些对象上的各个属性过期,请使用 Session.expire()。


Session 对象的默认行为是每当 Session.rollback()Session.commit() 方法,以便可以为新事务加载新状态。因此,假设事务是隔离的,通常不需要调用 Session.expire_all()


method sqlalchemy.ext.asyncio.async_scoped_session. expungeinstance object None(无)¶


从此 Session 中删除实例


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


这将释放对实例的所有内部引用。将根据 Expunge Cascade 规则应用 Cascading


方法 sqlalchemy.ext.asyncio.async_scoped_session. expunge_all


从此 Session 中删除所有对象实例。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


这相当于对 this 中的所有对象调用 expunge(obj) 会话


method sqlalchemy.ext.asyncio.async_scoped_session. async flushobjects:Sequence[Any]None=None None


将所有对象更改刷新到数据库。


代表 async_scoped_session 类代理 AsyncSession 类。


另请参阅


Session.flush() - flush 的主要文档


method sqlalchemy.ext.asyncio.async_scoped_session. async getentity _EntityBindKey[_O]ident _PKIdentityArgument*, options:Sequence[ORMOption]None=None、populate_existingbool = Falsewith_for_updateForUpdateParameter = None、identity_token:AnyNone=None、execution_options OrmExecuteOptionsParameter = {}→_ONone


根据给定的主键标识符返回实例,如果未找到,则返回 None


代表 async_scoped_session 类代理 AsyncSession 类。


另请参阅


Session.get() - get 的主要文档


方法 sqlalchemy.ext.asyncio.async_scoped_session. get_bindmapper:_EntityBindKey[_O]None=None, clause:ClauseElementNone=None, bind:_SessionBindNone=None**kw Any)→EngineConnection


返回一个 “bind”,同步代理的 Session 已绑定。


代表 async_scoped_session 类代理 AsyncSession 类。


Session.get_bind() 方法不同,此 AsyncSession 当前以任何方式使用此方法来解析请求的引擎。


注意


此方法直接代理 Session.get_bind() 方法,但是与 Session.get_bind() 方法相比,它目前不能用作覆盖目标。 以下示例说明了如何实现自定义 Session.get_bind() 方案,适用于 AsyncSessionAsyncEngine 的 AsyncEngine 进行验证。


自定义垂直分区中引入的模式 说明了如何将自定义绑定查找方案应用于 Session 给定一组 Engine 对象。应用相应的 Session.get_bind() 实现以用于 AsyncSessionAsyncEngine 对象中,继续子类化 Session 并将其应用于 AsyncSession 使用 AsyncSession.sync_session_class 。内部方法必须继续返回 Engine 实例,这些实例可以使用 AsyncEngine.sync_engine 属性:

# using example from "Custom Vertical Partitioning"


import random

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import Session

# construct async engines w/ async drivers
engines = {
    "leader": create_async_engine("sqlite+aiosqlite:///leader.db"),
    "other": create_async_engine("sqlite+aiosqlite:///other.db"),
    "follower1": create_async_engine("sqlite+aiosqlite:///follower1.db"),
    "follower2": create_async_engine("sqlite+aiosqlite:///follower2.db"),
}


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # within get_bind(), return sync engines
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines["other"].sync_engine
        elif self._flushing or isinstance(clause, (Update, Delete)):
            return engines["leader"].sync_engine
        else:
            return engines[
                random.choice(["follower1", "follower2"])
            ].sync_engine


# apply to AsyncSession using sync_session_class
AsyncSessionMaker = async_sessionmaker(sync_session_class=RoutingSession)


Session.get_bind() 方法在非 asyncio、隐式非阻塞上下文中调用,其方式与通过 AsyncSession.run_sync() 调用的 ORM 事件钩子和函数相同,因此 希望在 Session.get_bind() 可以继续使用阻塞式代码来执行此作,该代码将在数据库驱动程序上调用 IO 时转换为隐式异步调用。


method sqlalchemy.ext.asyncio.async_scoped_session. async get_oneentity _EntityBindKey[_O]ident _PKIdentityArgument*, options:Sequence[ORMOption]None=无、populate_existing:bool = Falsewith_for_update:ForUpdateParameter = None、identity_token:AnyNone=None、execution_options OrmExecuteOptionsParameter = {} _O


根据给定的主键标识符返回实例,如果未找到则引发异常。


代表 async_scoped_session 类代理 AsyncSession 类。


sqlalchemy.orm.exc.NoResultFound 如果查询未选择任何行,则引发。


..版本已添加: 2.0.22


另请参阅


Session.get_one() - get_one 的主要文档


类方法 sqlalchemy.ext.asyncio.async_scoped_session. identity_keyclass_:Type[Any]None=None, ident:AnyTuple[Any,...]=None, *, instance:AnyNone=None, row:Row[Any]RowMappingNone=None, identity_token:AnyNone=None) _IdentityKeyType[Any]


返回身份密钥。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


这是 identity_key() 的别名。


属性 sqlalchemy.ext.asyncio.async_scoped_session. identity_map


代表 AsyncSession 类的 Session.identity_map 属性的代理。


代表 async_scoped_session 类代理 AsyncSession 类。


属性 sqlalchemy.ext.asyncio.async_scoped_session. 信息


用户可修改的字典。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


此字典的初始值可以使用 info 参数添加到 Session 构造函数中,或者 sessionmaker 构造函数或工厂方法。这里的字典始终是这个 Session 的本地字典,并且可以独立于所有其他 Session 对象进行修改。


method sqlalchemy.ext.asyncio.async_scoped_session. async invalidate None


使用连接失效关闭此 Session。


代表 async_scoped_session 类代理 AsyncSession 类。


有关完整描述,请参阅 Session.invalidate()。


属性 sqlalchemy.ext.asyncio.async_scoped_session. is_active


如果此 Session 未处于 “partial rollback” 状态,则为 True。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


在 1.4 版本发生变更: Session 不再立即开始新的事务,因此当 Session 首次实例化时,此属性将为 False。


“partial rollback” 状态通常表示 Session 的 flush 过程失败,并且 Session.rollback() 方法才能完全回滚事务。


如果此 Session 根本不在事务中,则 Session 将在首次使用时自动启动,因此在这种情况下,Session.is_active将返回 True。


否则,如果此 Session 在事务中, 并且该事务尚未在内部回滚,则 Session.is_active 也将返回 True。


方法 sqlalchemy.ext.asyncio.async_scoped_session. is_modifiedinstance objectinclude_collections: bool = True bool


如果给定实例具有本地修改的属性,则返回 True


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


此方法检索实例上每个分析属性的历史记录,并执行当前值与其先前刷新或提交的值(如果有)的比较。


它实际上是更昂贵和准确的 版本。 Session.dirty 集合;对每个属性的 Net “dirty” 状态执行完整测试。


例如:

return session.is_modified(someobject)


此方法需要注意以下几点:


  • 使用此方法测试时,Session.dirty 集合中存在的实例可能会报告 False。这是因为该对象可能已经通过属性 mutation 接收了更改事件,因此将其放置在 Session.dirty 中,但最终状态与从数据库加载的状态相同,因此这里没有净变化。


  • 如果在接收新值时未加载或过期,则在应用新值时,标量属性可能未记录先前设置的值 - 在这些情况下,假定该属性发生了变化,即使最终没有针对其数据库值的净变化。在大多数情况下,SQLAlchemy 在发生 set 事件时不需要“旧”值,因此如果旧值不存在,它会跳过 SQL 调用的费用,基于通常需要标量值的 UPDATE 的假设,并且在不需要的少数情况下,平均比发出防御性 SELECT 便宜。


    仅当属性容器的 active_history 标志设置为 True 时,才会在设置时无条件地获取 “old” 值。此标志通常用于主键属性和标量对象引用,它们不是简单的多对一。要为任何任意映射列设置此标志,请使用 active_history 参数替换为 column_property()


参数

  • instance—— 要测试待处理更改的映射实例。


  • include_collections– 指示作中是否应包含多值集合。将此设置为 False 是一种仅检测基于局部列的属性(即标量列或多对一外键)的方法,这些属性在刷新时会导致此实例发生 UPDATE。


method sqlalchemy.ext.asyncio.async_scoped_session. async mergeinstance _O*load bool = True, options:Sequence[ORMOption]None=None) _O


将给定实例的状态复制到此 AsyncSession 中的相应实例中。


代表 async_scoped_session 类代理 AsyncSession 类。


另请参阅


Session.merge() - 合并的主要文档


属性 sqlalchemy.ext.asyncio.async_scoped_session. new


Session 中标记为 'new' 的所有实例的集合。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


属性 sqlalchemy.ext.asyncio.async_scoped_session. no_autoflush


返回禁用 autoflush 的上下文管理器。


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


例如:

with session.no_autoflush:

    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()


with: 块中继续的作不会受到查询访问时发生的刷新的影响。这在初始化涉及现有数据库查询的一系列对象时非常有用,其中未完成的对象不应被刷新。


类方法 sqlalchemy.ext.asyncio.async_scoped_session. object_sessioninstance object→SessionNone


返回对象所属的 Session


代表 async_scoped_session 类代理 AsyncSession 类。


代表 AsyncSession 类代理 Session 类。


这是 object_session() 的别名。


方法 sqlalchemy.ext.asyncio.async_scoped_session. async refreshinstance object, attribute_names:Iterable[str]None=None, with_for_update: ForUpdateParameter = None


过期并刷新给定实例上的属性。


代表 async_scoped_session 类代理 AsyncSession 类。


将向数据库发出查询,并且所有属性都将使用其当前数据库值刷新。


这是 Session.refresh() 方法的异步版本。有关所有选项的完整说明,请参阅该方法。


另请参阅


Session.refresh() - 刷新的主要文档


方法 sqlalchemy.ext.asyncio.async_scoped_session. async remove None


释放当前 AsyncSession(如果存在)。


与 scoped_session 的 remove 方法不同,该方法会使用 await 来等待 AsyncSession 的 close 方法。


方法 sqlalchemy.ext.asyncio.async_scoped_session. async reset)→ None


关闭此使用的事务资源和 ORM 对象 Session,将会话重置为其初始状态。


代表 async_scoped_session 类代理 AsyncSession 类。


2.0.22 新版功能.


另请参阅


Session.reset() - “reset” 的主要文档


结束语 - 有关语义的详细信息 AsyncSession.close()AsyncSession.reset() 的 AsyncSession.reset() 中。


方法 sqlalchemy.ext.asyncio.async_scoped_session. async rollback None


回滚当前正在进行的事务。


代表 async_scoped_session 类代理 AsyncSession 类。


另请参阅


Session.rollback() - “回滚” 的主要文档


method sqlalchemy.ext.asyncio.async_scoped_session. async scalarstatement 可执行文件, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw 任意 任意


执行语句并返回标量结果。


代表 async_scoped_session 类代理 AsyncSession 类。


另请参阅


Session.scalar() - 标量的主要文档


method sqlalchemy.ext.asyncio.async_scoped_session. async scalarsstatement Executable, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw 任意 ScalarResult[任意]


执行语句并返回标量结果。


代表 async_scoped_session 类代理 AsyncSession 类。


结果


一个 ScalarResult 对象


1.4.24 版本中的新功能: 添加了 AsyncSession.scalars()


1.4.26 版本中的新功能: 添加 async_scoped_session.scalars()


另请参阅


Session.scalars() - 标量的主要文档


AsyncSession.stream_scalars() - 流版本


属性 sqlalchemy.ext.asyncio.async_scoped_session. session_factory:async_sessionmaker[_AS]


提供给 __init__ 的session_factory存储在此属性中,以后可以访问。当需要新的非范围的 AsyncSession 时,这可能很有用。


method sqlalchemy.ext.asyncio.async_scoped_session. async streamstatement Executable, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw 任意 AsyncResult[任意]


执行语句并返回流 AsyncResult 对象。


代表 async_scoped_session 类代理 AsyncSession 类。


method sqlalchemy.ext.asyncio.async_scoped_session. async stream_scalarsstatement Executable, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw 任意 AsyncScalarResult[任意]


执行一个语句并返回一个标量结果流。


代表 async_scoped_session 类代理 AsyncSession 类。


结果


一个 AsyncScalarResult 对象


在 1.4.24 版本加入.


另请参阅


Session.scalars() - 标量的主要文档


AsyncSession.scalars() - 非流版本


sqlalchemy.ext.asyncio 中。异步属性


Mixin 类,该类为所有属性提供可等待的访问器。


例如:

from __future__ import annotations

from typing import List

from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(AsyncAttrs, DeclarativeBase):
    pass


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[str]
    bs: Mapped[List[B]] = relationship()


class B(Base):
    __tablename__ = "b"
    id: Mapped[int] = mapped_column(primary_key=True)
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
    data: Mapped[str]


在上面的示例中,AsyncAttrs mixin 应用于声明式 Base 类,它对所有子类生效。 此 mixin 添加了一个新属性 AsyncAttrs.awaitable_attrs 所有类,这将产生任何属性的值作为 awaitable。这允许访问可能受延迟加载或延迟/未过期加载约束的属性,以便仍然可以发出 IO:

a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()

# use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs``
# interface, so that it may be awaited
for b1 in await a1.awaitable_attrs.bs:
    print(b1)


AsyncAttrs.awaitable_attrs 对 属性,这大致相当于使用 AsyncSession.run_sync() 方法,例如:

for b1 in await async_session.run_sync(lambda sess: a1.bs):
    print(b1)


2.0.13 新版功能.


属性 sqlalchemy.ext.asyncio.AsyncAttrs. awaitable_attrs


提供此对象上包装为 Awaittables 的所有属性的命名空间。


例如:

a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()

some_attribute = await a1.awaitable_attrs.some_deferred_attribute
some_collection = await a1.awaitable_attrs.some_collection

sqlalchemy.ext.asyncio 中。异步会话


Asyncio 版本的 Session 的 Session。


AsyncSession 是传统 Session 实例。


AsyncSession并发中使用是不安全的 任务..请参阅 Session 线程安全吗? 在并发任务中共享 AsyncSession 是否安全?作为背景。


在 1.4 版本加入.


AsyncSession 与自定义 Session 一起使用 implementations,请参阅 AsyncSession.sync_session_class 参数。


类签名


sqlalchemy.ext.asyncio.AsyncSession sqlalchemy.ext.asyncio.base.ReversibleProxy


属性 sqlalchemy.ext.asyncio.AsyncSession. sync_session_class: Type[Session] = <class 'sqlalchemy.orm.session.Session'>


为特定 Session 实例提供底层 Session 实例的类或可调用对象 AsyncSession 的 AsyncSession 中。


在类级别,此属性是 AsyncSession.sync_session_class 参数。AsyncSession 的自定义子类可以覆盖此 URL。


在实例级别,此属性指示用于为此 AsyncSession 实例提供 Session 实例的当前类或可调用对象。


在 1.4.24 版本加入.


方法 sqlalchemy.ext.asyncio.AsyncSession. __init__bind:_AsyncSessionBindNone=None, *, binds:Dict[_SessionBindKey,_AsyncSessionBind]None=None, sync_session_class:Type[Session]None=None, **kw Any


构造一个新的 AsyncSession


sync_session_class 之外的所有参数都将传递给 sync_session_class直接调用来实例化新的 会话。有关参数文档,请参阅 Session.__init__()。


参数

sync_session_class


一个 Session 子类或其他可调用对象,将用于构造将被代理的 Session。此参数可用于提供自定义 Session 子。默认为 AsyncSession.sync_session_class class-level 属性。


在 1.4.24 版本加入.


方法 sqlalchemy.ext.asyncio.AsyncSession. async aclose None


AsyncSession.close() 的同义词。


AsyncSession.aclose() 名称专门用于支持 Python 标准库 @contextlib.aclosing Context Manager 函数。


2.0.20 版本的新Function。


method sqlalchemy.ext.asyncio.AsyncSession. addinstance object_warn: bool = True


将对象放入此 Session 中。


代表 AsyncSession 类代理 Session 类。


在传递给 Session.add() 方法将移动到 pending 状态,直到下一次 flush,此时它们将进入 persistent 状态。


在传递给 Session.add() 方法将移动到持久化 state 直接。


如果 Session 使用的事务回滚, 对象,这些对象在传递给 Session.add() 将移回 transient 状态,并且将不再存在于此 会话


方法 sqlalchemy.ext.asyncio.AsyncSession. add_allinstances Iterable[object] None


将给定的实例集合添加到此 Session


代表 AsyncSession 类代理 Session 类。


有关一般行为描述,请参阅 Session.add() 的文档。


属性 sqlalchemy.ext.asyncio.AsyncSession. 自动刷新


代表 AsyncSession 类的 Session.autoflush 属性的代理。


方法 sqlalchemy.ext.asyncio.AsyncSession. begin AsyncSessionTransaction


返回 AsyncSessionTransaction 对象。


底层 Session 将在 AsyncSessionTransaction object 的输入值:

async with async_session.begin():
    ...  # ORM transaction is begun


请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是按需开始的。但是,begin 块是异步的,以适应 SessionEvents.after_transaction_create() 事件钩子。


有关 ORM begin 的一般描述,请参阅 Session.begin() 的 Session.begin() 中。


方法 sqlalchemy.ext.asyncio.AsyncSession. begin_nested AsyncSessionTransaction


返回一个 AsyncSessionTransaction 对象,该对象将开始一个 “嵌套” 事务,例如 SAVEPOINT。


行为与 AsyncSession.begin() 相同。


有关 ORM begin 嵌套的一般描述,请参阅 Session.begin_nested() 中。


另请参阅


可序列化隔离/保存点/事务性 DDL(asyncio 版本)- SQLite asyncio 驱动程序需要特殊解决方法,以便 SAVEPOINT 正常工作。


method sqlalchemy.ext.asyncio.AsyncSession. async close None


关闭此使用的事务资源和 ORM 对象 AsyncSession 的 AsyncSession 中。


另请参阅


Session.close() - “close” 的主要文档


结束语 - 有关语义的详细信息 AsyncSession.close()AsyncSession.reset() 的 AsyncSession.reset() 中。


async classmethod sqlalchemy.ext.asyncio.AsyncSession. close_all None(无)¶


关闭所有 AsyncSession 会话。


2.0 版后已移除: AsyncSession.close_all() 方法已弃用,并将在未来发行版中删除。请参阅 close_all_sessions()。


方法 sqlalchemy.ext.asyncio.AsyncSession. async commit None


提交当前正在进行的事务。


另请参阅


Session.commit() - “提交” 的主要文档


方法 sqlalchemy.ext.asyncio.AsyncSession. async connectionbind_arguments:_BindArgumentsNone=None, execution_options:CoreExecuteOptionsParameterNone=None, **kw Any AsyncConnection


返回与此 Session 对象的事务状态对应的 AsyncConnection 对象。


此方法还可用于为当前事务使用的数据库连接建立执行选项。


1.4.24 版本中的新功能: 添加了传递给底层 Session.connection() 方法的 **kw 参数。


另请参阅


Session.connection() - “连接” 的主要文档


method sqlalchemy.ext.asyncio.AsyncSession. async deleteinstance object


将实例标记为已删除。


数据库删除作发生在 flush() 上


由于此作可能需要沿未加载的关系级联,因此可以等待允许这些查询发生。


另请参阅


Session.delete() - 删除的主要文档


已删除属性 sqlalchemy.ext.asyncio.AsyncSession.


Session 中标记为 'deleted' 的所有实例的集合


代表 AsyncSession 类代理 Session 类。


attribute dirty(脏 sqlalchemy.ext.asyncio.AsyncSession. 属性)¶


被视为 dirty 的所有持久性实例的集合。


代表 AsyncSession 类代理 Session 类。


例如:

some_mapped_object in session.dirty


当实例被修改但未被删除时,它们被视为脏实例。


请注意,这种 “肮脏 ”的计算是 “乐观的”;大多数属性设置或集合修改作会将实例标记为 'dirty' 并将其放置在此集中,即使属性的值没有净变化。在 flush 时,将每个属性的值与其之前保存的值进行比较,如果没有净变化,则不会发生 SQL作(这是一个成本更高的作,因此仅在 flush 时完成)。


要检查实例是否对其属性进行了可作的净更改,请使用 Session.is_modified() 方法。


method sqlalchemy.ext.asyncio.AsyncSession. async executestatement Executable, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw any result[any]


执行一条语句并返回一个缓冲的 Result 对象。


另请参阅


Session.execute() - 执行的主要文档


method sqlalchemy.ext.asyncio.AsyncSession. expireinstance object, attribute_names:Iterable[str]None=None) None(无)¶


使实例上的属性过期。


代表 AsyncSession 类代理 Session 类。


将实例的属性标记为过期。当过期的 属性,则会向 Session 对象的当前事务上下文,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与之前在同一事务中读取的值相同的值,而不管该事务之外的数据库状态如何变化。


要同时使 Session 中的所有对象过期,请使用 Session.expire_all()。


Session 对象的默认行为是每当 Session.rollback()Session.commit() 方法,以便可以为新事务加载新状态。因此,调用 Session.expire() 仅对当前事务中发出非 ORM SQL 语句的特定情况有意义。


参数

  • instance– 要刷新的实例。


  • attribute_names – 可选的字符串属性名称列表,指示要过期的属性子集。


方法 sqlalchemy.ext.asyncio.AsyncSession. expire_all


使此 Session 中的所有持久性实例过期。


代表 AsyncSession 类代理 Session 类。


下次访问持久化实例上的任何属性时, 将使用 Session 对象的当前事务上下文,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与之前在同一事务中读取的值相同的值,而不管该事务之外的数据库状态如何变化。


要使单个对象和这些对象上的各个属性过期,请使用 Session.expire()。


Session 对象的默认行为是每当 Session.rollback()Session.commit() 方法,以便可以为新事务加载新状态。因此,假设事务是隔离的,通常不需要调用 Session.expire_all()


method sqlalchemy.ext.asyncio.AsyncSession. expungeinstance object None(无)¶


从此 Session 中删除实例


代表 AsyncSession 类代理 Session 类。


这将释放对实例的所有内部引用。将根据 Expunge Cascade 规则应用 Cascading


方法 sqlalchemy.ext.asyncio.AsyncSession. expunge_all


从此 Session 中删除所有对象实例。


代表 AsyncSession 类代理 Session 类。


这相当于对 this 中的所有对象调用 expunge(obj) 会话


method sqlalchemy.ext.asyncio.AsyncSession. async flushobjects:Sequence[Any]None=None None


将所有对象更改刷新到数据库。


另请参阅


Session.flush() - flush 的主要文档


method sqlalchemy.ext.asyncio.AsyncSession. async getentity _EntityBindKey[_O]ident _PKIdentityArgument*, options:Sequence[ORMOption]None=None、populate_existingbool = Falsewith_for_updateForUpdateParameter = None、identity_token:AnyNone=None、execution_options OrmExecuteOptionsParameter = {}→_ONone


根据给定的主键标识符返回实例,如果未找到,则返回 None


另请参阅


Session.get() - get 的主要文档


方法 sqlalchemy.ext.asyncio.AsyncSession. get_bindmapper:_EntityBindKey[_O]None=None, clause:ClauseElementNone=None, bind:_SessionBindNone=None**kw Any)→EngineConnection


返回一个 “bind”,同步代理的 Session 已绑定。


Session.get_bind() 方法不同,此 AsyncSession 当前以任何方式使用此方法来解析请求的引擎。


注意


此方法直接代理 Session.get_bind() 方法,但是与 Session.get_bind() 方法相比,它目前不能用作覆盖目标。 以下示例说明了如何实现自定义 Session.get_bind() 方案,适用于 AsyncSessionAsyncEngine 的 AsyncEngine 进行验证。


自定义垂直分区中引入的模式 说明了如何将自定义绑定查找方案应用于 Session 给定一组 Engine 对象。应用相应的 Session.get_bind() 实现以用于 AsyncSessionAsyncEngine 对象中,继续子类化 Session 并将其应用于 AsyncSession 使用 AsyncSession.sync_session_class 。内部方法必须继续返回 Engine 实例,这些实例可以使用 AsyncEngine.sync_engine 属性:

# using example from "Custom Vertical Partitioning"


import random

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import Session

# construct async engines w/ async drivers
engines = {
    "leader": create_async_engine("sqlite+aiosqlite:///leader.db"),
    "other": create_async_engine("sqlite+aiosqlite:///other.db"),
    "follower1": create_async_engine("sqlite+aiosqlite:///follower1.db"),
    "follower2": create_async_engine("sqlite+aiosqlite:///follower2.db"),
}


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # within get_bind(), return sync engines
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines["other"].sync_engine
        elif self._flushing or isinstance(clause, (Update, Delete)):
            return engines["leader"].sync_engine
        else:
            return engines[
                random.choice(["follower1", "follower2"])
            ].sync_engine


# apply to AsyncSession using sync_session_class
AsyncSessionMaker = async_sessionmaker(sync_session_class=RoutingSession)


Session.get_bind() 方法在非 asyncio、隐式非阻塞上下文中调用,其方式与通过 AsyncSession.run_sync() 调用的 ORM 事件钩子和函数相同,因此 希望在 Session.get_bind() 可以继续使用阻塞式代码来执行此作,该代码将在数据库驱动程序上调用 IO 时转换为隐式异步调用。


方法 sqlalchemy.ext.asyncio.AsyncSession. get_nested_transaction→AsyncSessionTransactionNone


返回当前正在进行的嵌套事务(如果有)。


结果


一个 AsyncSessionTransaction 对象,或者


在 1.4.18 版本加入.


method sqlalchemy.ext.asyncio.AsyncSession. async get_oneentity _EntityBindKey[_O]ident _PKIdentityArgument*, options:Sequence[ORMOption]None=无、populate_existing:bool = Falsewith_for_update:ForUpdateParameter = None、identity_token:AnyNone=None、execution_options OrmExecuteOptionsParameter = {} _O


根据给定的主键标识符返回实例,如果未找到则引发异常。


sqlalchemy.orm.exc.NoResultFound 如果查询未选择任何行,则引发。


..版本已添加: 2.0.22


另请参阅


Session.get_one() - get_one 的主要文档


方法 sqlalchemy.ext.asyncio.AsyncSession. get_transaction→AsyncSessionTransactionNone


返回当前正在进行的根事务(如果有)。


结果


一个 AsyncSessionTransaction 对象,或者


在 1.4.18 版本加入.


类方法 sqlalchemy.ext.asyncio.AsyncSession. identity_keyclass_:Type[Any]None=None, ident:AnyTuple[Any,...]=None, *, instance:AnyNone=None, row:Row[Any]RowMappingNone=None, identity_token:AnyNone=None) _IdentityKeyType[Any]


返回身份密钥。


代表 AsyncSession 类代理 Session 类。


这是 identity_key() 的别名。


属性 sqlalchemy.ext.asyncio.AsyncSession. identity_map


代表 AsyncSession 类的 Session.identity_map 属性的代理。


方法 sqlalchemy.ext.asyncio.AsyncSession. in_nested_transaction bool


如果此 Session 已经开始一个嵌套事务,例如 SAVEPOINT,则返回 True。


代表 AsyncSession 类代理 Session 类。


在 1.4 版本加入.


方法 sqlalchemy.ext.asyncio.AsyncSession. in_transaction bool


如果此 Session 已开始事务,则返回 True。


代表 AsyncSession 类代理 Session 类。


在 1.4 版本加入.


另请参阅

Session.is_active


属性 sqlalchemy.ext.asyncio.AsyncSession. 信息


用户可修改的字典。


代表 AsyncSession 类代理 Session 类。


此字典的初始值可以使用 info 参数添加到 Session 构造函数中,或者 sessionmaker 构造函数或工厂方法。这里的字典始终是这个 Session 的本地字典,并且可以独立于所有其他 Session 对象进行修改。


method sqlalchemy.ext.asyncio.AsyncSession. async invalidate None


使用连接失效关闭此 Session。


有关完整描述,请参阅 Session.invalidate()。


属性 sqlalchemy.ext.asyncio.AsyncSession. is_active


如果此 Session 未处于 “partial rollback” 状态,则为 True。


代表 AsyncSession 类代理 Session 类。


在 1.4 版本发生变更: Session 不再立即开始新的事务,因此当 Session 首次实例化时,此属性将为 False。


“partial rollback” 状态通常表示 Session 的 flush 过程失败,并且 Session.rollback() 方法才能完全回滚事务。


如果此 Session 根本不在事务中,则 Session 将在首次使用时自动启动,因此在这种情况下,Session.is_active将返回 True。


否则,如果此 Session 在事务中, 并且该事务尚未在内部回滚,则 Session.is_active 也将返回 True。


方法 sqlalchemy.ext.asyncio.AsyncSession. is_modifiedinstance objectinclude_collections: bool = True bool


如果给定实例具有本地修改的属性,则返回 True


代表 AsyncSession 类代理 Session 类。


此方法检索实例上每个分析属性的历史记录,并执行当前值与其先前刷新或提交的值(如果有)的比较。


它实际上是更昂贵和准确的 版本。 Session.dirty 集合;对每个属性的 Net “dirty” 状态执行完整测试。


例如:

return session.is_modified(someobject)


此方法需要注意以下几点:


  • 使用此方法测试时,Session.dirty 集合中存在的实例可能会报告 False。这是因为该对象可能已经通过属性 mutation 接收了更改事件,因此将其放置在 Session.dirty 中,但最终状态与从数据库加载的状态相同,因此这里没有净变化。


  • 如果在接收新值时未加载或过期,则在应用新值时,标量属性可能未记录先前设置的值 - 在这些情况下,假定该属性发生了变化,即使最终没有针对其数据库值的净变化。在大多数情况下,SQLAlchemy 在发生 set 事件时不需要“旧”值,因此如果旧值不存在,它会跳过 SQL 调用的费用,基于通常需要标量值的 UPDATE 的假设,并且在不需要的少数情况下,平均比发出防御性 SELECT 便宜。


    仅当属性容器的 active_history 标志设置为 True 时,才会在设置时无条件地获取 “old” 值。此标志通常用于主键属性和标量对象引用,它们不是简单的多对一。要为任何任意映射列设置此标志,请使用 active_history 参数替换为 column_property()


参数

  • instance—— 要测试待处理更改的映射实例。


  • include_collections– 指示作中是否应包含多值集合。将此设置为 False 是一种仅检测基于局部列的属性(即标量列或多对一外键)的方法,这些属性在刷新时会导致此实例发生 UPDATE。


method sqlalchemy.ext.asyncio.AsyncSession. async mergeinstance _O*load bool = True, options:Sequence[ORMOption]None=None) _O


将给定实例的状态复制到此 AsyncSession 中的相应实例中。


另请参阅


Session.merge() - 合并的主要文档


属性 sqlalchemy.ext.asyncio.AsyncSession. new


Session 中标记为 'new' 的所有实例的集合。


代表 AsyncSession 类代理 Session 类。


属性 sqlalchemy.ext.asyncio.AsyncSession. no_autoflush


返回禁用 autoflush 的上下文管理器。


代表 AsyncSession 类代理 Session 类。


例如:

with session.no_autoflush:

    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()


with: 块中继续的作不会受到查询访问时发生的刷新的影响。这在初始化涉及现有数据库查询的一系列对象时非常有用,其中未完成的对象不应被刷新。


类方法 sqlalchemy.ext.asyncio.AsyncSession. object_sessioninstance object→SessionNone


返回对象所属的 Session


代表 AsyncSession 类代理 Session 类。


这是 object_session() 的别名。


方法 sqlalchemy.ext.asyncio.AsyncSession. async refreshinstance object, attribute_names:Iterable[str]None=None, with_for_update: ForUpdateParameter = None


过期并刷新给定实例上的属性。


将向数据库发出查询,并且所有属性都将使用其当前数据库值刷新。


这是 Session.refresh() 方法的异步版本。有关所有选项的完整说明,请参阅该方法。


另请参阅


Session.refresh() - 刷新的主要文档


方法 sqlalchemy.ext.asyncio.AsyncSession. async reset)→ None


关闭此使用的事务资源和 ORM 对象 Session,将会话重置为其初始状态。


2.0.22 新版功能.


另请参阅


Session.reset() - “reset” 的主要文档


结束语 - 有关语义的详细信息 AsyncSession.close()AsyncSession.reset() 的 AsyncSession.reset() 中。


方法 sqlalchemy.ext.asyncio.AsyncSession. async rollback None


回滚当前正在进行的事务。


另请参阅


Session.rollback() - “回滚” 的主要文档


方法 sqlalchemy.ext.asyncio.AsyncSession. async run_syncfn: ~typing.Callable[[~typing.连接[~sqlalchemy.orm.session.Session, ~_P]], ~sqlalchemy.ext.asyncio.session._T], *arg: ~typing.~_P, **kw: ~typing.~_P _T


调用给定的 synchronous (i. not async) 可调用对象,传递一个 synchronous 风格的 Session 作为第一个参数。


此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。


例如:

def some_business_method(session: Session, param: str) -> str:
    """A synchronous function that does not require awaiting

    :param session: a SQLAlchemy Session, used synchronously

    :return: an optional return value is supported

    """
    session.add(MyObject(param=param))
    session.flush()
    return "success"


async def do_something_async(async_engine: AsyncEngine) -> None:
    """an async function that uses awaiting"""

    with AsyncSession(async_engine) as async_session:
        # run some_business_method() with a sync-style
        # Session, proxied into an awaitable
        return_code = await async_session.run_sync(
            some_business_method, param="param1"
        )
        print(return_code)


此方法通过在专门检测的 greenlet 中运行给定的可调用对象来维护 asyncio 事件循环,直到数据库连接。


提示


提供的可调用对象在 asyncio 事件循环中内联调用,并将在传统的 IO 调用上阻塞。此可调用对象中的 IO 应仅调用 SQLAlchemy 的 asyncio 数据库 API,该 API 将适当适应 greenlet 上下文。


另请参阅


AsyncAttrs - ORM 映射类的 mixin,它在每个属性的基础上更简洁地提供了类似的功能


AsyncConnection.run_sync()


在 asyncio 下运行同步方法和函数


method sqlalchemy.ext.asyncio.AsyncSession. async scalarstatement 可执行文件, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw 任意 任意


执行语句并返回标量结果。


另请参阅


Session.scalar() - 标量的主要文档


method sqlalchemy.ext.asyncio.AsyncSession. async scalarsstatement Executable, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw 任意 ScalarResult[任意]


执行语句并返回标量结果。


结果


一个 ScalarResult 对象


1.4.24 版本中的新功能: 添加了 AsyncSession.scalars()


1.4.26 版本中的新功能: 添加 async_scoped_session.scalars()


另请参阅


Session.scalars() - 标量的主要文档


AsyncSession.stream_scalars() - 流版本


method sqlalchemy.ext.asyncio.AsyncSession. async streamstatement Executable, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw 任意 AsyncResult[任意]


执行语句并返回流 AsyncResult 对象。


method sqlalchemy.ext.asyncio.AsyncSession. async stream_scalarsstatement Executable, params:_CoreAnyExecuteParamsNone=None, *execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw 任意 AsyncScalarResult[任意]


执行一个语句并返回一个标量结果流。


结果


一个 AsyncScalarResult 对象


在 1.4.24 版本加入.


另请参阅


Session.scalars() - 标量的主要文档


AsyncSession.scalars() - 非流版本


属性 sqlalchemy.ext.asyncio.AsyncSession. sync_session: 会话


引用底层 Session this AsyncSession 代理请求。


此实例可用作事件目标。


sqlalchemy.ext.asyncio 中。AsyncSessionTransaction


ORM SessionTransaction 对象的包装器。


提供此对象是为了可以返回 AsyncSession.begin() 的事务持有对象。


该对象支持对 AsyncSessionTransaction.commit() AsyncSessionTransaction.rollback() ,以及用作异步上下文管理器。


在 1.4 版本加入.


类签名


sqlalchemy.ext.asyncio.AsyncSessionTransaction sqlalchemy.ext.asyncio.base.ReversibleProxysqlalchemy.ext.asyncio.base.StartableContext


方法 sqlalchemy.ext.asyncio.AsyncSessionTransaction. async commit None


提交此 AsyncTransaction


方法 sqlalchemy.ext.asyncio.AsyncSessionTransaction. async rollback None


回滚此 AsyncTransaction