ORM 示例


SQLAlchemy 发行版包括各种代码示例,这些示例说明了一组选定的模式,其中一些是典型的,一些不是那么典型。所有版本都是可运行的,并且可以在发行版的 /examples 目录中找到。所有产品的描述和源代码都可以在这里找到。


其他 SQLAlchemy 示例(一些用户贡献)可在 https://www.sqlalchemy.org/trac/wiki/UsageRecipes 的 wiki 上找到。


映射 recipes


邻接列表


使用邻接列表模型映射的字典的字典结构的示例。


例如:

node = TreeNode("rootnode")
node.append("node1")
node.append("node3")
session.add(node)
session.commit()

dump_tree(node)


文件列表:


关联


说明“association object”模式用法的示例,其中中间类中介以多对多模式关联的两个类之间的关系。


文件列表:


  • proxied_association.py - 与 basic_association 示例相同,添加 of sqlalchemy.ext.associationproxy 的用法以使对 OrderItem 的显式引用可选。


  • basic_association.py - 说明 “Order” 和 “Item” 对象集合之间的多对多关系,通过名为 “OrderItem” 的关联对象将购买价格与每个对象相关联


  • dict_of_sets_with_default.py - 一个高级关联代理示例,它说明了关联代理的嵌套以生成多级 Python 集合,在本例中是一个以字符串键和整数集作为值的字典,它隐藏了底层映射类。


Asyncio 集成


说明 SQLAlchemy 的 asyncio 引擎功能的示例。


文件列表:


  • async_orm.py - 说明对象在异步 ORM 中的使用 sqlalchemy.ext.asyncio.AsyncSession


  • async_orm_writeonly.py - 说明在 asyncio 下使用只写关系来简化 ORM 集合的处理。


  • gather_orm_statements.py - 演示如何使用 asyncio.gather() 并发运行多个语句 沿许多 asyncio 数据库连接,将 ORM 结果合并到一个 AsyncSession 的 AsyncSession 中。


  • basic.py - 说明 asyncio 引擎/连接接口。


  • greenlet_orm.py - 说明了 sqlalchemy.ext.asyncio.AsyncSession 对象在异步 ORM 中的使用,包括可选的 run_sync() 方法。


有向图


有向图结构的持久性示例。该图存储为一组边,每条边都引用节点表中的“下部”和“上部”节点。下面说明了低邻域和上邻域的基本持久性和查询:

n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())


文件列表:


作为字典的动态关系


演示如何将类似字典的 Facade 放置在“动态”关系之上,以便字典作(假设简单的字符串键)可以对大型集合进行作,而无需一次加载整个集合。


文件列表:


泛型关联


阐释将多种类型的父对象与特定子对象关联的各种方法。


这些示例都使用声明式扩展和声明式 mixin。每个类在最后都提供了相同的用例 - 两个类,CustomerSupplier,它们都是 HasAddresses 混合的子类,这确保了父类具有包含 Address 对象的 addresses 集合。


discriminator_on_association.pygeneric_fk.py 脚本 是 2007 年博客文章中介绍的食谱的现代化版本 与 SQLAlchemy 的多态关联


文件列表:


  • table_per_association.py - 说明一个混音,该混音通过为每个父类单独生成的关联表提供通用关联。关联的对象本身保存在所有父对象共享的单个表中。


  • table_per_related.py - 说明一种泛型关联,该关联将关联对象保留在各个表中,每个关联对象都是为了代表特定父类保留这些对象而生成的。


  • discriminator_on_association.py - 说明了一个 mixin,它使用单个目标表和单个关联表(由所有父表引用)提供通用关联。关联表包含一个 “discriminator” 列,该列确定与关联表中每个特定行关联的父对象类型。


  • generic_fk.py - 以类似于 Django、ROR 等流行框架的方式说明所谓的“通用外键”。这种方法绕过了标准的引用完整性实践,因为“外键”列实际上并不被限制引用任何特定的表;相反,使用 in-application logic 来确定引用哪个 table。


物化路径


阐释使用 SQLAlchemy ORM 的分层数据的“物化路径”模式。


文件列表:


嵌套集


说明了使用 SQLAlchemy ORM 为分层数据实现 “嵌套集” 模式的基本方法。


文件列表:


性能


适用于各种 SQLAlchemy 使用案例的性能分析套件。


每个套件都侧重于具有特定性能配置文件和相关影响的特定用例:


  • 批量插入


  • 单个插入,带或不带事务


  • 获取大量行


  • 运行大量短查询


所有套件都包含各种使用模式,说明了 Core 和 ORM 的使用情况,并且通常按照性能从最差到最大的顺序排序,根据 SQLAlchemy 提供的功能量从最大到最小(这两件事通常完美对应)。


命令行工具在包级别提供,它允许运行各个套件:

$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
                                      [--num NUM] [--profile] [--dump]
                                      [--echo]

                                      {bulk_inserts,large_resultsets,single_inserts}

positional arguments:
  {bulk_inserts,large_resultsets,single_inserts}
                        suite to run

optional arguments:
  -h, --help            show this help message and exit
  --test TEST           run specific test name
  --dburl DBURL         database URL, default sqlite:///profile.db
  --num NUM             Number of iterations/items/etc for tests;
                        default is module-specific
  --profile             run profiling and dump call counts
  --dump                dump full call profile (implies --profile)
  --echo                Echo SQL output


运行示例如下所示:

$ python -m examples.performance bulk_inserts


或者有选项:

$ python -m examples.performance bulk_inserts \
    --dburl mysql+mysqldb://scott:tiger@localhost/test \
    --profile --num 1000


文件列表


文件列表:


  • bulk_updates.py - 这一系列测试将说明批量更新大量行的不同方法(正在构建中!目前只有一个测试)


  • large_resultsets.py - 在这一系列测试中,我们正在寻找加载大量非常小且简单的行的时间。


  • bulk_inserts.py - 这一系列测试说明了批量 INSERT 大量行的不同方法。


  • short_selects.py - 这一系列测试说明了按主键 SELECT 单个记录的不同方法


  • single_inserts.py - 在这一系列测试中,我们将研究一种方法,该方法在不同的事务中插入一行,然后返回到实质上“已关闭”状态。这类似于启动数据库连接、插入行、提交和关闭的 API 调用。


  • __main__.py - 允许将 examples/performance 包作为脚本运行。


带时间运行所有测试


这是 run 的默认形式:

$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
              test_bulk_insert_dictionaries, test_core,
              test_core_query_caching, test_dbapi_raw_w_connect,
              test_dbapi_raw_w_pool

test_orm_commit : Individual INSERT/COMMIT pairs via the
    ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
    the "bulk" API  (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
    the "bulk" API with dictionaries (10000 iterations);
    total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
    (10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
    with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection pool (10000 iterations); total time 8.001813 sec


转储单个测试的配置文件


Python 配置文件输出可以转储所有测试,或者更常见的是单个测试:

$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
         186109 function calls (186102 primitive calls) in 1.089 seconds

   Ordered by: internal time, call count

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1000    0.634    0.001    0.634    0.001 {method 'commit' of 'sqlite3.Connection' objects}
     1000    0.154    0.000    0.154    0.000 {method 'execute' of 'sqlite3.Cursor' objects}
     1000    0.021    0.000    0.074    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
     1000    0.015    0.000    0.034    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
        1    0.012    0.012    1.091    1.091 examples/performance/single_inserts.py:79(test_core)

    ...


编写你自己的套件


Profiler Suite 系统是可扩展的,可以应用于您自己的测试集。在为某些性能关键型例程集确定正确的方法时,这是一种有价值的技术。例如,如果我们想分析几种加载之间的差异,我们可以创建一个文件test_loads.py,其中包含以下内容:

from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
engine = None
session = None


class Parent(Base):
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    children = relationship("Child")


class Child(Base):
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))


# Init with name of file, default number of items
Profiler.init("test_loads", 1000)


@Profiler.setup_once
def setup_once(dburl, echo, num):
    "setup once.  create an engine, insert fixture data"
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    sess = Session(engine)
    sess.add_all(
        [
            Parent(children=[Child() for j in range(100)])
            for i in range(num)
        ]
    )
    sess.commit()


@Profiler.setup
def setup(dburl, echo, num):
    "setup per test.  create a new Session."
    global session
    session = Session(engine)
    # pre-connect so this part isn't profiled (if we choose)
    session.connection()


@Profiler.profile
def test_lazyload(n):
    "load everything, no eager loading."

    for parent in session.query(Parent):
        parent.children


@Profiler.profile
def test_joinedload(n):
    "load everything, joined eager loading."

    for parent in session.query(Parent).options(joinedload("children")):
        parent.children


@Profiler.profile
def test_subqueryload(n):
    "load everything, subquery eager loading."

    for parent in session.query(Parent).options(subqueryload("children")):
        parent.children


if __name__ == "__main__":
    Profiler.main()


我们可以直接运行我们的新脚本:

$ python test_loads.py  --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec


太空侵略者


使用 SQLite 作为状态机的 Space Invaders 游戏。


最初开发于 2012 年。适合在 Python 3 中工作。


在使用 ASCII 艺术的文本控制台中运行。

../_images/space_invaders.jpg


要运行:

$ python -m examples.space_invaders.space_invaders


在运行时,观察日志中的 SQL 输出:

$ tail -f space_invaders.log


享受!


文件列表:


对对象进行版本控制


使用历史表进行版本控制


说明了一个扩展,该扩展为实体创建版本表并存储每个更改的记录。给定的扩展会生成一个匿名的 “history” 类,该类表示目标对象的历史版本。


使用临时行进行版本控制示例相比,该示例将更新作为新行写入同一表中,而无需使用单独的历史记录表。


用法通过单元测试模块 test_versioning.py 进行说明,该模块使用 SQLAlchemy 的内部 pytest 插件运行:

$ pytest test/base/test_examples.py


使用声明性的示例用法片段:

from history_meta import Versioned, versioned_session


class Base(DeclarativeBase):
    pass


class SomeClass(Versioned, Base):
    __tablename__ = "sometable"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id


Session = sessionmaker(bind=engine)
versioned_session(Session)

sess = Session()
sc = SomeClass(name="sc1")
sess.add(sc)
sess.commit()

sc.name = "sc1modified"
sess.commit()

assert sc.version == 2

SomeClassHistory = SomeClass.__history_mapper__.class_

assert sess.query(SomeClassHistory).filter(
    SomeClassHistory.version == 1
).all() == [SomeClassHistory(version=1, name="sc1")]


Versioned mixin 旨在与 Clarative 一起使用。要将扩展与经典映射器一起使用,可以应用 _history_mapper 函数:

from history_meta import _history_mapper

m = mapper(SomeClass, sometable)
_history_mapper(m)

SomeHistoryClass = SomeClass.__history_mapper__.class_


版本控制示例还与配置版本计数器中记录的 ORM 乐观并发功能集成。要启用此功能,请将标志 Versioned.use_mapper_versioning 设置为 True:

class SomeClass(Versioned, Base):
    __tablename__ = "sometable"

    use_mapper_versioning = True

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id


在上面,如果更新具有相同版本标识符的两个 SomeClass 实例并发发送到数据库进行 UPDATE,如果数据库隔离级别允许两个 UPDATE 语句继续进行,则一个语句将失败,因为它不再针对最后一个已知版本标识符。


文件列表:


使用 Temporal Rows 进行版本控制


几个示例说明了拦截更改的技术,这些更改首先被解释为行上的 UPDATE,而是将其转换为新行的 INSERT,而前一行作为历史版本保持不变。


使用历史记录表进行版本控制示例相比,该示例将历史记录行写入单独的历史记录表。


文件列表:


  • versioned_rows.py - 说明了一种截获对象更改的方法,该方法将单行上的 UPDATE 语句转换为 INSERT 语句,以便插入包含新数据的新行,同时保持旧行不变。


  • versioned_rows_w_versionid.py - 说明了一种截获对象更改的方法,该方法将单行上的 UPDATE 语句转换为 INSERT 语句,以便插入包含新数据的新行,同时保持旧行不变。


  • versioned_map.py versioned_rows- 围绕 “vertical table” 结构的概念,如 垂直属性映射示例。


  • versioned_update_old_row.py - 说明了与 versioned_rows.py相同的 UPDATE 转换为 INSERT 技术,但也在行上发出 UPDATE 以影响时间戳的更改。还包括一个 SessionEvents.do_orm_execute() 钩子,用于将查询限制为仅最新版本。


垂直属性映射


说明 “vertical table” 映射。


“垂直表”是指将对象的单个属性作为不同的行存储在表中的技术。“垂直表”技术用于持久保存对象,这些对象可以具有不同的属性集,但代价是简单的查询控制和简洁性。它常见于内容/文档管理系统中,以便灵活地表示用户创建的结构。


给出了该方法的两种变体。在第二个字段中,每行引用一个“数据类型”,其中包含有关属性中存储的信息类型的信息,例如整数、字符串或日期。


例:

shrew = Animal("shrew")
shrew["cuteness"] = 5
shrew["weasel-like"] = False
shrew["poisonous"] = True

session.add(shrew)
session.flush()

q = session.query(Animal).filter(
    Animal.facts.any(
        and_(AnimalFact.key == "weasel-like", AnimalFact.value == True)
    )
)
print("weasel-like animals", q.all())


文件列表:


继承映射配方


基本继承映射


单表、联接表和具体表继承的工作示例,如映射类继承层次结构中所述。


文件列表:


  • joined.py - 联接表(每个子类一个表)继承示例。


  • concrete.py - 具体表(每个类一个表)继承示例。


  • single.py - 单表(每个层次结构一个表)继承示例。


特殊 API


属性插桩


说明对 SQLAlchemy 的属性管理系统的修改的示例。


文件列表:


水平分片


使用 SQLAlchemy Sharding API 的基本示例。分片是指跨多个数据库水平扩展数据。


“分片”映射的基本组件是:


  • 多个 Engine 实例,每个实例都分配了一个“分片 ID”。这些 Engine 实例可能引用不同的数据库,或同一数据库中的不同架构/帐户,或者它们甚至可以仅通过选项进行区分,这些选项将导致它们在使用时访问不同的架构或表。


  • 一个函数,该函数可以返回单个分片 ID,给定要保存的实例;这称为 “shard_chooser”


  • 一个函数,可以返回适用于特定实例标识符的分片 ID 列表;这称为 “id_chooser”。如果返回所有分片 ID,则搜索所有分片。


  • 一个函数,该函数可以在给定特定 Query (“query_chooser”) 的情况下返回要尝试的分片 ID 列表。如果它返回所有分片 ID,则将查询所有分片并将结果联接在一起。


在这些示例中,针对同一基本示例使用了不同类型的分片,该示例基于每个大洲容纳天气数据。我们提供了示例 shard_chooser、id_chooser 和 query_chooser 函数。query_chooser说明了对 SQL 表达式元素的检查,以便尝试确定所请求的单个分片。


构建通用分片例程是一种雄心勃勃的方法 到在多个数据库之间组织实例的问题。 对于 更直白的替代方案,“独特实体”方法 是将对象分配给不同表(并且可能 数据库节点)- 在 wiki 上描述 EntityName 的 EntityName 中。


文件列表:


  • separate_databases.py - 说明使用不同的 SQLite 数据库进行分片。


  • separate_tables.py - 说明使用单个 SQLite 数据库进行分片,但该数据库将具有使用命名约定的多个表。


  • separate_schema_translates.py - 说明了使用具有多个架构的单个数据库进行分片,其中每个分片可以使用不同的“schema_translates_map”。


  • asyncio.py - 说明与 asyncio 一起使用的分片 API。


扩展 ORM


ORM 查询事件


说明 ORM SELECT 行为增强的配方,如 Session.execute() 与 2.0 样式一起使用 select()以及 1.x 样式Query 对象。


示例包括 with_loader_criteria() 的演示 选项以及 SessionEvents.do_orm_execute() 钩子。


从 SQLAlchemy 1.4 开始,Query 构造与 Select 构造统一,因此这两个对象几乎相同。


文件列表:


Dogpile 缓存


说明如何嵌入 狗桩.cache 功能,允许完全缓存控制 以及从长期缓存中提取 “Lazy loaded” 属性的能力。


在此演示中,说明了以下技术:


  • 使用 SessionEvents.do_orm_execute() 事件钩子


  • 绕过 Session.execute() 从自定义缓存源而不是数据库拉取的基本技术。


  • 使用 dogpile.cache 进行基本缓存,使用 “regions” 允许对一组固定的配置进行全局控制。


  • 使用自定义 UserDefinedOption 对象在语句对象中配置选项。


另请参阅


Re-Executing Statements - 包括此处介绍的技术的一般示例。


例如:

# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))

# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))

# execute and results
result = session.execute(stmt)

print(result.scalars().all())


要运行,必须同时安装 SQLAlchemy 和 dogpile.cache 或在当前 PYTHONPATH 上。该演示将为数据文件创建一个本地目录,插入初始数据,然后运行。第二次运行演示将利用已经存在的缓存文件,并且将针对两个表发出一个 SQL 语句 - 但是,显示的结果将利用数十个从缓存中提取的延迟加载。


演示脚本本身(按复杂程度排序)作为 Python 模块运行,以便相对导入工作:

$ python -m examples.dogpile_caching.helloworld

$ python -m examples.dogpile_caching.relationship_caching

$ python -m examples.dogpile_caching.advanced

$ python -m examples.dogpile_caching.local_session_caching


文件列表:


  • environment.py - 建立数据/缓存文件路径和配置,必要时引导夹具数据。


  • caching_query.py - 表示允许将 Dogpile 缓存与 SQLAlchemy 一起使用的函数和类。引入了一个名为 FromCache 的查询选项。


  • model.py - 数据模型,表示具有多个 Address 对象的 Person,每个对象都有 PostalCode、City、Country。


  • fixture_data.py - 安装一些示例数据。在这里,我们有一些美国/加拿大城市的邮政编码。然后,安装 100 个 Person 记录,每个记录都有一个随机选择的邮政编码。


  • helloworld.py - 说明如何加载一些数据并缓存结果。


  • relationship_caching.py - 说明如何在关系端点上添加缓存选项,以便延迟加载从缓存加载。


  • advanced.py - 说明 Query 与 FromCache 选项结合使用的用法,包括前端加载、缓存失效和集合缓存。


  • local_session_caching.py - 此示例创建一个新的 dogpile.cache 后端,该后端将数据保存在当前会话的本地字典中。remove() 的会话,缓存就消失了。