ORM 示例¶
SQLAlchemy 发行版包括各种代码示例,这些示例说明了一组选定的模式,其中一些是典型的,一些不是那么典型。所有版本都是可运行的,并且可以在发行版的 /examples
目录中找到。所有产品的描述和源代码都可以在这里找到。
其他 SQLAlchemy 示例(一些用户贡献)可在 https://www.sqlalchemy.org/trac/wiki/UsageRecipes 的 wiki 上找到。
映射 recipes¶
邻接列表¶
使用邻接列表模型映射的字典的字典结构的示例。
例如:
node = TreeNode("rootnode")
node.append("node1")
node.append("node3")
session.add(node)
session.commit()
dump_tree(node)
文件列表:
关联¶
说明“association object”模式用法的示例,其中中间类中介以多对多模式关联的两个类之间的关系。
文件列表:
proxied_association.py - 与 basic_association 示例相同,添加 ofsqlalchemy.ext.associationproxy
的用法以使对OrderItem
的显式引用可选。
basic_association.py - 说明 “Order” 和 “Item” 对象集合之间的多对多关系,通过名为 “OrderItem” 的关联对象将购买价格与每个对象相关联
dict_of_sets_with_default.py - 一个高级关联代理示例,它说明了关联代理的嵌套以生成多级 Python 集合,在本例中是一个以字符串键和整数集作为值的字典,它隐藏了底层映射类。
Asyncio 集成¶
说明 SQLAlchemy 的 asyncio 引擎功能的示例。
文件列表:
async_orm.py - 说明对象在异步 ORM 中的使用sqlalchemy.ext.asyncio.AsyncSession
。
async_orm_writeonly.py - 说明在 asyncio 下使用只写关系来简化 ORM 集合的处理。
gather_orm_statements.py - 演示如何使用asyncio.gather()
并发运行多个语句 沿许多 asyncio 数据库连接,将 ORM 结果合并到一个AsyncSession 的 AsyncSession
中。
basic.py - 说明 asyncio 引擎/连接接口。
greenlet_orm.py - 说明了 sqlalchemy.ext.asyncio.AsyncSession 对象在异步 ORM 中的使用,包括可选的 run_sync() 方法。
有向图¶
有向图结构的持久性示例。该图存储为一组边,每条边都引用节点表中的“下部”和“上部”节点。下面说明了低邻域和上邻域的基本持久性和查询:
n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())
文件列表:
作为字典的动态关系¶
演示如何将类似字典的 Facade 放置在“动态”关系之上,以便字典作(假设简单的字符串键)可以对大型集合进行作,而无需一次加载整个集合。
文件列表:
泛型关联¶
阐释将多种类型的父对象与特定子对象关联的各种方法。
这些示例都使用声明式扩展和声明式 mixin。每个类在最后都提供了相同的用例 - 两个类,Customer
和 Supplier
,它们都是 HasAddresses
混合的子类,这确保了父类具有包含 Address
对象的 addresses
集合。
discriminator_on_association.py 和 generic_fk.py 脚本
是 2007 年博客文章中介绍的食谱的现代化版本
与 SQLAlchemy 的多态关联。
文件列表:
table_per_association.py - 说明一个混音,该混音通过为每个父类单独生成的关联表提供通用关联。关联的对象本身保存在所有父对象共享的单个表中。
table_per_related.py - 说明一种泛型关联,该关联将关联对象保留在各个表中,每个关联对象都是为了代表特定父类保留这些对象而生成的。
discriminator_on_association.py - 说明了一个 mixin,它使用单个目标表和单个关联表(由所有父表引用)提供通用关联。关联表包含一个 “discriminator” 列,该列确定与关联表中每个特定行关联的父对象类型。
generic_fk.py - 以类似于 Django、ROR 等流行框架的方式说明所谓的“通用外键”。这种方法绕过了标准的引用完整性实践,因为“外键”列实际上并不被限制引用任何特定的表;相反,使用 in-application logic 来确定引用哪个 table。
物化路径¶
阐释使用 SQLAlchemy ORM 的分层数据的“物化路径”模式。
文件列表:
materialized_paths.py - 说明“具体化路径”模式。
嵌套集¶
说明了使用 SQLAlchemy ORM 为分层数据实现 “嵌套集” 模式的基本方法。
文件列表:
nested_sets.py - Celko 的 “嵌套集” 树结构。
性能¶
适用于各种 SQLAlchemy 使用案例的性能分析套件。
每个套件都侧重于具有特定性能配置文件和相关影响的特定用例:
批量插入
单个插入,带或不带事务
获取大量行
运行大量短查询
所有套件都包含各种使用模式,说明了 Core 和 ORM 的使用情况,并且通常按照性能从最差到最大的顺序排序,根据 SQLAlchemy 提供的功能量从最大到最小(这两件事通常完美对应)。
命令行工具在包级别提供,它允许运行各个套件:
$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
[--num NUM] [--profile] [--dump]
[--echo]
{bulk_inserts,large_resultsets,single_inserts}
positional arguments:
{bulk_inserts,large_resultsets,single_inserts}
suite to run
optional arguments:
-h, --help show this help message and exit
--test TEST run specific test name
--dburl DBURL database URL, default sqlite:///profile.db
--num NUM Number of iterations/items/etc for tests;
default is module-specific
--profile run profiling and dump call counts
--dump dump full call profile (implies --profile)
--echo Echo SQL output
运行示例如下所示:
$ python -m examples.performance bulk_inserts
或者有选项:
$ python -m examples.performance bulk_inserts \
--dburl mysql+mysqldb://scott:tiger@localhost/test \
--profile --num 1000
文件列表¶
文件列表:
bulk_updates.py - 这一系列测试将说明批量更新大量行的不同方法(正在构建中!目前只有一个测试)
large_resultsets.py - 在这一系列测试中,我们正在寻找加载大量非常小且简单的行的时间。
bulk_inserts.py - 这一系列测试说明了批量 INSERT 大量行的不同方法。
short_selects.py - 这一系列测试说明了按主键 SELECT 单个记录的不同方法
single_inserts.py - 在这一系列测试中,我们将研究一种方法,该方法在不同的事务中插入一行,然后返回到实质上“已关闭”状态。这类似于启动数据库连接、插入行、提交和关闭的 API 调用。
__main__.py - 允许将 examples/performance 包作为脚本运行。
带时间运行所有测试¶
这是 run 的默认形式:
$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
test_bulk_insert_dictionaries, test_core,
test_core_query_caching, test_dbapi_raw_w_connect,
test_dbapi_raw_w_pool
test_orm_commit : Individual INSERT/COMMIT pairs via the
ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
the "bulk" API (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
the "bulk" API with dictionaries (10000 iterations);
total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
(10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
connection pool (10000 iterations); total time 8.001813 sec
转储单个测试的配置文件¶
Python 配置文件输出可以转储所有测试,或者更常见的是单个测试:
$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
186109 function calls (186102 primitive calls) in 1.089 seconds
Ordered by: internal time, call count
ncalls tottime percall cumtime percall filename:lineno(function)
1000 0.634 0.001 0.634 0.001 {method 'commit' of 'sqlite3.Connection' objects}
1000 0.154 0.000 0.154 0.000 {method 'execute' of 'sqlite3.Cursor' objects}
1000 0.021 0.000 0.074 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
1000 0.015 0.000 0.034 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
1 0.012 0.012 1.091 1.091 examples/performance/single_inserts.py:79(test_core)
...
编写你自己的套件¶
Profiler Suite 系统是可扩展的,可以应用于您自己的测试集。在为某些性能关键型例程集确定正确的方法时,这是一种有价值的技术。例如,如果我们想分析几种加载之间的差异,我们可以创建一个文件test_loads.py
,其中包含以下内容:
from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = None
session = None
class Parent(Base):
__tablename__ = "parent"
id = Column(Integer, primary_key=True)
children = relationship("Child")
class Child(Base):
__tablename__ = "child"
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey("parent.id"))
# Init with name of file, default number of items
Profiler.init("test_loads", 1000)
@Profiler.setup_once
def setup_once(dburl, echo, num):
"setup once. create an engine, insert fixture data"
global engine
engine = create_engine(dburl, echo=echo)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
sess = Session(engine)
sess.add_all(
[
Parent(children=[Child() for j in range(100)])
for i in range(num)
]
)
sess.commit()
@Profiler.setup
def setup(dburl, echo, num):
"setup per test. create a new Session."
global session
session = Session(engine)
# pre-connect so this part isn't profiled (if we choose)
session.connection()
@Profiler.profile
def test_lazyload(n):
"load everything, no eager loading."
for parent in session.query(Parent):
parent.children
@Profiler.profile
def test_joinedload(n):
"load everything, joined eager loading."
for parent in session.query(Parent).options(joinedload("children")):
parent.children
@Profiler.profile
def test_subqueryload(n):
"load everything, subquery eager loading."
for parent in session.query(Parent).options(subqueryload("children")):
parent.children
if __name__ == "__main__":
Profiler.main()
我们可以直接运行我们的新脚本:
$ python test_loads.py --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec
太空侵略者¶
使用 SQLite 作为状态机的 Space Invaders 游戏。
最初开发于 2012 年。适合在 Python 3 中工作。
在使用 ASCII 艺术的文本控制台中运行。

要运行:
$ python -m examples.space_invaders.space_invaders
在运行时,观察日志中的 SQL 输出:
$ tail -f space_invaders.log
享受!
文件列表:
对对象进行版本控制¶
使用历史表进行版本控制¶
说明了一个扩展,该扩展为实体创建版本表并存储每个更改的记录。给定的扩展会生成一个匿名的 “history” 类,该类表示目标对象的历史版本。
与使用临时行进行版本控制示例相比,该示例将更新作为新行写入同一表中,而无需使用单独的历史记录表。
用法通过单元测试模块 test_versioning.py
进行说明,该模块使用 SQLAlchemy 的内部 pytest 插件运行:
$ pytest test/base/test_examples.py
使用声明性的示例用法片段:
from history_meta import Versioned, versioned_session
class Base(DeclarativeBase):
pass
class SomeClass(Versioned, Base):
__tablename__ = "sometable"
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
Session = sessionmaker(bind=engine)
versioned_session(Session)
sess = Session()
sc = SomeClass(name="sc1")
sess.add(sc)
sess.commit()
sc.name = "sc1modified"
sess.commit()
assert sc.version == 2
SomeClassHistory = SomeClass.__history_mapper__.class_
assert sess.query(SomeClassHistory).filter(
SomeClassHistory.version == 1
).all() == [SomeClassHistory(version=1, name="sc1")]
Versioned mixin 旨在与 Clarative
一起使用。要将扩展与经典映射器一起使用,可以应用 _history_mapper
函数:
from history_meta import _history_mapper
m = mapper(SomeClass, sometable)
_history_mapper(m)
SomeHistoryClass = SomeClass.__history_mapper__.class_
版本控制示例还与配置版本计数器中记录的 ORM 乐观并发功能集成。要启用此功能,请将标志 Versioned.use_mapper_versioning
设置为 True:
class SomeClass(Versioned, Base):
__tablename__ = "sometable"
use_mapper_versioning = True
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
在上面,如果更新具有相同版本标识符的两个 SomeClass
实例并发发送到数据库进行 UPDATE,如果数据库隔离级别允许两个 UPDATE 语句继续进行,则一个语句将失败,因为它不再针对最后一个已知版本标识符。
文件列表:
test_versioning.py - 说明history_meta.py
用法的单元测试 module 函数。
history_meta.py - 版本控制的 mixin 类和其他实用程序。
使用 Temporal Rows 进行版本控制¶
几个示例说明了拦截更改的技术,这些更改首先被解释为行上的 UPDATE,而是将其转换为新行的 INSERT,而前一行作为历史版本保持不变。
与使用历史记录表进行版本控制示例相比,该示例将历史记录行写入单独的历史记录表。
文件列表:
versioned_rows.py - 说明了一种截获对象更改的方法,该方法将单行上的 UPDATE 语句转换为 INSERT 语句,以便插入包含新数据的新行,同时保持旧行不变。
versioned_rows_w_versionid.py - 说明了一种截获对象更改的方法,该方法将单行上的 UPDATE 语句转换为 INSERT 语句,以便插入包含新数据的新行,同时保持旧行不变。
versioned_map.py versioned_rows- 围绕 “vertical table” 结构的概念,如 垂直属性映射示例。
versioned_update_old_row.py - 说明了与versioned_rows.py
相同的 UPDATE 转换为 INSERT 技术,但也在旧行上发出 UPDATE 以影响时间戳的更改。还包括一个SessionEvents.do_orm_execute()
钩子,用于将查询限制为仅最新版本。
垂直属性映射¶
说明 “vertical table” 映射。
“垂直表”是指将对象的单个属性作为不同的行存储在表中的技术。“垂直表”技术用于持久保存对象,这些对象可以具有不同的属性集,但代价是简单的查询控制和简洁性。它常见于内容/文档管理系统中,以便灵活地表示用户创建的结构。
给出了该方法的两种变体。在第二个字段中,每行引用一个“数据类型”,其中包含有关属性中存储的信息类型的信息,例如整数、字符串或日期。
例:
shrew = Animal("shrew")
shrew["cuteness"] = 5
shrew["weasel-like"] = False
shrew["poisonous"] = True
session.add(shrew)
session.flush()
q = session.query(Animal).filter(
Animal.facts.any(
and_(AnimalFact.key == "weasel-like", AnimalFact.value == True)
)
)
print("weasel-like animals", q.all())
文件列表:
dictlike-polymorphic.py - 将多态值垂直表映射为字典。
dictlike.py - 将垂直表映射为字典。
继承映射配方¶
基本继承映射¶
单表、联接表和具体表继承的工作示例,如映射类继承层次结构中所述。
文件列表:
joined.py - 联接表(每个子类一个表)继承示例。
concrete.py - 具体表(每个类一个表)继承示例。
single.py - 单表(每个层次结构一个表)继承示例。
特殊 API¶
属性插桩¶
说明对 SQLAlchemy 的属性管理系统的修改的示例。
文件列表:
listen_for_events.py - 演示如何将事件附加到所有检测的属性并侦听更改事件。
active_column_defaults.py - 说明AttributeEvents.init_scalar()
的用法 event 与 Core 列默认提供 自动生成默认值的 ORM 对象 当访问 un-set 属性时。
custom_management.py - 说明使用sqlalchemy.ext.instrumentation
扩展包的自定义类插桩。
水平分片¶
使用 SQLAlchemy Sharding API 的基本示例。分片是指跨多个数据库水平扩展数据。
“分片”映射的基本组件是:
多个Engine
实例,每个实例都分配了一个“分片 ID”。这些Engine
实例可能引用不同的数据库,或同一数据库中的不同架构/帐户,或者它们甚至可以仅通过选项进行区分,这些选项将导致它们在使用时访问不同的架构或表。
一个函数,该函数可以返回单个分片 ID,给定要保存的实例;这称为 “shard_chooser”
一个函数,可以返回适用于特定实例标识符的分片 ID 列表;这称为 “id_chooser”。如果返回所有分片 ID,则搜索所有分片。
一个函数,该函数可以在给定特定 Query (“query_chooser”) 的情况下返回要尝试的分片 ID 列表。如果它返回所有分片 ID,则将查询所有分片并将结果联接在一起。
在这些示例中,针对同一基本示例使用了不同类型的分片,该示例基于每个大洲容纳天气数据。我们提供了示例 shard_chooser、id_chooser 和 query_chooser 函数。query_chooser说明了对 SQL 表达式元素的检查,以便尝试确定所请求的单个分片。
构建通用分片例程是一种雄心勃勃的方法
到在多个数据库之间组织实例的问题。 对于
更直白的替代方案,“独特实体”方法
是将对象分配给不同表(并且可能
数据库节点)- 在 wiki 上描述
EntityName 的 EntityName 中。
文件列表:
separate_databases.py - 说明使用不同的 SQLite 数据库进行分片。
separate_tables.py - 说明使用单个 SQLite 数据库进行分片,但该数据库将具有使用命名约定的多个表。
separate_schema_translates.py - 说明了使用具有多个架构的单个数据库进行分片,其中每个分片可以使用不同的“schema_translates_map”。
asyncio.py - 说明与 asyncio 一起使用的分片 API。
扩展 ORM¶
ORM 查询事件¶
说明 ORM SELECT 行为增强的配方,如
Session.execute()
与 2.0 样式一起使用
select()
以及 1.x 样式的 Query
对象。
示例包括 with_loader_criteria()
的演示
选项以及 SessionEvents.do_orm_execute()
钩子。
从 SQLAlchemy 1.4 开始,Query
构造与 Select
构造统一,因此这两个对象几乎相同。
文件列表:
temporal_range.py - 说明将应用于选定实体的自定义每查询条件。
filter_public.py - 说明应用于特定类型实体的全局标准。
Dogpile 缓存¶
说明如何嵌入
狗桩.cache
功能,允许完全缓存控制
以及从长期缓存中提取 “Lazy loaded” 属性的能力。
在此演示中,说明了以下技术:
使用SessionEvents.do_orm_execute()
事件钩子
绕过Session.execute()
从自定义缓存源而不是数据库拉取的基本技术。
使用 dogpile.cache 进行基本缓存,使用 “regions” 允许对一组固定的配置进行全局控制。
使用自定义UserDefinedOption
对象在语句对象中配置选项。
另请参阅
Re-Executing Statements - 包括此处介绍的技术的一般示例。
例如:
# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))
# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))
# execute and results
result = session.execute(stmt)
print(result.scalars().all())
要运行,必须同时安装 SQLAlchemy 和 dogpile.cache 或在当前 PYTHONPATH 上。该演示将为数据文件创建一个本地目录,插入初始数据,然后运行。第二次运行演示将利用已经存在的缓存文件,并且将针对两个表发出一个 SQL 语句 - 但是,显示的结果将利用数十个从缓存中提取的延迟加载。
演示脚本本身(按复杂程度排序)作为 Python 模块运行,以便相对导入工作:
$ python -m examples.dogpile_caching.helloworld
$ python -m examples.dogpile_caching.relationship_caching
$ python -m examples.dogpile_caching.advanced
$ python -m examples.dogpile_caching.local_session_caching
文件列表:
environment.py - 建立数据/缓存文件路径和配置,必要时引导夹具数据。
caching_query.py - 表示允许将 Dogpile 缓存与 SQLAlchemy 一起使用的函数和类。引入了一个名为 FromCache 的查询选项。
model.py - 数据模型,表示具有多个 Address 对象的 Person,每个对象都有 PostalCode、City、Country。
fixture_data.py - 安装一些示例数据。在这里,我们有一些美国/加拿大城市的邮政编码。然后,安装 100 个 Person 记录,每个记录都有一个随机选择的邮政编码。
helloworld.py - 说明如何加载一些数据并缓存结果。
relationship_caching.py - 说明如何在关系端点上添加缓存选项,以便延迟加载从缓存加载。
advanced.py - 说明 Query 与 FromCache 选项结合使用的用法,包括前端加载、缓存失效和集合缓存。
local_session_caching.py - 此示例创建一个新的 dogpile.cache 后端,该后端将数据保存在当前会话的本地字典中。remove() 的会话,缓存就消失了。