异步 I/O (asyncio)¶
支持 Python asyncio。包括对 Core 和 ORM 使用的支持,使用与 asyncio 兼容的方言。
在 1.4 版本加入.
警告
请阅读 Asyncio 平台安装说明(包括 Apple M1),了解许多平台(包括 Apple M1 架构)的重要平台安装说明。
Asyncio 平台安装说明(包括 Apple M1)¶
asyncio 扩展只需要 Python 3。它还取决于 greenlet 库。默认情况下,此依赖项安装在常见的计算机平台上,包括:
x86_64 aarch64 ppc64le amd64 win32
对于上述平台,众所周知,greenlet
会提供预构建的 wheel 文件。对于其他平台,默认情况下不安装 greenlet;
Greenlet 的当前文件列表可以在
Greenlet - 下载文件。请注意,省略了许多架构,包括 Apple M1。
在确保存在 greenlet
依赖项的同时安装 SQLAlchemy
无论使用哪个平台,
[asyncio]
setuptools 额外
可以安装
如下所示,其中还将包括指示 pip
安装 Greenlet
:
pip install sqlalchemy[asyncio]
请注意,在没有预建 wheel 文件的平台上安装 greenlet
意味着 greenlet
将从源代码构建,这需要 Python 的开发库也存在。
概要 - 核心¶
对于 Core 使用,create_async_engine()
函数会创建一个 AsyncEngine
实例,然后提供传统 Engine
API 的异步版本。 这
AsyncEngine
通过其 AsyncEngine.connect()
和 AsyncEngine.begin()
提供 AsyncConnection
方法,它们都提供异步上下文管理器。 这
然后,AsyncConnection
可以使用
AsyncConnection.execute()
方法传递一个缓冲的
Result
或 AsyncConnection.stream()
方法提供流式处理服务器端 AsyncResult
:
>>> import asyncio
>>> from sqlalchemy import Column
>>> from sqlalchemy import MetaData
>>> from sqlalchemy import select
>>> from sqlalchemy import String
>>> from sqlalchemy import Table
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> meta = MetaData()
>>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True))
>>> async def async_main() -> None:
... engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
... async with engine.begin() as conn:
... await conn.run_sync(meta.drop_all)
... await conn.run_sync(meta.create_all)
...
... await conn.execute(
... t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
... )
...
... async with engine.connect() as conn:
... # select a Result, which will be delivered with buffered
... # results
... result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
...
... print(result.fetchall())
...
... # for AsyncEngine created in function scope, close and
... # clean-up pooled connections
... await engine.dispose()
>>> asyncio.run(async_main())
BEGIN (implicit)
...
CREATE TABLE t1 (
name VARCHAR(50) NOT NULL,
PRIMARY KEY (name)
)
...
INSERT INTO t1 (name) VALUES (?)
[...] [('some name 1',), ('some name 2',)]
COMMIT
BEGIN (implicit)
SELECT t1.name
FROM t1
WHERE t1.name = ?
[...] ('some name 1',)
[('some name 1',)]
ROLLBACK
上面,AsyncConnection.run_sync()
方法可用于调用不包含可等待钩子的特殊 DDL 函数,例如 MetaData.create_all()。
提示
在
范围,该范围将脱离上下文并被垃圾回收,如
async_main
函数。这可确保连接池保持打开的任何连接都将在可等待的上下文中正确处理。与使用阻塞 IO 时不同,SQLAlchemy 无法在 __del__
或 weakref 终结器,因为没有机会调用 await
。
当引擎超出范围时未能显式释放引擎
可能会导致向 Standard Out 发出类似于
RuntimeError: Event loop is closed
在垃圾回收中。
AsyncConnection
还通过 AsyncConnection.stream()
方法提供了一个“流式”API,该方法返回一个
AsyncResult
对象。此结果对象使用服务器端游标并提供 async/await API,例如异步迭代器:
async with engine.connect() as conn:
async_result = await conn.stream(select(t1))
async for row in async_result:
print("row: %s" % (row,))
概要 - ORM¶
使用 2.0 样式的查询,AsyncSession
类提供了完整的 ORM 功能。
在默认使用模式下,必须特别注意避免懒惰
加载或其他涉及 ORM 关系和列属性的过期属性访问;下一节 使用 AsyncSession 时防止隐式 IO 对此进行了详细介绍。
警告
AsyncSession
的单个实例对于
在多个并发任务中使用。 查看各部分
将 AsyncSession 与并发任务一起使用,会话是否线程安全? 在并发任务中共享 AsyncSession 是否安全?作为背景。
下面的示例说明了一个完整的示例,包括 mapper 和 session 配置:
>>> from __future__ import annotations
>>> import asyncio
>>> import datetime
>>> from typing import List
>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy import func
>>> from sqlalchemy import select
>>> from sqlalchemy.ext.asyncio import AsyncAttrs
>>> from sqlalchemy.ext.asyncio import async_sessionmaker
>>> from sqlalchemy.ext.asyncio import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlalchemy.orm import DeclarativeBase
>>> from sqlalchemy.orm import Mapped
>>> from sqlalchemy.orm import mapped_column
>>> from sqlalchemy.orm import relationship
>>> from sqlalchemy.orm import selectinload
>>> class Base(AsyncAttrs, DeclarativeBase):
... pass
>>> class B(Base):
... __tablename__ = "b"
...
... id: Mapped[int] = mapped_column(primary_key=True)
... a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
... data: Mapped[str]
>>> class A(Base):
... __tablename__ = "a"
...
... id: Mapped[int] = mapped_column(primary_key=True)
... data: Mapped[str]
... create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
... bs: Mapped[List[B]] = relationship()
>>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
... async with async_session() as session:
... async with session.begin():
... session.add_all(
... [
... A(bs=[B(data="b1"), B(data="b2")], data="a1"),
... A(bs=[], data="a2"),
... A(bs=[B(data="b3"), B(data="b4")], data="a3"),
... ]
... )
>>> async def select_and_update_objects(
... async_session: async_sessionmaker[AsyncSession],
... ) -> None:
... async with async_session() as session:
... stmt = select(A).order_by(A.id).options(selectinload(A.bs))
...
... result = await session.execute(stmt)
...
... for a in result.scalars():
... print(a, a.data)
... print(f"created at: {a.create_date}")
... for b in a.bs:
... print(b, b.data)
...
... result = await session.execute(select(A).order_by(A.id).limit(1))
...
... a1 = result.scalars().one()
...
... a1.data = "new data"
...
... await session.commit()
...
... # access attribute subsequent to commit; this is what
... # expire_on_commit=False allows
... print(a1.data)
...
... # alternatively, AsyncAttrs may be used to access any attribute
... # as an awaitable (new in 2.0.13)
... for b1 in await a1.awaitable_attrs.bs:
... print(b1, b1.data)
>>> async def async_main() -> None:
... engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
... # async_sessionmaker: a factory for new AsyncSession objects.
... # expire_on_commit - don't expire objects after transaction commit
... async_session = async_sessionmaker(engine, expire_on_commit=False)
...
... async with engine.begin() as conn:
... await conn.run_sync(Base.metadata.create_all)
...
... await insert_objects(async_session)
... await select_and_update_objects(async_session)
...
... # for AsyncEngine created in function scope, close and
... # clean-up pooled connections
... await engine.dispose()
>>> asyncio.run(async_main())
BEGIN (implicit)
...
CREATE TABLE a (
id INTEGER NOT NULL,
data VARCHAR NOT NULL,
create_date DATETIME DEFAULT (CURRENT_TIMESTAMP) NOT NULL,
PRIMARY KEY (id)
)
...
CREATE TABLE b (
id INTEGER NOT NULL,
a_id INTEGER NOT NULL,
data VARCHAR NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY(a_id) REFERENCES a (id)
)
...
COMMIT
BEGIN (implicit)
INSERT INTO a (data) VALUES (?) RETURNING id, create_date
[...] ('a1',)
...
INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id
[...] (1, 'b2')
...
COMMIT
BEGIN (implicit)
SELECT a.id, a.data, a.create_date
FROM a ORDER BY a.id
[...] ()
SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data
FROM b
WHERE b.a_id IN (?, ?, ?)
[...] (1, 2, 3)
<A object at ...> a1
created at: ...
<B object at ...> b1
<B object at ...> b2
<A object at ...> a2
created at: ...
<A object at ...> a3
created at: ...
<B object at ...> b3
<B object at ...> b4
SELECT a.id, a.data, a.create_date
FROM a ORDER BY a.id
LIMIT ? OFFSET ?
[...] (1, 0)
UPDATE a SET data=? WHERE a.id = ?
[...] ('new data', 1)
COMMIT
new data
<B object at ...> b1
<B object at ...> b2
在上面的示例中,AsyncSession
使用可选的 async_sessionmaker
帮助程序进行实例化,该帮助程序为具有固定参数集的新 AsyncSession
对象提供了一个工厂,此处包括将其与针对特定数据库 URL 的 AsyncEngine
相关联。然后它被传递给其他方法,它可以在 Python 异步上下文管理器中使用(即 async with:
语句),以便它自动关闭
区块的结尾;这相当于调用
AsyncSession.close()
方法。
将 AsyncSession 与并发任务一起使用¶
AsyncSession
对象是一个可变的有状态对象
,它表示正在进行的单个有状态数据库事务。通过 asyncio 使用并发任务,例如使用 asyncio.gather()
等 API,应该为每个个体使用单独的AsyncSession
任务。
请参阅 Session 是否线程安全? 在并发任务中共享 AsyncSession 是否安全?,了解 Session
和 AsyncSession
的一般说明,以及它们应如何与并发工作负载一起使用。
使用 AsyncSession 时防止隐式 IO¶
使用传统的 asyncio 时,应用程序需要避免可能发生 IO-on-attribute 访问的任何点。下面列出了可用于帮助实现此目的的技术,其中许多技术在前面的示例中进行了说明。
延迟加载关系、延迟列或表达式或在过期方案中访问的属性可以利用AsyncAttrs
混合。当这个 mixin 被添加到一个特定的类中,或者更普遍地被添加到 DeclarativeBase
超类中时,提供了一个访问器AsyncAttrs.awaitable_attrs
它将任何属性作为 awaitable 提供:from __future__ import annotations from typing import List from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import relationship class Base(AsyncAttrs, DeclarativeBase): pass class A(Base): __tablename__ = "a" # ... rest of mapping ... bs: Mapped[List[B]] = relationship() class B(Base): __tablename__ = "b" # ... rest of mapping ...
在不使用预先加载时,在新加载的A
实例上访问A.bs
集合通常会使用延迟加载,为了成功,通常会向数据库发出 IO,这在 asyncio 下会失败,因为不允许隐式 IO。要在 asyncio 下直接访问此属性而无需任何先前的加载作,可以通过指示 AsyncAttrs.awaitable_attrs 来将该属性作为 awaitable 访问。
前缀:a1 = (await session.scalars(select(A))).one() for b1 in await a1.awaitable_attrs.bs: print(b1)
AsyncAttrs
mixin 在 internal 方法,该方法也被AsyncSession.run_sync()
方法。
2.0.13 新版功能.
另请参阅
通过使用 SQLAlchemy 2.0 的使用此功能,永远不会从中读取集合,只会读取集合 使用显式 SQL 调用进行查询。 查看示例async_orm_writeonly.py
Asyncio 集成部分中,了解与 asyncio 一起使用的只写集合的示例。
当使用只写集合时,程序的行为很简单,很容易预测集合。但是,缺点是没有任何内置系统可以一次加载许多这些集合,而是需要手动执行。因此,下面的许多项目符号都解决了在 asyncio 中使用传统的 lazy-loaded 关系时的特定技术,这需要更加小心。
如果不使用AsyncAttrs
,则可以使用lazy=“raise”
声明关系,这样默认情况下它们就不会尝试发出 SQL。为了加载集合,将改用预先加载。
最有用的预先加载策略是selectInload()
急切加载器,在前面的示例中使用了它A.bs
,以便在await session.execute()
调用:stmt = select(A).options(selectinload(A.bs))
在构造新对象时,集合总是被分配一个默认的 空集合,例如上面示例中的 List:A(bs=[], data="a2")
这允许在刷新A
对象时,上述A
对象上的.bs
集合存在且可读;否则,当A
被刷新时,.bs
将被卸载,并在访问时引发错误。AsyncSession
是使用Session.expire_on_commit
设置为 False,以便我们可以访问 属性AsyncSession.commit(),
就像我们访问属性的末尾的那一行一样:# create AsyncSession with expire_on_commit=False async_session = AsyncSession(engine, expire_on_commit=False) # sessionmaker version async_session = async_sessionmaker(engine, expire_on_commit=False) async with async_session() as session: result = await session.execute(select(A).order_by(A.id)) a1 = result.scalars().first() # commit would normally expire all attributes await session.commit() # access attribute subsequent to commit; this is what # expire_on_commit=False allows print(a1.data)
其他指南包括:
应该避免使用像AsyncSession.expire()
这样的方法,而应该使用AsyncSession.refresh()
的刷新;如果绝对需要过期。通常不需要过期,因为Session.expire_on_commit
使用 asyncio 时,通常应设置为False
。
延迟加载的关系可以在 asyncio 下使用AsyncSession.refresh()
中,如果所需的属性名称 显式传递给Session.refresh.attribute_names
,例如:# assume a_obj is an A that has lazy loaded A.bs collection a_obj = await async_session.get(A, [1]) # force the collection to load by naming it in attribute_names await async_session.refresh(a_obj, ["bs"]) # collection is present print(f"bs collection: {a_obj.bs}")
当然,最好提前使用 Eager Loading,以便已经设置集合而无需延迟加载。
2.0.4 版本中的新功能: 添加了对AsyncSession.refresh()
和底层Session.refresh()
方法强制延迟加载关系 加载,如果它们在Session.refresh.attribute_names
参数。在以前的版本中,即使在参数中命名,也会以静默方式跳过该关系。
避免使用 Cascades 中记录的all
cascade 选项 赞成明确列出所需的 Cascade 特征。 这all
cascade 选项意味着 refresh-expire 设置,这意味着AsyncSession.refresh()
方法将 使相关对象的属性过期,但不必刷新这些 相关对象(假设未在relationship()的 Deal()
中,使它们处于过期状态。
应该为deferred()
使用适当的 loader 选项 列(如果有使用)以及relationship()
的列 如上所述的构造。 有关延迟列加载的背景信息,请参阅使用 Column Deferral 限制列加载。
中描述的 “dynamic” relationship loader 策略在 默认情况下,动态关系加载器与 asyncio 方法不兼容。 只有在AsyncSession.run_sync()
方法,如 在 asyncio 下运行同步方法和函数,或者使用其.statement
属性来获取普通 select:user = await session.get(User, 42) addresses = (await session.scalars(user.addresses.statement)).all() stmt = user.addresses.statement.where(Address.email_address.startswith("patrick")) addresses_filter = (await session.scalars(stmt)).all()
SQLAlchemy 版本 2.0 中引入的只写技术与 asyncio 完全兼容,应该首选。
另请参阅
“Dynamic” 关系加载器被 “Write Only” 取代 - 关于迁移到 2.0 样式的说明
如果将 asyncio 与不支持 RETURNING 的数据库一起使用,例如 MySQL 8 中,服务器默认值(例如生成的时间戳)不会为 在新刷新的对象上可用,除非Mapper.eager_defaults
选项。在 SQLAlchemy 2.0 中,此行为会自动应用于 PostgreSQL、SQLite 和 MariaDB 等后端,这些后端在 INSERTed 行时使用 RETURNING 获取新值。
在 asyncio 下运行同步方法和函数¶
深度炼金术
这种方法本质上是公开了 SQLAlchemy 首先能够提供 asyncio 接口的机制。虽然这样做没有技术问题,但总的来说,这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些中心理念,即任何可能导致 IO 被调用的编程语句都必须有一个 await
调用,以免程序没有明确地明确说明 IO 可能发生的每一行。这种方法并没有改变这个一般的想法,只是它允许在函数调用的范围内将一系列同步 IO 指令从此规则中免除,基本上捆绑到一个 awaitable 中。
作为在 asyncio 事件循环中集成传统 SQLAlchemy“延迟加载”的另一种方法,一种称为
提供了 AsyncSession.run_sync()
,它将在 greenlet 中运行任何 Python 函数,其中传统的同步编程概念在到达数据库驱动程序时将被转换为使用 await
。这里的一个假设方法是面向 asyncio 的应用程序可以将与数据库相关的方法打包成使用 AsyncSession.run_sync()
调用的函数。
更改上面的例子,如果我们没有使用 selectinload()
对于 A.bs
集合,我们可以在一个单独的函数中完成对这些属性访问的处理:
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
def fetch_and_update_objects(session):
"""run traditional sync-style ORM code in a function that will be
invoked within an awaitable.
"""
# the session object here is a traditional ORM Session.
# all features are available here including legacy Query use.
stmt = select(A)
result = session.execute(stmt)
for a1 in result.scalars():
print(a1)
# lazy loads
for b1 in a1.bs:
print(b1)
# legacy Query use
a1 = session.query(A).order_by(A.id).first()
a1.data = "new data"
async def async_main():
engine = create_async_engine(
"postgresql+asyncpg://scott:tiger@localhost/test",
echo=True,
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
async with AsyncSession(engine) as session:
async with session.begin():
session.add_all(
[
A(bs=[B(), B()], data="a1"),
A(bs=[B()], data="a2"),
A(bs=[B(), B()], data="a3"),
]
)
await session.run_sync(fetch_and_update_objects)
await session.commit()
# for AsyncEngine created in function scope, close and
# clean-up pooled connections
await engine.dispose()
asyncio.run(async_main())
上述在 “sync” 运行程序中运行某些函数的方法与在基于事件的编程库(如 gevent
)上运行 SQLAlchemy 应用程序的应用程序有一些相似之处。区别如下:
与使用gevent
时不同,我们可以继续使用标准的 Python asyncio 事件循环或任何自定义事件循环,而无需集成到gevent
事件循环中。
没有任何 “monkeypatching”。上面的示例使用了真正的 asyncio 驱动程序,底层 SQLAlchemy 连接池也使用了 Python 内置的asyncio。
池化连接排队。
该程序可以在 async/await 代码和包含使用 sync 代码的函数之间自由切换,而几乎没有性能损失。没有使用“线程执行程序”或任何其他 Waiter 或同步。
底层网络驱动程序也使用纯 Python asyncio 概念,没有像gevent
和eventlet
这样的第三方网络库 提供正在使用中。
通过 asyncio 扩展使用事件¶
SQLAlchemy 事件系统不是由 asyncio 扩展直接公开的,这意味着还没有 SQLAlchemy 事件处理程序的“异步”版本。
但是,由于 asyncio 扩展围绕着通常的同步 SQLAlchemy API,因此常规的“同步”样式事件处理程序可以免费使用,就像不使用 asyncio 时一样。
如下所述,在给定面向 asyncio 的 API 的情况下,有两种当前策略可以注册事件:
事件可以在实例级别注册(例如,特定的AsyncEngine
实例),方法是将事件与sync
属性。例如,要针对AsyncEngine
实例,请使用其AsyncEngine.sync_engine
属性设置为 target。目标包括:
要在类级别注册事件,针对相同类型的所有实例(例如所有AsyncSession
实例),请使用相应的 sync-style 类。例如,要注册SessionEvents.before_commit()
事件对AsyncSession
类中,使用Session
类作为目标。
要在sessionmaker
级别注册,请将显式 带有async_sessionmaker
的sessionMaker
,并将async_sessionmaker.sync_session_class
事件与SessionMaker
关联。
当在 asyncio 上下文中的事件处理程序中工作时,像 Connection
这样的对象继续以它们通常的 “同步” 方式工作,而不需要 await
或 async
使用;当 asyncio 数据库适配器最终收到消息时,调用样式将透明地调整回 asyncio 调用样式。对于传递 DBAPI 级别连接的事件,例如 PoolEvents.connect(),
该对象是符合 pep-249 的“连接”对象,它将使同步样式的调用适应 asyncio 驱动程序。
使用异步引擎 / 会话 / Sessionmakers 的事件监听器示例¶
与面向异步的 API 构造关联的同步样式事件处理程序的一些示例如下所示:
AsyncEngine 上的核心事件
在此示例中,我们将访问AsyncEngine.sync_engine
属性作为
ConnectionEvents
和PoolEvents
:import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.engine import Engine from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") # connect event on instance of Engine @event.listens_for(engine.sync_engine, "connect") def my_on_connect(dbapi_con, connection_record): print("New DBAPI connection:", dbapi_con) cursor = dbapi_con.cursor() # sync style API use for adapted DBAPI connection / cursor cursor.execute("select 'execute from event'") print(cursor.fetchone()[0]) # before_execute event on all Engine instances @event.listens_for(Engine, "before_execute") def my_before_execute( conn, clauseelement, multiparams, params, execution_options, ): print("before execute!") async def go(): async with engine.connect() as conn: await conn.execute(text("select 1")) await engine.dispose() asyncio.run(go())
输出:New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>> execute from event before execute!
AsyncSession 上的 ORM 事件
在此示例中,我们将AsyncSession.sync_session
作为SessionEvents
的目标进行访问:import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import Session engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") session = AsyncSession(engine) # before_commit event on instance of Session @event.listens_for(session.sync_session, "before_commit") def my_before_commit(session): print("before commit!") # sync style API use on Session connection = session.connection() # sync style API use on Connection result = connection.execute(text("select 'execute from event'")) print(result.first()) # after_commit event on all Session instances @event.listens_for(Session, "after_commit") def my_after_commit(session): print("after commit!") async def go(): await session.execute(text("select 1")) await session.commit() await session.close() await engine.dispose() asyncio.run(go())
输出:before commit! execute from event after commit!
async_sessionmaker 上的 ORM 事件
对于此用例,我们将sessionmaker
作为事件目标,然后使用参数async_sessionmaker.sync_session_class
将其分配给async_sessionmaker
:import asyncio from sqlalchemy import event from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import sessionmaker sync_maker = sessionmaker() maker = async_sessionmaker(sync_session_class=sync_maker) @event.listens_for(sync_maker, "before_commit") def before_commit(session): print("before commit") async def main(): async_session = maker() await async_session.commit() asyncio.run(main())
输出:before commit
在连接池和其他事件中使用仅限 awaitable-only 的驱动程序方法¶
如上一节所述,事件处理程序(例如面向 PoolEvents
事件处理程序的事件处理程序)接收同步样式的“DBAPI”连接,该连接是由 SQLAlchemy asyncio 方言提供的包装器对象,用于将底层 asyncio“驱动程序”连接调整为 SQLAlchemy 内部可以使用的连接。当此类事件处理程序的用户定义实现需要直接使用最终的 “driver” 连接时,会出现一个特殊的用例,在该驱动程序连接上使用仅限 awaitable 的方法。一个这样的例子是 asyncpg 驱动程序提供的 .set_type_codec()
方法。
为了适应此用例,SQLAlchemy 的 AdaptedConnection
class 提供了一个方法 AdaptedConnection.run_async()
,该方法允许在事件处理程序或其他 SQLAlchemy 内部的 “同步” 上下文中调用可等待的函数。此方法直接类似于 AsyncConnection.run_sync()
方法,该方法允许 sync-style 方法在 async 下运行。
应该向 AdaptedConnection.run_async()
传递一个函数,该函数将接受最内层的 “driver” 连接作为单个参数,并返回一个将由 AdaptedConnection.run_async() 调用的 awaitable
方法。 给定的函数本身不需要声明为 async
;它是 Python lambda:
是完全可以的,因为返回 awaitable 值将在返回后调用:
from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(...)
@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, *args):
dbapi_connection.run_async(
lambda connection: connection.set_type_codec(
"MyCustomType",
encoder,
decoder, # ...
)
)
在上面,传递给 register_custom_types
事件处理程序的对象是 AdaptedConnection
的实例,它为底层仅异步驱动程序级连接对象提供类似于 DBAPI 的接口。然后,AdaptedConnection.run_async()
方法提供对可等待环境的访问,在该环境中,可以作用于基础驱动程序级别连接。
在 1.4.30 版本加入.
使用多个 asyncio 事件循环¶
使用多个事件循环的应用程序,例如在将 asyncio 与多线程相结合的不常见情况下,在使用默认池实现时,不应与不同的事件循环共享同一个 AsyncEngine
。
如果 AsyncEngine
从一个事件循环传递到另一个事件循环,则应先调用 AsyncEngine.dispose()
方法,然后再将其用于新的事件循环。否则可能会导致 RuntimeError
沿着
Task <Task pending ...> got Future attached to a different loop
如果必须在不同的循环之间共享相同的引擎,则应将其配置为使用 NullPool
禁用池,从而防止引擎多次使用任何连接:
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
engine = create_async_engine(
"postgresql+asyncpg://user:pass@host/dbname",
poolclass=NullPool,
)
使用 asyncio 作用域的会话¶
线程 SQLAlchemy 中使用的“作用域会话”模式与
scoped_session
对象在 asyncio 中也可用,使用名为 async_scoped_session
的改编版本。
提示
SQLAlchemy 通常不推荐 “scoped” 模式
对于新的开发,因为它依赖于可变的全局状态,该状态也必须
在线程或任务中的工作完成时显式拆除。
特别是在使用 asyncio 时,最好将
AsyncSession
直接传递给需要它的可等待函数。
使用 async_scoped_session
时,因为没有 “thread-local”
概念,则必须将 “scopefunc” 参数提供给
构造函数。以下示例说明了如何使用
asyncio.current_task()
函数来实现此目的:
from asyncio import current_task
from sqlalchemy.ext.asyncio import (
async_scoped_session,
async_sessionmaker,
)
async_session_factory = async_sessionmaker(
some_async_engine,
expire_on_commit=False,
)
AsyncScopedSession = async_scoped_session(
async_session_factory,
scopefunc=current_task,
)
some_async_session = AsyncScopedSession()
警告
async_scoped_session
使用的 “scopefunc”
在任务中调用任意次数,每次访问底层 AsyncSession
时调用一次。因此,该函数应该是幂等的轻量级的,并且不应该尝试创建或更改任何状态,例如建立回调等。
警告
对作用域中的 “key” 使用 current_task()
需要从最外层的可等待对象中调用 async_scoped_session.remove()
方法,以确保在任务完成时从注册表中删除该 key,否则任务句柄和 AsyncSession
将保留在内存中,本质上会造成内存泄漏。请参阅以下示例,其中说明了 async_scoped_session.remove()
的正确用法。
async_scoped_session
包括代理
的行为类似于 scoped_session
,这意味着它可以直接被视为 AsyncSession
,请记住,通常的 await
关键字是必需的,包括
async_scoped_session.remove()
方法:
async def some_function(some_async_session, some_object):
# use the AsyncSession directly
some_async_session.add(some_object)
# use the AsyncSession via the context-local proxy
await AsyncScopedSession.commit()
# "remove" the current proxied AsyncSession for the local context
await AsyncScopedSession.remove()
在 1.4.19 版本加入.
使用 Inspector 检查 schema 对象¶
SQLAlchemy 尚未提供 asyncio 版本的
Inspector
(在 Fine Grained Reflection with Inspector 中引入),但是可以通过利用 AsyncConnection.run_sync()
的
AsyncConnection 的 AsyncConnection
中:
import asyncio
from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")
def use_inspector(conn):
inspector = inspect(conn)
# use the inspector
print(inspector.get_view_names())
# return any value to the caller
return inspector.get_table_names()
async def async_main():
async with engine.connect() as conn:
tables = await conn.run_sync(use_inspector)
asyncio.run(async_main())
引擎 API 文档¶
对象名称 |
描述 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
-
函数 sqlalchemy.ext.asyncio 中。create_async_engine(url:strURL, **kw: Any)AsyncEngine ¶
创建新的异步引擎实例。
传递给create_async_engine()
的参数与传递给create_engine()
函数的参数基本相同。指定的方言必须是与 asyncio 兼容的方言,例如 asyncpg。
在 1.4 版本加入.
参数async_creator¶ –
一个 async callable,它返回一个驱动程序级的 asyncio 连接。如果给定,则该函数不应采用任何参数,并从底层 asyncio 数据库驱动程序返回新的 asyncio 连接;该连接将被包装在适当的结构中,以便与AsyncEngine
一起使用。请注意,URL 中指定的参数在此处不适用,creator 函数应使用自己的连接参数。
此参数是 asyncio 等效的create_engine.creator
参数的create_engine()
函数。
2.0.16 新版功能.
-
函数 sqlalchemy.ext.asyncio 中。async_engine_from_config(configuration: Dict[str, Any], prefix: str = 'sqlalchemy.', **kwargs: Any)AsyncEngine ¶
使用配置字典创建新的 AsyncEngine 实例。
此函数类似于 SQLAlchemy Core 中的engine_from_config()
函数,不同之处在于请求的方言必须是与 asyncio 兼容的方言,例如 asyncpg。该函数的参数签名与engine_from_config()
的参数签名相同。
在 1.4.29 版本加入.
-
函数 sqlalchemy.ext.asyncio 中。create_async_pool_from_url(url:strURL, **kwargs: Any)矿池 ¶
创建新的异步引擎实例。
传递给create_async_pool_from_url()
的参数与传递给create_pool_from_url()
函数的参数基本相同。指定的方言必须是与 asyncio 兼容的方言,例如 asyncpg。
在 2.0.10 版本加入.
-
类 sqlalchemy.ext.asyncio 中。异步引擎¶ Engine
的 asyncio 代理。AsyncEngine
是使用create_async_engine()
函数:from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
在 1.4 版本加入.
成员
begin(), clear_compiled_cache(), connect(), 方言, dispose(), 驱动程序, echo, 引擎, execution_options(), get_execution_options(), 名称, 池, raw_connection(), sync_engine,update_execution_options(), URL
类签名
类sqlalchemy.ext.asyncio.AsyncEngine
(sqlalchemy.ext.asyncio.base.ProxyComparable
,sqlalchemy.ext.asyncio.AsyncConnectable
)-
方法sqlalchemy.ext.asyncio.AsyncEngine.
begin()AsyncIterator[AsyncConnection] ¶
返回一个上下文管理器,输入该上下文管理器后将传递一个AsyncConnection
替换为AsyncTransaction
已建立。
例如:async with async_engine.begin() as conn: await conn.execute( text("insert into table (x, y, z) values (1, 2, 3)") ) await conn.execute(text("my_special_procedure(5)"))
-
方法sqlalchemy.ext.asyncio.AsyncEngine.
clear_compiled_cache()无 ¶
清除与方言关联的已编译缓存。
代表AsyncEngine
类代理Engine
类。
这仅适用于通过create_engine.query_cache_size
parameter 建立的内置缓存。 它不会影响通过Connection.execution_options.compiled_cache
参数。
在 1.4 版本加入.
-
方法sqlalchemy.ext.asyncio.AsyncEngine.
connect()AsyncConnection ¶
返回AsyncConnection
对象。
当AsyncConnection
作为异步上下文管理器输入时,它将从底层连接池中获取数据库连接:async with async_engine.connect() as conn: result = await conn.execute(select(user_table))
AsyncConnection
也可以通过调用其AsyncConnection.start()
在上下文管理器之外启动 方法。
-
attribute dialect(属性sqlalchemy.ext.asyncio.AsyncEngine.
方言)¶
代表AsyncEngine
类的Engine.dialect
属性的代理。
-
方法sqlalchemy.ext.asyncio.AsyncEngine.
async dispose(close: bool = True)None ¶
释放 this 使用的连接池AsyncEngine 的 AsyncEngine
中。
参数
关闭¶ –
如果保留为默认值True
,则具有完全关闭当前签入的所有 数据库连接。 仍处于签出状态的连接 不会关闭,但它们将不再与此引擎
相关联, 因此,当它们单独关闭时,最终 与他们关联的游泳池
将被垃圾回收,如果在入住时尚未关闭,它们将被完全关闭。
如果设置为False
,则取消引用前一个连接池,否则不会以任何方式触及。
另请参阅
-
属性sqlalchemy.ext.asyncio.AsyncEngine.
驱动¶ 方言
的驱动程序名称 由此引擎
使用。
代表AsyncEngine
类代理Engine
类。
-
attributesqlalchemy.ext.asyncio.AsyncEngine.
echo 属性¶
如果为 True
,则为此元素启用日志输出。
代表AsyncEngine
类代理Engine
类。
这具有为此元素的类和对象引用的命名空间设置 Python 日志记录级别的效果。布尔值True
表示将为记录器设置 loglevellogging.INFO
,而字符串值debug
会将 loglevel 设置为伐木。DEBUG的 DEBUG
中。
-
属性sqlalchemy.ext.asyncio.AsyncEngine.
引擎¶
返回此Engine
。
代表AsyncEngine
类代理Engine
类。
用于接受Connection
/Engine
对象。
-
方法sqlalchemy.ext.asyncio.AsyncEngine.
execution_options(**opt: Any)AsyncEngine ¶
返回一个新的AsyncEngine
,它将提供AsyncConnection
对象。
从Engine.execution_options()
代理。有关详细信息,请参阅该方法。
-
方法sqlalchemy.ext.asyncio.AsyncEngine.
get_execution_options()_ExecuteOptions ¶
获取将在执行过程中生效的非 SQL 选项。
代表AsyncEngine
类代理Engine
类。
-
属性sqlalchemy.ext.asyncio.AsyncEngine.
名称¶ 方言
的字符串名称 由此引擎
使用。
代表AsyncEngine
类代理Engine
类。
-
属性sqlalchemy.ext.asyncio.AsyncEngine.
池¶
代表AsyncEngine
类的Engine.pool
属性的代理。
-
方法sqlalchemy.ext.asyncio.AsyncEngine.
async raw_connection()PoolProxiedConnection ¶
从连接池中返回 “原始” DBAPI 连接。
-
属性sqlalchemy.ext.asyncio.AsyncEngine.
sync_engine: 引擎¶
引用 sync-styleEngine
thisAsyncEngine
代理请求。
此实例可用作事件目标。
另请参阅
-
methodsqlalchemy.ext.asyncio.AsyncEngine.
update_execution_options(**opt: Any)无 ¶
更新此Engine
的默认 execution_options 字典。
代表AsyncEngine
类代理Engine
类。
opt 中给定的键/值将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过execution_options
参数发送到create_engine()
。
-
属性sqlalchemy.ext.asyncio.AsyncEngine.
url¶
代表AsyncEngine
类的Engine.url
属性的代理。
-
-
类 sqlalchemy.ext.asyncio 中。异步连接¶
Connection 的 asyncio代理。
AsyncConnection
是使用异步引擎.connect()
AsyncEngine
的方法:from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") async with engine.connect() as conn: result = await conn.execute(select(table))
在 1.4 版本加入.
成员
aclose()、begin()、begin_nested()、close()、closed、commit()、connection、default_isolation_level、dialect、exec_driver_sql()、execute()、execution_options()、get_nested_transaction()、 get_raw_connection(), get_transaction(), in_nested_transaction(), in_transaction(), info, invalidate(), 无效, 回滚(), run_sync(), 标量(), 标量(), start(), stream(), stream_scalars()、sync_connection、sync_engine
类签名
类sqlalchemy.ext.asyncio.AsyncConnection
(sqlalchemy.ext.asyncio.base.ProxyComparable
,sqlalchemy.ext.asyncio.base.StartableContext
,sqlalchemy.ext.asyncio.AsyncConnectable
)-
方法sqlalchemy.ext.asyncio.AsyncConnection.
async aclose()None ¶ AsyncConnection.close()
的同义词。AsyncConnection.aclose()
名称专门用于支持 Python 标准库@contextlib.aclosing
Context Manager 函数。
2.0.20 版本的新Function。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
begin()AsyncTransaction ¶
在 autostart 发生之前开始事务。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
begin_nested()AsyncTransaction ¶
开始嵌套事务并返回事务句柄。
-
methodsqlalchemy.ext.asyncio.AsyncConnection.
async close()None ¶
关闭此AsyncConnection
。
如果有一个事务,这还具有回滚事务的效果。
-
Attributesqlalchemy.ext.asyncio.AsyncConnection.
closed 属性¶
如果此连接已关闭,则返回 True。
代表AsyncConnection
类为Connection
类代理。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
async commit()None ¶
提交当前正在进行的事务。
如果已启动事务,则此方法提交当前事务。如果未启动事务,则该方法无效,假设连接处于非无效状态。
事务会自动在Connection
上开始 每当首次执行语句时,或者当Connection.begin()
方法。
-
属性sqlalchemy.ext.asyncio.AsyncConnection.
连接¶
未为 async 实现;叫AsyncConnection.get_raw_connection()
。
-
属性sqlalchemy.ext.asyncio.AsyncConnection.
default_isolation_level¶
与 使用中的方言
。
代表AsyncConnection
类为Connection
类代理。
此值独立于Connection.execution_options.isolation_level
和Engine.execution_options.isolation_level
执行选项,并由Dialect
在创建第一个连接时确定,方法是在发出任何其他命令之前对当前隔离级别的数据库执行 SQL 查询。
调用此访问器不会调用任何新的 SQL 查询。
-
attribute dialect(属性sqlalchemy.ext.asyncio.AsyncConnection.
方言)¶
代表AsyncConnection
类的Connection.dialect
属性的代理。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
async exec_driver_sql(statement: str, parameters:_DBAPIAnyExecuteParamsNone=None, execution_options:CoreExecuteOptionsParameterNone=None)CursorResult[Any] ¶
执行驱动程序级 SQL 字符串并返回缓冲结果
。
-
methodsqlalchemy.ext.asyncio.AsyncConnection.
async execute(statement: Executable, parameters:_CoreAnyExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None)CursorResult[Any] ¶
执行 SQL 语句结构并返回缓冲的结果
。
参数
对象¶ –
要执行的语句。这始终是一个对象,它同时位于ClauseElement
和可执行
层次结构,包括:DDL
和继承自ExecutableDDLElement
parameters¶ —— 将绑定到语句中的参数。这可以是参数名称到值的字典,也可以是字典的可变序列(例如列表)。当传递字典列表时,底层语句执行将使用 DBAPIcursor.executemany()
方法。当传递单个字典时,DBAPIcursor.execute()
方法。
execution_options¶ – 可选的执行选项字典,将与语句执行相关联。此词典可以提供 接受的选项的子集Connection.execution_options()
。
结果Result
对象。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
async execution_options(**opt: Any)AsyncConnection ¶
为连接设置非 SQL 选项,这些选项在执行过程中生效。
这将返回此AsyncConnection
对象,并添加了新选项。
有关此方法的完整详细信息,请参阅Connection.execution_options()
。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
get_nested_transaction()→AsyncTransactionNone¶
返回表示当前嵌套 (savepoint) 事务(如果有)的AsyncTransaction
。
这利用了底层同步连接的Connection.get_nested_transaction()
方法获取当前Transaction
,然后在新的AsyncTransaction
对象。
1.4.0b2 版本的新Function。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
async get_raw_connection()PoolProxiedConnection ¶
返回此正在使用的共用 DBAPI 级连接AsyncConnection 的 AsyncConnection
中。
这是一个 SQLAlchemy 连接池代理连接 该 API 具有属性_ConnectionFairy.driver_connection
,指的是 实际驾驶员连接。其_ConnectionFairy.dbapi_connection
而是引用使驱动程序连接适应 DBAPI 协议的AdaptedConnection
实例。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
get_transaction()→AsyncTransactionNone¶
返回表示当前事务的AsyncTransaction
(如果有)。
这利用了底层同步连接的Connection.get_transaction()
方法获取当前Transaction
,然后在新的AsyncTransaction
对象。
1.4.0b2 版本的新Function。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
in_nested_transaction()bool ¶
如果事务正在进行,则返回 True。
1.4.0b2 版本的新Function。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
in_transaction()bool ¶
如果事务正在进行,则返回 True。
-
属性sqlalchemy.ext.asyncio.AsyncConnection.
信息¶
返回底层Connection
的Connection.info
字典。
此字典可自由写入,以便用户定义的状态与数据库连接相关联。
仅当AsyncConnection
当前已连接时,此属性才可用。如果AsyncConnection.closed
属性为True
,则访问此属性将引发ResourceClosedError 的 ResourceClosedError
错误。
1.4.0b2 版本的新Function。
-
methodsqlalchemy.ext.asyncio.AsyncConnection.
async invalidate(exception:BaseExceptionNone=None)None ¶
使与此Connection
关联的基础 DBAPI 连接失效。
有关此方法的完整详细信息,请参阅方法Connection.invalidate()
。
-
attribute invalidated(属性sqlalchemy.ext.asyncio.AsyncConnection.
无效)¶
如果此连接无效,则返回 True。
代表AsyncConnection
类为Connection
类代理。
但是,这并不表示连接是否在池级别失效
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
async rollback()None ¶
回滚当前正在进行的事务。
如果已启动事务,则此方法回滚当前事务。如果未启动事务,则该方法无效。如果事务已启动并且连接处于无效状态,则使用此方法清除事务。
事务会自动在Connection
上开始 每当首次执行语句时,或者当Connection.begin()
方法。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
async run_sync(fn: ~typing.Callable[[~typing.连接[~sqlalchemy.engine.base.Connection, ~_P]], ~sqlalchemy.ext.asyncio.engine._T], *arg: ~typing.~_P, **kw: ~typing.~_P)_T ¶
调用给定的 synchronous (i. not async) 可调用对象,传递一个 synchronous 样式的Connection
作为第一个参数。
此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。
例如:def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str: """A synchronous function that does not require awaiting :param conn: a Core SQLAlchemy Connection, used synchronously :return: an optional return value is supported """ conn.execute(some_table.insert().values(int_col=arg1, str_col=arg2)) return "success" async def do_something_async(async_engine: AsyncEngine) -> None: """an async function that uses awaiting""" async with async_engine.begin() as async_conn: # run do_something_with_core() with a sync-style # Connection, proxied into an awaitable return_code = await async_conn.run_sync( do_something_with_core, 5, "strval" ) print(return_code)
此方法通过在专门检测的 greenlet 中运行给定的可调用对象来维护 asyncio 事件循环,直到数据库连接。AsyncConnection.run_sync()
最基本的用途是调用MetaData.create_all()
等方法,给定需要提供给
MetaData.create_all()
作为连接
对象:# run metadata.create_all(conn) with a sync-style Connection, # proxied into an awaitable with async_engine.begin() as conn: await conn.run_sync(metadata.create_all)
注意
提供的可调用对象在 asyncio 事件循环中内联调用,并将在传统的 IO 调用上阻塞。此可调用对象中的 IO 应仅调用 SQLAlchemy 的 asyncio 数据库 API,该 API 将适当适应 greenlet 上下文。
-
methodsqlalchemy.ext.asyncio.AsyncConnection.
async scalar(statement: Executable, parameters:_CoreSingleExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None)→ Any(语句: 可执行, parameters:=None)
执行 SQL 语句构造并返回标量对象。
此方法是调用Result.scalar()
方法。Connection.execute()
方法。参数是等效的。
结果
一个标量 Python 值,表示返回的第一行的第一列。
-
methodsqlalchemy.ext.asyncio.AsyncConnection.
async scalars(statement: Executable, parameters:_CoreAnyExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None)ScalarResult[Any] ¶
执行 SQL 语句构造并返回标量对象。
此方法是调用Result.scalars()
方法。Connection.execute()
方法。参数是等效的。
结果ScalarResult
对象。
在 1.4.24 版本加入.
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
async start(is_ctxmanager: bool = False)AsyncConnection ¶
在使用 Pythonwith:
块之外启动此AsyncConnection
对象的上下文。
-
methodsqlalchemy.ext.asyncio.AsyncConnection.
stream(statement: Executable, parameters:_CoreAnyExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None)AsyncIterator[AsyncResult[Any]] ¶
执行一条语句并返回一个可等待的,从而产生一个AsyncResult
对象。
例如:result = await conn.stream(stmt) async for row in result: print(f"{row}")
AsyncConnection.stream()
方法支持对AsyncResult
对象,如下所示:async with conn.stream(stmt) as result: async for row in result: print(f"{row}")
在上述模式中,AsyncResult.close()
方法是无条件调用的,即使迭代器被异常抛出中断。但是,上下文管理器使用仍然是可选的,并且可以使用 fn() 在 async 中
调用该函数:或await fn()
样式。
2.0.0b3 版本中的新功能:添加了上下文管理器支持
结果
一个 awaitable 对象,它将产生一个AsyncResult
对象。
-
方法sqlalchemy.ext.asyncio.AsyncConnection.
stream_scalars(statement: Executable, parameters:_CoreSingleExecuteParamsNone=None, *, execution_options:CoreExecuteOptionsParameterNone=None)AsyncIterator[AsyncScalarResult[任意]] ¶
执行一条语句并返回一个可等待的,从而产生一个AsyncScalarResult
对象。
例如:result = await conn.stream_scalars(stmt) async for scalar in result: print(f"{scalar}")
此方法是调用AsyncResult.scalars()
方法。Connection.stream()
方法。参数是等效的。
这AsyncConnection.stream_scalars()
方法支持对AsyncScalarResult
对象,如下所示:async with conn.stream_scalars(stmt) as result: async for scalar in result: print(f"{scalar}")
在上述模式中,即使迭代器被异常抛出中断,也会无条件地调用AsyncScalarResult.close()
方法。但是,上下文管理器使用仍然是可选的,并且可以使用 fn() 在 async 中
调用该函数:或await fn()
样式。
2.0.0b3 版本中的新功能:添加了上下文管理器支持
结果
一个 awaitable 对象,它将产生一个AsyncScalarResult
对象。
在 1.4.24 版本加入.
-
属性sqlalchemy.ext.asyncio.AsyncConnection.
sync_connection:ConnectionNone¶
对 sync-styleConnection
this 的引用AsyncConnection
代理请求。
此实例可用作事件目标。
另请参阅
-
属性sqlalchemy.ext.asyncio.AsyncConnection.
sync_engine: 引擎¶
引用 sync-styleEngine
thisAsyncConnection
通过其底层连接
。
此实例可用作事件目标。
另请参阅
-
-
类 sqlalchemy.ext.asyncio 中。异步交易¶
Transaction 的 asyncio代理。
类签名
类sqlalchemy.ext.asyncio.AsyncTransaction
(sqlalchemy.ext.asyncio.base.ProxyComparable
,sqlalchemy.ext.asyncio.base.StartableContext
)-
methodsqlalchemy.ext.asyncio.AsyncTransaction.
async close()None ¶
关闭此AsyncTransaction
。
如果此事务是 begin/commit 嵌套中的基本事务,则该事务将 rollback() 。否则,该方法返回。
这用于取消 Transaction 而不影响封闭事务的范围。
-
方法sqlalchemy.ext.asyncio.AsyncTransaction.
async commit()None ¶
提交此AsyncTransaction
。
-
方法sqlalchemy.ext.asyncio.AsyncTransaction.
async rollback()None ¶
回滚此AsyncTransaction
。
-
方法sqlalchemy.ext.asyncio.AsyncTransaction.
async start(is_ctxmanager: bool = False)AsyncTransaction ¶
在使用 Python的 with:
块之外启动此AsyncTransaction
对象的上下文。
-
结果集 API 文档¶
AsyncResult
对象是
Result
对象。 仅当使用
AsyncConnection.stream()
或 AsyncSession.stream()
方法,该方法返回位于活动数据库之上的 Result 对象
光标。
对象名称 |
描述 |
---|---|
|
|
|
|
|
|
|
-
类 sqlalchemy.ext.asyncio 中。异步结果¶
一个围绕Result
对象的 asyncio 包装器。AsyncResult
仅适用于 使用服务器端游标。 它仅从AsyncConnection.stream()
和AsyncSession.stream()
方法。
注意
与Result
一样,此对象用于AsyncSession.execute()
返回的 ORM 结果,它可以单独或在类似 Tuples 的行中生成 ORM 映射对象的实例。请注意,这些结果对象不会像旧版Query
对象那样自动删除重复的实例或行。对于 Python 中实例或行的重复数据删除,请使用AsyncResult.unique()
修饰符方法。
在 1.4 版本加入.
成员
all(), close(), closed, columns(), fetchall(), fetchmany(), fetchone(), first(), freeze(), keys(), mappings(), one(), one_or_none(), partitions(), scalar()、 scalar_one()、 scalar_one_or_none()、标量 ()、 t、元组 () 、 unique()、 yield_per()
类签名
类sqlalchemy.ext.asyncio.AsyncResult
(sqlalchemy.engine._WithKeys
,sqlalchemy.ext.asyncio.AsyncCommon
)-
methodsqlalchemy.ext.asyncio.AsyncResult.
async all()Sequence[Row[_TP]] ¶
返回列表中的所有行。
调用后关闭结果集。后续调用将返回一个空列表。
结果Row
对象的列表。
-
methodsqlalchemy.ext.asyncio.AsyncResult.
async close()None ¶
继承自AsyncCommon
的AsyncCommon.close()
方法
关闭此结果。
-
Attributesqlalchemy.ext.asyncio.AsyncResult.
closed 属性¶
继承自AsyncCommon
的AsyncCommon.closed
属性
代理基础 result 对象的 .closed 属性(如果有),否则会引发AttributeError
。
2.0.0b3 版本的新Function。
-
methodsqlalchemy.ext.asyncio.AsyncResult.
columns(*col_expressions: _KeyIndexType)Self ¶
建立应在每行中返回的列。
有关完整的行为描述,请参阅同步 SQLAlchemy API 中的Result.columns()
。
-
方法sqlalchemy.ext.asyncio.AsyncResult.
async fetchall()Sequence[Row[_TP]] ¶ AsyncResult.all()
方法的同义词。
2.0 版的新Function。
-
methodsqlalchemy.ext.asyncio.AsyncResult.
async fetchmany(size:intNone=None)sequence[Row[_TP]]¶
获取许多行。
当所有行都用完时,返回一个空列表。
提供此方法是为了向后兼容 SQLAlchemy 1.x.x。
要按组获取行,请使用AsyncResult.partitions()
方法。
结果Row
对象的列表。
-
methodsqlalchemy.ext.asyncio.AsyncResult.
async fetchone()→Row[_TP]None¶
获取一行。
当所有行都用完时,返回 None。
提供此方法是为了向后兼容 SQLAlchemy 1.x.x。
要仅获取结果的第一行,请使用AsyncResult.first()
方法。要遍历所有行,请直接遍历AsyncResult
对象。
结果Row
对象(如果未应用过滤器),如果没有剩余行,则为 None
。
-
methodsqlalchemy.ext.asyncio.AsyncResult.
async first()→Row[_TP]None¶
获取第一行,如果不存在行,则为 None
。
关闭结果集并丢弃剩余行。
注意
默认情况下,此方法返回一行,例如 tuple。 要只返回一个标量值,即第一个 列,请使用AsyncResult.scalar()
方法,或者将AsyncResult.scalars()
和AsyncResult.first() 的 AsyncResult.first()
中。
此外,与传统 ORM 的行为相比Query.first()
方法,则对 调用的 SQL 查询来生成此异步结果
;对于在生成行之前在内存中缓冲结果的 DBAPI 驱动程序,所有行都将发送到 Python 进程,并且除第一行之外的所有行都将被丢弃。
结果一个 Row
对象,如果没有剩余行,则为 None。
-
方法sqlalchemy.ext.asyncio.AsyncResult.
async freeze()FrozenResult[_TP] ¶
返回一个可调用对象,该对象将生成此AsyncResult
调用时。
返回的可调用对象是FrozenResult 的 Froz。
这用于结果集缓存。当结果未使用时,必须在结果上调用该方法,并且调用该方法将完全使用结果。当FrozenResult
从缓存中检索,则可以调用任意次数,其中 它每次都会针对其存储的行集生成一个新的Result
对象。
另请参阅
Re-Executing Statements - 在 ORM 中实现结果集缓存的示例用法。
-
方法sqlalchemy.ext.asyncio.AsyncResult.
keys()RMKeyView ¶
继承自sqlalchemy.engine._WithKeys.keys
sqlalchemy.engine._WithKeys
的方法
返回一个可迭代视图,该视图产生将由每个Row
表示的字符串键。
键可以表示 core 语句返回的列的标签或 orm 执行返回的 orm 类的名称。
还可以使用 Python 测试视图的密钥包含in
运算符,该运算符将测试视图中表示的字符串键以及备用键(如 Column 对象)。
在 1.4 版本发生变更: 返回一个键视图对象,而不是一个普通的列表。
-
方法sqlalchemy.ext.asyncio.AsyncResult.
mappings()AsyncMappingResult ¶
将 mappings 筛选器应用于返回的行,返回AsyncMappingResult 的 AsyncMappingResult
中。
应用此过滤器后,fetching rows 将返回RowMapping
对象而不是Row
对象。
结果
引用底层Result
对象的新AsyncMappingResult
筛选对象。
-
methodsqlalchemy.ext.asyncio.AsyncResult.
async one()Row[_TP] ¶
只返回一行或引发异常。
如果结果不返回任何行,则引发NoResultFound
,如果返回多行,则引发MultipleResultsFound
。
注意
默认情况下,此方法返回一行,例如 tuple。 要只返回一个标量值,即第一个 列,请使用AsyncResult.scalar_one()
方法,或者 combineAsyncResult.scalars()
和AsyncResult.one() 的 AsyncResult.one()
中。
在 1.4 版本加入.
结果
第一行
。
提升:
-
methodsqlalchemy.ext.asyncio.AsyncResult.
async one_or_none()→Row[_TP]None¶
最多返回一个结果或引发异常。
如果结果没有行,则返回None
。引发MultipleResultsFound
如果返回多行。
在 1.4 版本加入.
结果
第一行
或None
(无行)如果没有可用行。
提升:
-
方法sqlalchemy.ext.asyncio.AsyncResult.
async partitions(size:intNone=None)AsyncIterator[Sequence[Row[_TP]]]¶
遍历给定大小的行的子列表。
返回一个异步迭代器:async def scroll_results(connection): result = await connection.stream(select(users_table)) async for partition in result.partitions(100): print("list of rows: %s" % partition)
有关完整的行为描述,请参阅同步 SQLAlchemy API 中的Result.partitions()
。
-
方法sqlalchemy.ext.asyncio.AsyncResult.
async scalar()Any ¶
获取第一行的第一列,然后关闭结果集。
如果没有要提取的行,则返回None
。
不执行验证来测试是否保留其他行。
调用该方法后,对象完全关闭,例如CursorResult.close()
方法。
结果
Python 标量值,如果没有剩余行,则为 None
。
-
methodsqlalchemy.ext.asyncio.AsyncResult.
async scalar_one()Any ¶
只返回一个标量结果或引发异常。
这相当于先调用AsyncResult.scalars(),
然后再调用AsyncScalarResult.one()。
-
方法sqlalchemy.ext.asyncio.AsyncResult.
async scalar_one_or_none()→AnyNone¶
只返回一个标量结果或None
。
这相当于调用AsyncResult.scalars(),
然后AsyncScalarResult.one_or_none()
调用 .
-
方法sqlalchemy.ext.asyncio.AsyncResult.
scalars(index: _KeyIndexType = 0)AsyncScalarResult[Any] ¶
返回一个AsyncScalarResult
筛选对象,该对象将返回单个元素而不是Row
对象。
有关完整的行为描述,请参阅同步 SQLAlchemy API 中的Result.scalars()。
参数
index¶—— 整数或行键,表示要从每行中获取的列,默认为0
表示第一列。
结果
引用此AsyncResult
对象的新AsyncScalarResult
筛选对象。
-
属性sqlalchemy.ext.asyncio.AsyncResult.
t¶
将 “typed tuple” 键入筛选器应用于返回的行。AsyncResult.t
属性是调用AsyncResult.tuples()
方法的同义词。
2.0 版的新Function。
-
方法sqlalchemy.ext.asyncio.AsyncResult.
tuples()AsyncTupleResult[_TP] ¶
将 “typed tuple” 键入筛选器应用于返回的行。
此方法在运行时返回相同的AsyncResult
对象,但批注为返回AsyncTupleResult
对象,它将向 PEP 484 指示纯类型 返回Tuples
实例而不是 rows。这允许元组解包和__getitem__
Row
访问 对象,对于调用了 本身包括键入信息。
2.0 版的新Function。
结果
键入时的AsyncTupleResult
类型。
-
methodsqlalchemy.ext.asyncio.AsyncResult.
unique(strategy:_UniqueFilterTypeNone=None)Self( 自身 ) ¶
将唯一筛选应用于此返回的对象AsyncResult 的 AsyncResult
中。
有关完整的行为描述,请参阅同步 SQLAlchemy API 中的Result.unique()
。
-
方法sqlalchemy.ext.asyncio.AsyncResult.
yield_per(num: int)Self ¶
继承自FilterResult
的FilterResult.yield_per()
方法
配置行获取策略以一次获取num
行。FilterResult.yield_per()
方法是Result.yield_per()
方法的传递。有关使用说明,请参阅该方法的文档。
1.4.40 版本中的新功能: - 添加了FilterResult.yield_per()
,以便该方法可用于所有结果集实现
-
-
类 sqlalchemy.ext.asyncio 中。AsyncScalarResult(异步标量结果)¶
返回标量值而不是Row
值的AsyncResult
的包装器。AsyncScalarResult
对象是通过调用AsyncResult.scalars()
方法。
有关完整的行为描述,请参阅同步 SQLAlchemy API 中的ScalarResult
对象。
在 1.4 版本加入.
成员
all(), close(), closed, fetchall(), fetchmany(), first(), one(), one_or_none(), partitions(), unique(), yield_per()
类签名
类sqlalchemy.ext.asyncio.AsyncScalarResult
(sqlalchemy.ext.asyncio.AsyncCommon
)-
methodsqlalchemy.ext.asyncio.AsyncScalarResult.
async all()序列[_R] ¶
返回列表中的所有标量值。
等效于AsyncResult.all(),
但返回的是标量值,而不是Row
对象。
-
methodsqlalchemy.ext.asyncio.AsyncScalarResult.
async close()None ¶
继承自AsyncCommon
的AsyncCommon.close()
方法
关闭此结果。
-
Attributesqlalchemy.ext.asyncio.AsyncScalarResult.
closed 属性¶
继承自AsyncCommon
的AsyncCommon.closed
属性
代理基础 result 对象的 .closed 属性(如果有),否则会引发AttributeError
。
2.0.0b3 版本的新Function。
-
methodsqlalchemy.ext.asyncio.AsyncScalarResult.
async fetchall()序列[_R] ¶ AsyncScalarResult.all()
方法的同义词。
-
methodsqlalchemy.ext.asyncio.AsyncScalarResult.
async fetchmany(size:intNone=None)Sequence[_R] ¶
获取多个对象。
等效于AsyncResult.fetchmany(),
但返回的是标量值,而不是Row
对象。
-
方法sqlalchemy.ext.asyncio.AsyncScalarResult.
async first()→_RNone¶
获取第一个对象,如果不存在对象,则为None
。
等效于AsyncResult.first(),
但返回的是标量值,而不是Row
对象。
-
方法sqlalchemy.ext.asyncio.AsyncScalarResult.
async one()_R ¶
只返回一个对象或引发异常。
等效于AsyncResult.one(),
但返回的是标量值,而不是Row
对象。
-
方法sqlalchemy.ext.asyncio.AsyncScalarResult.
async one_or_none()→_RNone¶
最多返回一个对象或引发异常。
等效于AsyncResult.one_or_none()
不同之处在于返回标量值,而不是Row
对象。
-
方法sqlalchemy.ext.asyncio.AsyncScalarResult.
async partitions(size:intNone=None)AsyncIterator[Sequence[_R]]¶
遍历给定大小的元素的子列表。
等效于AsyncResult.partitions(),
但返回的是标量值,而不是Row
对象。
-
methodsqlalchemy.ext.asyncio.AsyncScalarResult.
unique(strategy:_UniqueFilterTypeNone=None)Self( 自身 ) ¶
将唯一筛选应用于此返回的对象AsyncScalarResult 的 AsyncScalarResult
中。
有关使用详情,请参阅AsyncResult.unique()。
-
方法sqlalchemy.ext.asyncio.AsyncScalarResult.
yield_per(num: int)Self ¶
继承自FilterResult
的FilterResult.yield_per()
方法
配置行获取策略以一次获取num
行。FilterResult.yield_per()
方法是Result.yield_per()
方法的传递。有关使用说明,请参阅该方法的文档。
1.4.40 版本中的新功能: - 添加了FilterResult.yield_per()
,以便该方法可用于所有结果集实现
-
-
类 sqlalchemy.ext.asyncio 中。AsyncMappingResult(异步映射结果)¶ AsyncResult
的包装器,它返回字典值而不是Row
值。AsyncMappingResult
对象是通过调用AsyncResult.mappings()
方法。
有关完整的行为描述,请参阅同步 SQLAlchemy API 中的MappingResult
对象。
在 1.4 版本加入.
成员
all(), close(), closed, columns(), fetchall(), fetchmany(), fetchone(), first(), keys(), one(), one_or_none(), partitions(), unique(), yield_per()
类签名
类sqlalchemy.ext.asyncio.AsyncMappingResult
(sqlalchemy.engine._WithKeys
,sqlalchemy.ext.asyncio.AsyncCommon
)-
methodsqlalchemy.ext.asyncio.AsyncMappingResult.
async all()Sequence[RowMapping] ¶
返回列表中的所有行。
等效于AsyncResult.all(),
不同之处在于RowMapping
值,而不是Row
对象。
-
methodsqlalchemy.ext.asyncio.AsyncMappingResult.
async close()None ¶
继承自AsyncCommon
的AsyncCommon.close()
方法
关闭此结果。
-
Attributesqlalchemy.ext.asyncio.AsyncMappingResult.
closed 属性¶
继承自AsyncCommon
的AsyncCommon.closed
属性
代理基础 result 对象的 .closed 属性(如果有),否则会引发AttributeError
。
2.0.0b3 版本的新Function。
-
methodsqlalchemy.ext.asyncio.AsyncMappingResult.
columns(*col_expressions: _KeyIndexType)Self ¶
建立应在每行中返回的列。
-
方法sqlalchemy.ext.asyncio.AsyncMappingResult.
async fetchall()Sequence[RowMapping] ¶ AsyncMappingResult.all()
方法的同义词。
-
方法sqlalchemy.ext.asyncio.AsyncMappingResult.
async fetchmany(size:intNone=None)Sequence[RowMapping] ¶
获取许多行。
等效于AsyncResult.fetchmany(),
不同之处在于RowMapping
值,而不是Row
对象。
-
方法sqlalchemy.ext.asyncio.AsyncMappingResult.
async fetchone()→RowMappingNone¶
获取一个对象。
等效于AsyncResult.fetchone(),
不同之处在于RowMapping
值,而不是Row
对象。
-
方法sqlalchemy.ext.asyncio.AsyncMappingResult.
async first()→RowMappingNone¶
获取第一个对象,如果不存在对象,则为None
。
等效于AsyncResult.first(),
不同之处在于RowMapping
值,而不是Row
对象。
-
方法sqlalchemy.ext.asyncio.AsyncMappingResult.
keys()RMKeyView ¶
继承自sqlalchemy.engine._WithKeys.keys
sqlalchemy.engine._WithKeys
的方法
返回一个可迭代视图,该视图产生将由每个Row
表示的字符串键。
键可以表示 core 语句返回的列的标签或 orm 执行返回的 orm 类的名称。
还可以使用 Python 测试视图的密钥包含in
运算符,该运算符将测试视图中表示的字符串键以及备用键(如 Column 对象)。
在 1.4 版本发生变更: 返回一个键视图对象,而不是一个普通的列表。
-
methodsqlalchemy.ext.asyncio.AsyncMappingResult.
async one()RowMapping ¶
只返回一个对象或引发异常。
等效于AsyncResult.one(),
不同之处在于RowMapping
值,而不是Row
对象。
-
methodsqlalchemy.ext.asyncio.AsyncMappingResult.
async one_or_none()→RowMappingNone¶
最多返回一个对象或引发异常。
等效于AsyncResult.one_or_none()
不同之处在于RowMapping
值,而不是Row
对象。
-
方法sqlalchemy.ext.asyncio.AsyncMappingResult.
async partitions(size:intNone=None)AsyncIterator[Sequence[RowMapping]] ¶
遍历给定大小的元素的子列表。
等效于AsyncResult.partitions(),
不同之处在于RowMapping
值,而不是Row
对象。
-
methodsqlalchemy.ext.asyncio.AsyncMappingResult.
unique(strategy:_UniqueFilterTypeNone=None)Self( 自身 ) ¶
将唯一筛选应用于此返回的对象AsyncMappingResult 的 AsyncMappingResult
中。
有关使用详情,请参阅AsyncResult.unique()。
-
方法sqlalchemy.ext.asyncio.AsyncMappingResult.
yield_per(num: int)Self ¶
继承自FilterResult
的FilterResult.yield_per()
方法
配置行获取策略以一次获取num
行。FilterResult.yield_per()
方法是Result.yield_per()
方法的传递。有关使用说明,请参阅该方法的文档。
1.4.40 版本中的新功能: - 添加了FilterResult.yield_per()
,以便该方法可用于所有结果集实现
-
-
类 sqlalchemy.ext.asyncio 中。AsyncTupleResult(异步元组结果)¶
一个类型化为返回纯 Python 元组而不是行的AsyncResult
。
由于Row
在各个方面都已经像一个元组,所以这个类是一个只打字的类,所以在运行时仍然使用常规的 AsyncResult
。
类签名
类sqlalchemy.ext.asyncio.AsyncTupleResult
(sqlalchemy.ext.asyncio.AsyncCommon
,sqlalchemy.util.langhelpers.TypingOnly
)
ORM Session API 文档¶
对象名称 |
描述 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-
函数 sqlalchemy.ext.asyncio 中。async_object_session(instance: object)→AsyncSessionNone¶
返回给定实例所属的AsyncSession
。
此函数使用 sync-API 函数object_session
检索 Session 时,该Session
引用给定的实例,并从那里将其链接到原始实例AsyncSession 的 AsyncSession
中。
如果AsyncSession
已被垃圾回收,则返回值为None
。
此功能也可从InstanceState.async_session
访问器。
参数
instance¶ —— 一个 ORM 映射的实例
结果
一个AsyncSession
对象,或者None
。
在 1.4.18 版本加入.
-
函数 sqlalchemy.ext.asyncio 中。async_session(session: Session)→AsyncSessionNone¶
返回代理给定
Session
对象(如果有)。
参数
session¶——一个 Session
实例。
结果
一个AsyncSession
实例,或者None
。
在 1.4.18 版本加入.
-
函数 async sqlalchemy.ext.asyncio 中。close_all_sessions()无 ¶
关闭所有AsyncSession
会话。
2.0.23 新版功能.
另请参阅close_all_sessions()
-
类 sqlalchemy.ext.asyncio 中。async_sessionmaker¶
可配置的AsyncSession
工厂。async_sessionmaker
Factory 的工作方式与sessionmaker
工厂,用于生成新的AsyncSession
对象时,创建它们 Given 在此处建立的配置参数。
例如:from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import async_sessionmaker async def run_some_sql( async_session: async_sessionmaker[AsyncSession], ) -> None: async with async_session() as session: session.add(SomeObject(data="object")) session.add(SomeOtherObject(name="other object")) await session.commit() async def main() -> None: # an AsyncEngine, which the AsyncSession will use for connection # resources engine = create_async_engine( "postgresql+asyncpg://scott:tiger@localhost/" ) # create a reusable factory for new AsyncSession instances async_session = async_sessionmaker(engine) await run_some_sql(async_session) await engine.dispose()
async_sessionmaker
非常有用,以便程序的不同部分可以使用预先建立的固定配置创建新的AsyncSession
对象。请注意,AsyncSession
对象也可以在不使用时直接实例化async_sessionmaker
。
2.0 新版功能:async_sessionmaker
提供了sessionmaker
类,该类专用于AsyncSession
对象,包括 pep-484 类型支持。
另请参阅
概要 - ORM - 显示示例使用sessionmaker
- 概述SessionMaker
架构
Opening and Closing a Session - 有关使用sessionmaker
创建会话的介绍性文本。
类签名-
方法sqlalchemy.ext.asyncio.async_sessionmaker.
__call__(**local_kw: Any)_AS ¶
使用此async_sessionmaker
中建立的配置生成新的AsyncSession
对象。
在 Python 中,当 __call__
方法以与函数相同的方式被 “调用” 时,会在对象上调用该方法:AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False) session = AsyncSession() # invokes sessionmaker.__call__()
-
methodsqlalchemy.ext.asyncio.async_sessionmaker.
__init__(bind: Optional[_AsyncSessionBind] = None, *, class_: type[_AS] = <class 'sqlalchemy.ext.asyncio.session.AsyncSession'>, autoflush: bool = True, expire_on_commit: bool = True, info: Optional[_InfoType] = None, **kw: Any)¶ -
这里的所有参数,除了 class_
都对应于Session
直接接受的参数。请参阅AsyncSession.__init__()
文档字符串以获取有关参数的更多详细信息。
-
方法sqlalchemy.ext.asyncio.async_sessionmaker.
begin()_AsyncSessionContextManager[_AS] ¶
生成一个上下文管理器,它既提供了一个新的AsyncSession
以及提交的事务。
例如:async def main(): Session = async_sessionmaker(some_engine) async with Session.begin() as session: session.add(some_object) # commits transaction, closes session
-
methodsqlalchemy.ext.asyncio.async_sessionmaker.
configure(**new_kw: Any)None (无)¶
(重新)配置此async_sessionmaker的参数。
例如:AsyncSession = async_sessionmaker(some_engine) AsyncSession.configure(bind=create_async_engine("sqlite+aiosqlite://"))
-
类 sqlalchemy.ext.asyncio 中。async_scoped_session¶
提供AsyncSession
对象的范围管理。
有关使用详情,请参阅使用 asyncio 范围的会话部分。
在 1.4.19 版本加入.
成员
__call__(), __init__(), aclose(), add(), add_all(), autoflush, begin(), begin_nested(), bind, close(), close_all(), commit(), configure(), connection(), delete()、 已删除、 脏、 执行()、 过期()、 expire_all()、 删除()、 expunge_all()、 刷新()、 获取()、 get_bind()、 get_one()、 identity_key()、 identity_map、 信息、 invalidate()、 is_active、 is_modified()、 merge()、 new、 no_autoflush、 object_session()、 refresh()、 remove()、 reset()、 rollback()、 scalar()、 scalars()、 session_factory、stream() 和 stream_scalars()
类签名-
方法sqlalchemy.ext.asyncio.async_scoped_session.
__call__(**kw: Any)_AS ¶
返回当前的AsyncSession
,使用scoped_session.session_factory
if 不存在来创建它。
参数
kw – 关键字参数将传递给scoped_session.session_factory
callable,如果现有的AsyncSession
不存在。 如果 存在AsyncSession
和 keyword 参数,InvalidRequestError
引发。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
__init__(session_factory: async_sessionmaker[_AS], scopefunc: Callable[[],Any])¶ -
参数
session_factory¶– 用于创建新AsyncSession
的工厂 实例。这通常(但不一定)是一个实例async_sessionmaker
。
scopefunc¶—— 定义当前范围的函数。函数,如asyncio.current_task
在这里可能有用。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
async aclose()None ¶ AsyncSession.close()
的同义词。
代表async_scoped_session
类代理AsyncSession
类。AsyncSession.aclose()
名称专门用于支持 Python 标准库@contextlib.aclosing
Context Manager 函数。
2.0.20 版本的新Function。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
add(instance: object, _warn: bool = True)无 ¶
将对象放入此Session
中。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
在传递给Session.add()
方法将移动到 pending 状态,直到下一次 flush,此时它们将进入 persistent 状态。
在传递给Session.add()
方法将移动到持久化 state 直接。
如果Session
使用的事务回滚, 对象,这些对象在传递给Session.add()
将移回 transient 状态,并且将不再存在于此会话
。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
add_all(instances: Iterable[object])None ¶
将给定的实例集合添加到此Session
。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
有关一般行为描述,请参阅Session.add()
的文档。
-
属性sqlalchemy.ext.asyncio.async_scoped_session.
自动刷新¶
代表AsyncSession
类的Session.autoflush
属性的代理。
代表async_scoped_session
类代理AsyncSession
类。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
begin()AsyncSessionTransaction ¶
返回AsyncSessionTransaction
对象。
代表async_scoped_session
类代理AsyncSession
类。
底层Session
将在AsyncSessionTransaction
object 的输入值:async with async_session.begin(): ... # ORM transaction is begun
请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是按需开始的。但是,begin 块是异步的,以适应SessionEvents.after_transaction_create()
事件钩子。
有关 ORM begin 的一般描述,请参阅Session.begin() 的 Session.begin()
中。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
begin_nested()AsyncSessionTransaction ¶
返回一个AsyncSessionTransaction
对象,该对象将开始一个 “嵌套” 事务,例如 SAVEPOINT。
代表async_scoped_session
类代理AsyncSession
类。
行为与AsyncSession.begin()
相同。
有关 ORM begin 嵌套的一般描述,请参阅Session.begin_nested()
中。
另请参阅
可序列化隔离/保存点/事务性 DDL(asyncio 版本)- SQLite asyncio 驱动程序需要特殊解决方法,以便 SAVEPOINT 正常工作。
-
属性sqlalchemy.ext.asyncio.async_scoped_session.
绑定¶
代表async_scoped_session
类的AsyncSession.bind
属性的代理。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async close()None ¶
关闭此使用的事务资源和 ORM 对象AsyncSession 的 AsyncSession
中。
代表async_scoped_session
类代理AsyncSession
类。
另请参阅Session.close()
- “close” 的主要文档
结束语 - 有关语义的详细信息AsyncSession.close()
和AsyncSession.reset() 的 AsyncSession.reset()
中。
-
async classmethodsqlalchemy.ext.asyncio.async_scoped_session.
close_all()None (无)¶
关闭所有AsyncSession
会话。
代表async_scoped_session
类代理AsyncSession
类。
2.0 版后已移除:AsyncSession.close_all()
方法已弃用,并将在未来发行版中删除。请参阅close_all_sessions()。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
async commit()None ¶
提交当前正在进行的事务。
代表async_scoped_session
类代理AsyncSession
类。
另请参阅Session.commit()
- “提交” 的主要文档
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
configure(**kwargs: Any)None ¶
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
async connection(bind_arguments:_BindArgumentsNone=None, execution_options:CoreExecuteOptionsParameterNone=None, **kw: Any)AsyncConnection ¶
返回与此Session
对象的事务状态对应的AsyncConnection
对象。
代表async_scoped_session
类代理AsyncSession
类。
此方法还可用于为当前事务使用的数据库连接建立执行选项。
1.4.24 版本中的新功能: 添加了传递给底层Session.connection()
方法的 **kw 参数。
另请参阅Session.connection()
- “连接” 的主要文档
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async delete(instance: object)无 ¶
将实例标记为已删除。
代表async_scoped_session
类代理AsyncSession
类。
数据库删除作发生在flush() 上
。
由于此作可能需要沿未加载的关系级联,因此可以等待允许这些查询发生。
另请参阅Session.delete()
- 删除的主要文档
-
已删除属性sqlalchemy.ext.asyncio.async_scoped_session.
¶
此Session
中标记为 'deleted' 的所有实例的集合
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
-
attribute dirty(脏sqlalchemy.ext.asyncio.async_scoped_session.
属性)¶
被视为 dirty 的所有持久性实例的集合。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
例如:some_mapped_object in session.dirty
当实例被修改但未被删除时,它们被视为脏实例。
请注意,这种 “肮脏 ”的计算是 “乐观的”;大多数属性设置或集合修改作会将实例标记为 'dirty' 并将其放置在此集中,即使属性的值没有净变化。在 flush 时,将每个属性的值与其之前保存的值进行比较,如果没有净变化,则不会发生 SQL作(这是一个成本更高的作,因此仅在 flush 时完成)。
要检查实例是否对其属性进行了可作的净更改,请使用Session.is_modified()
方法。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async execute(statement: Executable, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: any)result[any] ¶
执行一条语句并返回一个缓冲的Result
对象。
代表async_scoped_session
类代理AsyncSession
类。
另请参阅Session.execute()
- 执行的主要文档
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
expire(instance: object, attribute_names:Iterable[str]None=None)None(无 )¶
使实例上的属性过期。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
将实例的属性标记为过期。当过期的 属性,则会向Session
对象的当前事务上下文,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与之前在同一事务中读取的值相同的值,而不管该事务之外的数据库状态如何变化。
要同时使Session
中的所有对象过期,请使用Session.expire_all()。
Session
对象的默认行为是每当Session.rollback()
或Session.commit()
方法,以便可以为新事务加载新状态。因此,调用Session.expire()
仅对当前事务中发出非 ORM SQL 语句的特定情况有意义。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
expire_all()无 ¶
使此 Session 中的所有持久性实例过期。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
下次访问持久化实例上的任何属性时, 将使用Session
对象的当前事务上下文,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与之前在同一事务中读取的值相同的值,而不管该事务之外的数据库状态如何变化。
要使单个对象和这些对象上的各个属性过期,请使用Session.expire()。
Session
对象的默认行为是每当Session.rollback()
或Session.commit()
方法,以便可以为新事务加载新状态。因此,假设事务是隔离的,通常不需要调用Session.expire_all()
。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
expunge(instance: object)None(无 )¶
从此Session
中删除实例。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
这将释放对实例的所有内部引用。将根据 Expunge Cascade 规则应用 Cascading。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
expunge_all()无 ¶
从此Session
中删除所有对象实例。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
这相当于对 this 中的所有对象调用expunge(obj)
会话
。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async flush(objects:Sequence[Any]None=None)None ¶
将所有对象更改刷新到数据库。
代表async_scoped_session
类代理AsyncSession
类。
另请参阅Session.flush()
- flush 的主要文档
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async get(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options:Sequence[ORMOption]None=None、populate_existing:bool = False、with_for_update:ForUpdateParameter = None、identity_token:AnyNone=None、execution_options: OrmExecuteOptionsParameter = {})→_ONone¶
根据给定的主键标识符返回实例,如果未找到,则返回None
。
代表async_scoped_session
类代理AsyncSession
类。
另请参阅Session.get()
- get 的主要文档
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
get_bind(mapper:_EntityBindKey[_O]None=None, clause:ClauseElementNone=None, bind:_SessionBindNone=None, **kw: Any)→EngineConnection¶
返回一个 “bind”,同步代理的 Session
已绑定。
代表async_scoped_session
类代理AsyncSession
类。
与Session.get_bind()
方法不同,此AsyncSession
当前未以任何方式使用此方法来解析请求的引擎。
注意
此方法直接代理Session.get_bind()
方法,但是与Session.get_bind()
方法相比,它目前不能用作覆盖目标。 以下示例说明了如何实现自定义Session.get_bind()
方案,适用于AsyncSession
和AsyncEngine 的 AsyncEngine
进行验证。
自定义垂直分区中引入的模式 说明了如何将自定义绑定查找方案应用于Session
给定一组Engine
对象。应用相应的Session.get_bind()
实现以用于AsyncSession
和AsyncEngine
对象中,继续子类化 Session
并将其应用于AsyncSession
使用AsyncSession.sync_session_class
。内部方法必须继续返回Engine
实例,这些实例可以使用AsyncEngine.sync_engine
属性:# using example from "Custom Vertical Partitioning" import random from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import Session # construct async engines w/ async drivers engines = { "leader": create_async_engine("sqlite+aiosqlite:///leader.db"), "other": create_async_engine("sqlite+aiosqlite:///other.db"), "follower1": create_async_engine("sqlite+aiosqlite:///follower1.db"), "follower2": create_async_engine("sqlite+aiosqlite:///follower2.db"), } class RoutingSession(Session): def get_bind(self, mapper=None, clause=None, **kw): # within get_bind(), return sync engines if mapper and issubclass(mapper.class_, MyOtherClass): return engines["other"].sync_engine elif self._flushing or isinstance(clause, (Update, Delete)): return engines["leader"].sync_engine else: return engines[ random.choice(["follower1", "follower2"]) ].sync_engine # apply to AsyncSession using sync_session_class AsyncSessionMaker = async_sessionmaker(sync_session_class=RoutingSession)
Session.get_bind()
方法在非 asyncio、隐式非阻塞上下文中调用,其方式与通过AsyncSession.run_sync()
调用的 ORM 事件钩子和函数相同,因此 希望在Session.get_bind()
可以继续使用阻塞式代码来执行此作,该代码将在数据库驱动程序上调用 IO 时转换为隐式异步调用。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async get_one(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options:Sequence[ORMOption]None=无、populate_existing:bool = False、with_for_update:ForUpdateParameter = None、identity_token:AnyNone=None、execution_options: OrmExecuteOptionsParameter = {})_O ¶
根据给定的主键标识符返回实例,如果未找到则引发异常。
代表async_scoped_session
类代理AsyncSession
类。
sqlalchemy.orm.exc.NoResultFound
如果查询未选择任何行,则引发。
..版本已添加: 2.0.22
另请参阅Session.get_one()
- get_one 的主要文档
-
类方法sqlalchemy.ext.asyncio.async_scoped_session.
identity_key(class_:Type[Any]None=None, ident:AnyTuple[Any,...]=None, *, instance:AnyNone=None, row:Row[Any]RowMappingNone=None, identity_token:AnyNone=None)_IdentityKeyType[Any] ¶
返回身份密钥。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
这是identity_key()
的别名。
-
属性sqlalchemy.ext.asyncio.async_scoped_session.
identity_map¶
代表AsyncSession
类的Session.identity_map
属性的代理。
代表async_scoped_session
类代理AsyncSession
类。
-
属性sqlalchemy.ext.asyncio.async_scoped_session.
信息¶
用户可修改的字典。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
此字典的初始值可以使用info
参数添加到Session
构造函数中,或者sessionmaker
构造函数或工厂方法。这里的字典始终是这个Session
的本地字典,并且可以独立于所有其他Session
对象进行修改。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async invalidate()None ¶
使用连接失效关闭此 Session。
代表async_scoped_session
类代理AsyncSession
类。
有关完整描述,请参阅Session.invalidate()。
-
属性sqlalchemy.ext.asyncio.async_scoped_session.
is_active¶
如果此Session
未处于 “partial rollback” 状态,则为 True。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
“partial rollback” 状态通常表示Session
的 flush 过程失败,并且Session.rollback()
方法才能完全回滚事务。
如果此Session
根本不在事务中,则Session
将在首次使用时自动启动,因此在这种情况下,Session.is_active
将返回 True。
否则,如果此Session
在事务中, 并且该事务尚未在内部回滚,则Session.is_active
也将返回 True。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
is_modified(instance: object, include_collections: bool = True)bool ¶
如果给定实例具有本地修改的属性,则返回True
。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
此方法检索实例上每个分析属性的历史记录,并执行当前值与其先前刷新或提交的值(如果有)的比较。
它实际上是更昂贵和准确的 版本。Session.dirty
集合;对每个属性的 Net “dirty” 状态执行完整测试。
例如:return session.is_modified(someobject)
此方法需要注意以下几点:
使用此方法测试时,Session.dirty
集合中存在的实例可能会报告False
。这是因为该对象可能已经通过属性 mutation 接收了更改事件,因此将其放置在Session.dirty
中,但最终状态与从数据库加载的状态相同,因此这里没有净变化。
如果在接收新值时未加载或过期,则在应用新值时,标量属性可能未记录先前设置的值 - 在这些情况下,假定该属性发生了变化,即使最终没有针对其数据库值的净变化。在大多数情况下,SQLAlchemy 在发生 set 事件时不需要“旧”值,因此如果旧值不存在,它会跳过 SQL 调用的费用,基于通常需要标量值的 UPDATE 的假设,并且在不需要的少数情况下,平均比发出防御性 SELECT 便宜。
仅当属性容器的active_history
标志设置为True
时,才会在设置时无条件地获取 “old” 值。此标志通常用于主键属性和标量对象引用,它们不是简单的多对一。要为任何任意映射列设置此标志,请使用active_history
参数替换为column_property()
的
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async merge(instance: _O, *, load: bool = True, options:Sequence[ORMOption]None=None)_O ¶
将给定实例的状态复制到此AsyncSession
中的相应实例中。
代表async_scoped_session
类代理AsyncSession
类。
另请参阅Session.merge()
- 合并的主要文档
-
属性sqlalchemy.ext.asyncio.async_scoped_session.
new¶
此Session
中标记为 'new' 的所有实例的集合。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
-
属性sqlalchemy.ext.asyncio.async_scoped_session.
no_autoflush¶
返回禁用 autoflush 的上下文管理器。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
例如:with session.no_autoflush: some_object = SomeClass() session.add(some_object) # won't autoflush some_object.related_thing = session.query(SomeRelated).first()
在with:
块中继续的作不会受到查询访问时发生的刷新的影响。这在初始化涉及现有数据库查询的一系列对象时非常有用,其中未完成的对象不应被刷新。
-
类方法sqlalchemy.ext.asyncio.async_scoped_session.
object_session(instance: object)→SessionNone¶
返回对象所属的Session
。
代表async_scoped_session
类代理AsyncSession
类。
代表AsyncSession
类代理Session
类。
这是object_session()
的别名。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
async refresh(instance: object, attribute_names:Iterable[str]None=None, with_for_update: ForUpdateParameter = None)无 ¶
过期并刷新给定实例上的属性。
代表async_scoped_session
类代理AsyncSession
类。
将向数据库发出查询,并且所有属性都将使用其当前数据库值刷新。
这是Session.refresh()
方法的异步版本。有关所有选项的完整说明,请参阅该方法。
另请参阅Session.refresh()
- 刷新的主要文档
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
async remove()None ¶
释放当前AsyncSession
(如果存在)。
与 scoped_session 的 remove 方法不同,该方法会使用 await 来等待 AsyncSession 的 close 方法。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
async reset(None ¶
关闭此使用的事务资源和 ORM 对象Session
,将会话重置为其初始状态。
代表async_scoped_session
类代理AsyncSession
类。
2.0.22 新版功能.
另请参阅Session.reset()
- “reset” 的主要文档
结束语 - 有关语义的详细信息AsyncSession.close()
和AsyncSession.reset() 的 AsyncSession.reset()
中。
-
方法sqlalchemy.ext.asyncio.async_scoped_session.
async rollback()None ¶
回滚当前正在进行的事务。
代表async_scoped_session
类代理AsyncSession
类。
另请参阅Session.rollback()
- “回滚” 的主要文档
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async scalar(statement: 可执行文件, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: 任意)任意 ¶
执行语句并返回标量结果。
代表async_scoped_session
类代理AsyncSession
类。
另请参阅Session.scalar()
- 标量的主要文档
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async scalars(statement: Executable, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: 任意)ScalarResult[任意] ¶
执行语句并返回标量结果。
代表async_scoped_session
类代理AsyncSession
类。
结果
1.4.24 版本中的新功能: 添加了AsyncSession.scalars()
1.4.26 版本中的新功能: 添加async_scoped_session.scalars()
-
属性sqlalchemy.ext.asyncio.async_scoped_session.
session_factory:async_sessionmaker[_AS]¶
提供给 __init__ 的session_factory存储在此属性中,以后可以访问。当需要新的非范围的AsyncSession
时,这可能很有用。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async stream(statement: Executable, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: 任意)AsyncResult[任意] ¶
执行语句并返回流AsyncResult
对象。
代表async_scoped_session
类代理AsyncSession
类。
-
methodsqlalchemy.ext.asyncio.async_scoped_session.
async stream_scalars(statement: Executable, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: 任意)AsyncScalarResult[任意] ¶
执行一个语句并返回一个标量结果流。
代表async_scoped_session
类代理AsyncSession
类。
结果
一个AsyncScalarResult
对象
在 1.4.24 版本加入.
-
-
类 sqlalchemy.ext.asyncio 中。异步属性¶
Mixin 类,该类为所有属性提供可等待的访问器。
例如:from __future__ import annotations from typing import List from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from sqlalchemy.orm import relationship class Base(AsyncAttrs, DeclarativeBase): pass class A(Base): __tablename__ = "a" id: Mapped[int] = mapped_column(primary_key=True) data: Mapped[str] bs: Mapped[List[B]] = relationship() class B(Base): __tablename__ = "b" id: Mapped[int] = mapped_column(primary_key=True) a_id: Mapped[int] = mapped_column(ForeignKey("a.id")) data: Mapped[str]
在上面的示例中,AsyncAttrs
mixin 应用于声明式Base
类,它对所有子类生效。 此 mixin 添加了一个新属性AsyncAttrs.awaitable_attrs
所有类,这将产生任何属性的值作为 awaitable。这允许访问可能受延迟加载或延迟/未过期加载约束的属性,以便仍然可以发出 IO:a1 = (await async_session.scalars(select(A).where(A.id == 5))).one() # use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs`` # interface, so that it may be awaited for b1 in await a1.awaitable_attrs.bs: print(b1)
AsyncAttrs.awaitable_attrs
对 属性,这大致相当于使用AsyncSession.run_sync()
方法,例如:for b1 in await async_session.run_sync(lambda sess: a1.bs): print(b1)
2.0.13 新版功能.-
属性sqlalchemy.ext.asyncio.AsyncAttrs.
awaitable_attrs¶
提供此对象上包装为 Awaittables 的所有属性的命名空间。
例如:a1 = (await async_session.scalars(select(A).where(A.id == 5))).one() some_attribute = await a1.awaitable_attrs.some_deferred_attribute some_collection = await a1.awaitable_attrs.some_collection
-
-
类 sqlalchemy.ext.asyncio 中。异步会话¶
Asyncio 版本的Session
的 Session。AsyncSession
是传统Session
实例。AsyncSession
在并发中使用是不安全的 任务..请参阅 Session 线程安全吗? 在并发任务中共享 AsyncSession 是否安全?作为背景。
在 1.4 版本加入.
将AsyncSession
与自定义Session
一起使用 implementations,请参阅AsyncSession.sync_session_class
参数。
成员
sync_session_class、__init__()、aclose()、add()、add_all()、autoflush、begin()、begin_nested()、close()、close_all()、commit()、connection()、delete()、deleted、dirty、execute()、expire()、expire_all()、expunge()、expunge_all()、flush()、get()、get_bind()、get_nested_transaction()、get_one()、get_transaction()、identity_key()identity_map、in_nested_transaction()、in_transaction()、info、invalidate()、is_active、is_modified()、merge()、new、no_autoflush、object_session()、refresh()、reset()、rollback()、 run_sync()、 标量()、标量 ()、 流()、 stream_scalars() sync_session
类签名
类sqlalchemy.ext.asyncio.AsyncSession
(sqlalchemy.ext.asyncio.base.ReversibleProxy
)-
属性sqlalchemy.ext.asyncio.AsyncSession.
sync_session_class: Type[Session] = <class 'sqlalchemy.orm.session.Session'>¶
为特定 Session 实例提供底层Session
实例的类或可调用对象AsyncSession 的 AsyncSession
中。
在类级别,此属性是AsyncSession.sync_session_class
参数。AsyncSession
的自定义子类可以覆盖此 URL。
在实例级别,此属性指示用于为此AsyncSession
实例提供Session
实例的当前类或可调用对象。
在 1.4.24 版本加入.
-
方法sqlalchemy.ext.asyncio.AsyncSession.
__init__(bind:_AsyncSessionBindNone=None, *, binds:Dict[_SessionBindKey,_AsyncSessionBind]None=None, sync_session_class:Type[Session]None=None, **kw: Any)¶
构造一个新的AsyncSession
。
除sync_session_class
之外的所有参数都将传递给sync_session_class
直接调用来实例化新的会话
。有关参数文档,请参阅Session.__init__()。
参数sync_session_class¶ –
一个Session
子类或其他可调用对象,将用于构造将被代理的Session
。此参数可用于提供自定义Session
子。默认为AsyncSession.sync_session_class
class-level 属性。
在 1.4.24 版本加入.
-
方法sqlalchemy.ext.asyncio.AsyncSession.
async aclose()None ¶ AsyncSession.close()
的同义词。AsyncSession.aclose()
名称专门用于支持 Python 标准库@contextlib.aclosing
Context Manager 函数。
2.0.20 版本的新Function。
-
methodsqlalchemy.ext.asyncio.AsyncSession.
add(instance: object, _warn: bool = True)无 ¶
将对象放入此Session
中。
代表AsyncSession
类代理Session
类。
在传递给Session.add()
方法将移动到 pending 状态,直到下一次 flush,此时它们将进入 persistent 状态。
在传递给Session.add()
方法将移动到持久化 state 直接。
如果Session
使用的事务回滚, 对象,这些对象在传递给Session.add()
将移回 transient 状态,并且将不再存在于此会话
。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
add_all(instances: Iterable[object])None ¶
将给定的实例集合添加到此Session
。
代表AsyncSession
类代理Session
类。
有关一般行为描述,请参阅Session.add()
的文档。
-
属性sqlalchemy.ext.asyncio.AsyncSession.
自动刷新¶
代表AsyncSession
类的Session.autoflush
属性的代理。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
begin()AsyncSessionTransaction ¶
返回AsyncSessionTransaction
对象。
底层Session
将在AsyncSessionTransaction
object 的输入值:async with async_session.begin(): ... # ORM transaction is begun
请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是按需开始的。但是,begin 块是异步的,以适应SessionEvents.after_transaction_create()
事件钩子。
有关 ORM begin 的一般描述,请参阅Session.begin() 的 Session.begin()
中。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
begin_nested()AsyncSessionTransaction ¶
返回一个AsyncSessionTransaction
对象,该对象将开始一个 “嵌套” 事务,例如 SAVEPOINT。
行为与AsyncSession.begin()
相同。
有关 ORM begin 嵌套的一般描述,请参阅Session.begin_nested()
中。
另请参阅
可序列化隔离/保存点/事务性 DDL(asyncio 版本)- SQLite asyncio 驱动程序需要特殊解决方法,以便 SAVEPOINT 正常工作。
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async close()None ¶
关闭此使用的事务资源和 ORM 对象AsyncSession 的 AsyncSession
中。
另请参阅Session.close()
- “close” 的主要文档
结束语 - 有关语义的详细信息AsyncSession.close()
和AsyncSession.reset() 的 AsyncSession.reset()
中。
-
async classmethodsqlalchemy.ext.asyncio.AsyncSession.
close_all()None (无)¶
关闭所有AsyncSession
会话。
2.0 版后已移除:AsyncSession.close_all()
方法已弃用,并将在未来发行版中删除。请参阅close_all_sessions()。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
async commit()None ¶
提交当前正在进行的事务。
另请参阅Session.commit()
- “提交” 的主要文档
-
方法sqlalchemy.ext.asyncio.AsyncSession.
async connection(bind_arguments:_BindArgumentsNone=None, execution_options:CoreExecuteOptionsParameterNone=None, **kw: Any)AsyncConnection ¶
返回与此Session
对象的事务状态对应的AsyncConnection
对象。
此方法还可用于为当前事务使用的数据库连接建立执行选项。
1.4.24 版本中的新功能: 添加了传递给底层Session.connection()
方法的 **kw 参数。
另请参阅Session.connection()
- “连接” 的主要文档
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async delete(instance: object)无 ¶
将实例标记为已删除。
数据库删除作发生在flush() 上
。
由于此作可能需要沿未加载的关系级联,因此可以等待允许这些查询发生。
另请参阅Session.delete()
- 删除的主要文档
-
已删除属性sqlalchemy.ext.asyncio.AsyncSession.
¶
此Session
中标记为 'deleted' 的所有实例的集合
代表AsyncSession
类代理Session
类。
-
attribute dirty(脏sqlalchemy.ext.asyncio.AsyncSession.
属性)¶
被视为 dirty 的所有持久性实例的集合。
代表AsyncSession
类代理Session
类。
例如:some_mapped_object in session.dirty
当实例被修改但未被删除时,它们被视为脏实例。
请注意,这种 “肮脏 ”的计算是 “乐观的”;大多数属性设置或集合修改作会将实例标记为 'dirty' 并将其放置在此集中,即使属性的值没有净变化。在 flush 时,将每个属性的值与其之前保存的值进行比较,如果没有净变化,则不会发生 SQL作(这是一个成本更高的作,因此仅在 flush 时完成)。
要检查实例是否对其属性进行了可作的净更改,请使用Session.is_modified()
方法。
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async execute(statement: Executable, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: any)result[any] ¶
执行一条语句并返回一个缓冲的Result
对象。
另请参阅Session.execute()
- 执行的主要文档
-
methodsqlalchemy.ext.asyncio.AsyncSession.
expire(instance: object, attribute_names:Iterable[str]None=None)None(无 )¶
使实例上的属性过期。
代表AsyncSession
类代理Session
类。
将实例的属性标记为过期。当过期的 属性,则会向Session
对象的当前事务上下文,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与之前在同一事务中读取的值相同的值,而不管该事务之外的数据库状态如何变化。
要同时使Session
中的所有对象过期,请使用Session.expire_all()。
Session
对象的默认行为是每当Session.rollback()
或Session.commit()
方法,以便可以为新事务加载新状态。因此,调用Session.expire()
仅对当前事务中发出非 ORM SQL 语句的特定情况有意义。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
expire_all()无 ¶
使此 Session 中的所有持久性实例过期。
代表AsyncSession
类代理Session
类。
下次访问持久化实例上的任何属性时, 将使用Session
对象的当前事务上下文,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与之前在同一事务中读取的值相同的值,而不管该事务之外的数据库状态如何变化。
要使单个对象和这些对象上的各个属性过期,请使用Session.expire()。
Session
对象的默认行为是每当Session.rollback()
或Session.commit()
方法,以便可以为新事务加载新状态。因此,假设事务是隔离的,通常不需要调用Session.expire_all()
。
-
methodsqlalchemy.ext.asyncio.AsyncSession.
expunge(instance: object)None(无 )¶
从此Session
中删除实例。
代表AsyncSession
类代理Session
类。
这将释放对实例的所有内部引用。将根据 Expunge Cascade 规则应用 Cascading。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
expunge_all()无 ¶
从此Session
中删除所有对象实例。
代表AsyncSession
类代理Session
类。
这相当于对 this 中的所有对象调用expunge(obj)
会话
。
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async flush(objects:Sequence[Any]None=None)None ¶
将所有对象更改刷新到数据库。
另请参阅Session.flush()
- flush 的主要文档
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async get(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options:Sequence[ORMOption]None=None、populate_existing:bool = False、with_for_update:ForUpdateParameter = None、identity_token:AnyNone=None、execution_options: OrmExecuteOptionsParameter = {})→_ONone¶
根据给定的主键标识符返回实例,如果未找到,则返回None
。
另请参阅Session.get()
- get 的主要文档
-
方法sqlalchemy.ext.asyncio.AsyncSession.
get_bind(mapper:_EntityBindKey[_O]None=None, clause:ClauseElementNone=None, bind:_SessionBindNone=None, **kw: Any)→EngineConnection¶
返回一个 “bind”,同步代理的 Session
已绑定。
与Session.get_bind()
方法不同,此AsyncSession
当前未以任何方式使用此方法来解析请求的引擎。
注意
此方法直接代理Session.get_bind()
方法,但是与Session.get_bind()
方法相比,它目前不能用作覆盖目标。 以下示例说明了如何实现自定义Session.get_bind()
方案,适用于AsyncSession
和AsyncEngine 的 AsyncEngine
进行验证。
自定义垂直分区中引入的模式 说明了如何将自定义绑定查找方案应用于Session
给定一组Engine
对象。应用相应的Session.get_bind()
实现以用于AsyncSession
和AsyncEngine
对象中,继续子类化 Session
并将其应用于AsyncSession
使用AsyncSession.sync_session_class
。内部方法必须继续返回Engine
实例,这些实例可以使用AsyncEngine.sync_engine
属性:# using example from "Custom Vertical Partitioning" import random from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import Session # construct async engines w/ async drivers engines = { "leader": create_async_engine("sqlite+aiosqlite:///leader.db"), "other": create_async_engine("sqlite+aiosqlite:///other.db"), "follower1": create_async_engine("sqlite+aiosqlite:///follower1.db"), "follower2": create_async_engine("sqlite+aiosqlite:///follower2.db"), } class RoutingSession(Session): def get_bind(self, mapper=None, clause=None, **kw): # within get_bind(), return sync engines if mapper and issubclass(mapper.class_, MyOtherClass): return engines["other"].sync_engine elif self._flushing or isinstance(clause, (Update, Delete)): return engines["leader"].sync_engine else: return engines[ random.choice(["follower1", "follower2"]) ].sync_engine # apply to AsyncSession using sync_session_class AsyncSessionMaker = async_sessionmaker(sync_session_class=RoutingSession)
Session.get_bind()
方法在非 asyncio、隐式非阻塞上下文中调用,其方式与通过AsyncSession.run_sync()
调用的 ORM 事件钩子和函数相同,因此 希望在Session.get_bind()
可以继续使用阻塞式代码来执行此作,该代码将在数据库驱动程序上调用 IO 时转换为隐式异步调用。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
get_nested_transaction()→AsyncSessionTransactionNone¶
返回当前正在进行的嵌套事务(如果有)。
结果一个 AsyncSessionTransaction
对象,或者无
。
在 1.4.18 版本加入.
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async get_one(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options:Sequence[ORMOption]None=无、populate_existing:bool = False、with_for_update:ForUpdateParameter = None、identity_token:AnyNone=None、execution_options: OrmExecuteOptionsParameter = {})_O ¶
根据给定的主键标识符返回实例,如果未找到则引发异常。
sqlalchemy.orm.exc.NoResultFound
如果查询未选择任何行,则引发。
..版本已添加: 2.0.22
另请参阅Session.get_one()
- get_one 的主要文档
-
方法sqlalchemy.ext.asyncio.AsyncSession.
get_transaction()→AsyncSessionTransactionNone¶
返回当前正在进行的根事务(如果有)。
结果一个 AsyncSessionTransaction
对象,或者无
。
在 1.4.18 版本加入.
-
类方法sqlalchemy.ext.asyncio.AsyncSession.
identity_key(class_:Type[Any]None=None, ident:AnyTuple[Any,...]=None, *, instance:AnyNone=None, row:Row[Any]RowMappingNone=None, identity_token:AnyNone=None)_IdentityKeyType[Any] ¶
返回身份密钥。
代表AsyncSession
类代理Session
类。
这是identity_key()
的别名。
-
属性sqlalchemy.ext.asyncio.AsyncSession.
identity_map¶
代表AsyncSession
类的Session.identity_map
属性的代理。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
in_nested_transaction()bool ¶
如果此Session
已经开始一个嵌套事务,例如 SAVEPOINT,则返回 True。
代表AsyncSession
类代理Session
类。
在 1.4 版本加入.
-
方法sqlalchemy.ext.asyncio.AsyncSession.
in_transaction()bool ¶
如果此Session
已开始事务,则返回 True。
代表AsyncSession
类代理Session
类。
在 1.4 版本加入.
另请参阅
-
属性sqlalchemy.ext.asyncio.AsyncSession.
信息¶
用户可修改的字典。
代表AsyncSession
类代理Session
类。
此字典的初始值可以使用info
参数添加到Session
构造函数中,或者sessionmaker
构造函数或工厂方法。这里的字典始终是这个Session
的本地字典,并且可以独立于所有其他Session
对象进行修改。
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async invalidate()None ¶
使用连接失效关闭此 Session。
有关完整描述,请参阅Session.invalidate()。
-
属性sqlalchemy.ext.asyncio.AsyncSession.
is_active¶
如果此Session
未处于 “partial rollback” 状态,则为 True。
代表AsyncSession
类代理Session
类。
“partial rollback” 状态通常表示Session
的 flush 过程失败,并且Session.rollback()
方法才能完全回滚事务。
如果此Session
根本不在事务中,则Session
将在首次使用时自动启动,因此在这种情况下,Session.is_active
将返回 True。
否则,如果此Session
在事务中, 并且该事务尚未在内部回滚,则Session.is_active
也将返回 True。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
is_modified(instance: object, include_collections: bool = True)bool ¶
如果给定实例具有本地修改的属性,则返回True
。
代表AsyncSession
类代理Session
类。
此方法检索实例上每个分析属性的历史记录,并执行当前值与其先前刷新或提交的值(如果有)的比较。
它实际上是更昂贵和准确的 版本。Session.dirty
集合;对每个属性的 Net “dirty” 状态执行完整测试。
例如:return session.is_modified(someobject)
此方法需要注意以下几点:
使用此方法测试时,Session.dirty
集合中存在的实例可能会报告False
。这是因为该对象可能已经通过属性 mutation 接收了更改事件,因此将其放置在Session.dirty
中,但最终状态与从数据库加载的状态相同,因此这里没有净变化。
如果在接收新值时未加载或过期,则在应用新值时,标量属性可能未记录先前设置的值 - 在这些情况下,假定该属性发生了变化,即使最终没有针对其数据库值的净变化。在大多数情况下,SQLAlchemy 在发生 set 事件时不需要“旧”值,因此如果旧值不存在,它会跳过 SQL 调用的费用,基于通常需要标量值的 UPDATE 的假设,并且在不需要的少数情况下,平均比发出防御性 SELECT 便宜。
仅当属性容器的active_history
标志设置为True
时,才会在设置时无条件地获取 “old” 值。此标志通常用于主键属性和标量对象引用,它们不是简单的多对一。要为任何任意映射列设置此标志,请使用active_history
参数替换为column_property()
的
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async merge(instance: _O, *, load: bool = True, options:Sequence[ORMOption]None=None)_O ¶
将给定实例的状态复制到此AsyncSession
中的相应实例中。
另请参阅Session.merge()
- 合并的主要文档
-
属性sqlalchemy.ext.asyncio.AsyncSession.
new¶
此Session
中标记为 'new' 的所有实例的集合。
代表AsyncSession
类代理Session
类。
-
属性sqlalchemy.ext.asyncio.AsyncSession.
no_autoflush¶
返回禁用 autoflush 的上下文管理器。
代表AsyncSession
类代理Session
类。
例如:with session.no_autoflush: some_object = SomeClass() session.add(some_object) # won't autoflush some_object.related_thing = session.query(SomeRelated).first()
在with:
块中继续的作不会受到查询访问时发生的刷新的影响。这在初始化涉及现有数据库查询的一系列对象时非常有用,其中未完成的对象不应被刷新。
-
类方法sqlalchemy.ext.asyncio.AsyncSession.
object_session(instance: object)→SessionNone¶
返回对象所属的Session
。
代表AsyncSession
类代理Session
类。
这是object_session()
的别名。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
async refresh(instance: object, attribute_names:Iterable[str]None=None, with_for_update: ForUpdateParameter = None)无 ¶
过期并刷新给定实例上的属性。
将向数据库发出查询,并且所有属性都将使用其当前数据库值刷新。
这是Session.refresh()
方法的异步版本。有关所有选项的完整说明,请参阅该方法。
另请参阅Session.refresh()
- 刷新的主要文档
-
方法sqlalchemy.ext.asyncio.AsyncSession.
async reset(None ¶
关闭此使用的事务资源和 ORM 对象Session
,将会话重置为其初始状态。
2.0.22 新版功能.
另请参阅Session.reset()
- “reset” 的主要文档
结束语 - 有关语义的详细信息AsyncSession.close()
和AsyncSession.reset() 的 AsyncSession.reset()
中。
-
方法sqlalchemy.ext.asyncio.AsyncSession.
async rollback()None ¶
回滚当前正在进行的事务。
另请参阅Session.rollback()
- “回滚” 的主要文档
-
方法sqlalchemy.ext.asyncio.AsyncSession.
async run_sync(fn: ~typing.Callable[[~typing.连接[~sqlalchemy.orm.session.Session, ~_P]], ~sqlalchemy.ext.asyncio.session._T], *arg: ~typing.~_P, **kw: ~typing.~_P)_T ¶
调用给定的 synchronous (i. not async) 可调用对象,传递一个 synchronous 风格的Session
作为第一个参数。
此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。
例如:def some_business_method(session: Session, param: str) -> str: """A synchronous function that does not require awaiting :param session: a SQLAlchemy Session, used synchronously :return: an optional return value is supported """ session.add(MyObject(param=param)) session.flush() return "success" async def do_something_async(async_engine: AsyncEngine) -> None: """an async function that uses awaiting""" with AsyncSession(async_engine) as async_session: # run some_business_method() with a sync-style # Session, proxied into an awaitable return_code = await async_session.run_sync( some_business_method, param="param1" ) print(return_code)
此方法通过在专门检测的 greenlet 中运行给定的可调用对象来维护 asyncio 事件循环,直到数据库连接。
提示
提供的可调用对象在 asyncio 事件循环中内联调用,并将在传统的 IO 调用上阻塞。此可调用对象中的 IO 应仅调用 SQLAlchemy 的 asyncio 数据库 API,该 API 将适当适应 greenlet 上下文。
另请参阅AsyncAttrs
- ORM 映射类的 mixin,它在每个属性的基础上更简洁地提供了类似的功能
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async scalar(statement: 可执行文件, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: 任意)任意 ¶
执行语句并返回标量结果。
另请参阅Session.scalar()
- 标量的主要文档
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async scalars(statement: Executable, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: 任意)ScalarResult[任意] ¶
执行语句并返回标量结果。
结果
1.4.24 版本中的新功能: 添加了AsyncSession.scalars()
1.4.26 版本中的新功能: 添加async_scoped_session.scalars()
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async stream(statement: Executable, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: 任意)AsyncResult[任意] ¶
执行语句并返回流AsyncResult
对象。
-
methodsqlalchemy.ext.asyncio.AsyncSession.
async stream_scalars(statement: Executable, params:_CoreAnyExecuteParamsNone=None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments:_BindArgumentsNone=无, **kw: 任意)AsyncScalarResult[任意] ¶
执行一个语句并返回一个标量结果流。
结果
一个AsyncScalarResult
对象
在 1.4.24 版本加入.
-
属性sqlalchemy.ext.asyncio.AsyncSession.
sync_session: 会话¶
引用底层Session
thisAsyncSession
代理请求。
此实例可用作事件目标。
另请参阅
-
-
类 sqlalchemy.ext.asyncio 中。AsyncSessionTransaction¶
ORMSessionTransaction
对象的包装器。
提供此对象是为了可以返回AsyncSession.begin()
的事务持有对象。
该对象支持对AsyncSessionTransaction.commit()
和AsyncSessionTransaction.rollback()
,以及用作异步上下文管理器。
在 1.4 版本加入.
类签名
类sqlalchemy.ext.asyncio.AsyncSessionTransaction
(sqlalchemy.ext.asyncio.base.ReversibleProxy
,sqlalchemy.ext.asyncio.base.StartableContext
)-
方法sqlalchemy.ext.asyncio.AsyncSessionTransaction.
async commit()None ¶
提交此AsyncTransaction
。
-
方法sqlalchemy.ext.asyncio.AsyncSessionTransaction.
async rollback()None ¶
回滚此AsyncTransaction
。
-