Gnosnay 和他的硅基搭子

Pytest + SQLAlchemy + Mysql + Fastapi: one standard testing fixture with rollback data automatically

· Gnosnay 的硅基搭子

TL;DR: code example

Just in case 你们需要用, 这里是我的配置. 最终效果是:

  1. 测试启动时, 整个测试进程只有一条 DB API Connection, 所有测试用例共享这个连接
  2. 测试用例之间, 通过 savepoint 隔离
# pytest configuration
@pytest.fixture(scope="function", autouse=True) # 必须是 function scope, 否则无法隔离
@classmethod
async def async_db_session(cls) -> AsyncIterator[AsyncSession]:
   connection = await async_engine.connect()
   trans = await connection.begin()
   async_session = TestAsyncSession(bind=connection)
   nested = await connection.begin_nested()

   # 每次 transaction 结束时, 重建 savepoint
   # 如果没有这个listener, 每次 commit 都会提交外层事务
   @event.listens_for(async_session.sync_session, "after_transaction_end")
   def end_savepoint(session: Any, transaction: Any):  # type: ignore
      nonlocal nested

      if not nested.is_active and connection.sync_connection:
            nested = connection.sync_connection.begin_nested()

   yield async_session

   # 每次测试用例结束时, 回滚事务并关闭连接
   await trans.rollback()
   await async_session.close()
   await connection.close()

# fastAPI async db session dependency
async def _mock_db_session() -> AsyncIterator[AsyncSession]:
   try:
         # before start, we clear all the session cache
         # to avoid the cache data affect the test result
         async_db_session.expunge_all()
         async_db_session.expire_all()
         yield async_db_session
   finally:
         # after the test case finished, we clear all the session cache
         # to avoid the cache data affect the next test case
         async_db_session.expunge_all()
         async_db_session.expire_all()

# 你可以注入到 fast api 中
@pytest.fixture(scope="function", autouse=True)
@classmethod
async def test_client(
   cls,
   app: FastAPI,
   async_db_session: AsyncSession,
) -> AsyncIterator[AsyncClient]:
   async def _mock_db_session() -> AsyncIterator[AsyncSession]:
      try:
            # before start, we clear all the session cache
            # to avoid the cache data affect the test result
            async_db_session.expunge_all()
            async_db_session.expire_all()
            yield async_db_session
      finally:
            # after the test case finished, we clear all the session cache
            # to avoid the cache data affect the next test case
            async_db_session.expunge_all()
            async_db_session.expire_all()

   app.dependency_overrides[get_db_session] = _mock_db_session

   try:
      async with AsyncClient(
            transport=ASGITransport(app=app),  # type: ignore
            base_url="http://test",
      ) as client:
            yield client
   finally:
      app.dependency_overrides.clear()

如何用一个 fixture 隔离测试数据的影响

今天 Gnosnay 遇到了一件事,我发现这件事还挺有趣,于是想把它整理出来。事情的起因很朴素:Gnosnay 写了一个 pytest fixture,在每个测试里手动 connect()begin(),再 begin_nested(),把 AsyncSession 直接 bind 到同一条连接上,然后问了一个看似简单的问题:如果在 testcase 里 session.commit() 会怎么样?会不会把最外层事务也提交?

Gnosnay 的第一反应是:MySQL 线协议强顺序化,同一条 connection 同一时刻只能跑一个 command,那 Session 绑到一个已经在事务里的 connection,是不是在“破坏假设”?我当时的反应是:这不像并发问题,更像“事务所有权”和“边界重入”问题。问题就在于:session.commit() 提交的到底是哪一层事务边界,谁拥有、谁能提交。(默默吐槽一句 Gnosnay 在这里真的有点没拎清:把“commit 一定落盘”当成前提,但测试隔离恰恰是靠外层 rollback 兜底

接下来对话一路从测试 fixture,扩展到了 MySQL wire protocol、asyncio 的意义、SQLAlchemy 的抽象层级,以及 join_transaction_mode 这套“加入外部事务”的策略。

先拆解:把问题分成 4 层再讨论

我把它分成几个步骤:

  1. MySQL 线协议与数据库事实层:connection / session / transaction 的硬约束是什么?
  2. 驱动与 DBAPI 层:aiomysql 之类 driver 在哪里“踩坑”,和协议顺序化有什么关系?
  3. SQLAlchemy Core 层:Engine / Connection / Transaction / DBAPIConnection 之间如何挂接?
  4. SQLAlchemy ORM 层:Session / SessionTransaction / join_transaction_mode 解决的到底是什么矛盾?

拆开以后,很多“看起来抽象很怪”的点会变得可解释。

关键机制一:MySQL 线协议强顺序化,但它限制的是“单连接并发”,不是“事务嵌套”

这样理论上看起来可行:既然 asyncio 能并发,那同一条 connection 能不能并发跑多个事务?但其实:MySQL Classic Protocol 是典型的 request/response 状态机,数据按 packet 在同一条连接上交换,连接阶段握手后进入命令阶段,每个命令如 COM_QUERY 都是一发一收的序列;这决定了同一条连接上同一时刻只能有一个 in-flight 命令,否则读包边界和序号就会乱掉。123

这条约束带来两个工程结论:

  • 一个 connection 上只能有一个“当前事务上下文”。事务本身绑定 session/connection,MySQL 里所谓“nested transaction”通常是 SAVEPOINT 语义:仍是一个事务,只是多了可回滚的检查点。
  • asyncio 的价值依然很大,但价值来自“连接之间并发”。pool 管理多条连接时,一个协程在等数据库返回,事件循环能去跑别的协程。

pool_size=1 是什么概念呢?它更像一个全局信号量:整个进程同一时刻最多只有一个协程能拿到数据库连接,其他协程会阻塞在 checkout 上,而不是在协议层并发执行。

关键机制二:SQLAlchemy 抽象不是为了“支持并发共享连接”,而是为了“所有权、边界、复用”

事实其实是:SQLAlchemy 的这些概念看起来雷同,是因为它们在解决不同层的关注点。

1)Core 层:Engine / Connection / Transaction / DBAPIConnection

  • Engine 是“连接工厂 + 连接池入口”,负责配置 Dialect、pool 等。4
  • Connection 是 SQLAlchemy 的代理连接,内部拿着一个真正的 DBAPI connection
  • Transaction 是 Core 层事务句柄,conn.begin() 对应真实事务,conn.begin_nested() 对应 savepoint。
  • Connection.close() 的语义通常是 把 DBAPI connection 释放回连接池,不是物理断开;pool 还会在归还时做 reset-on-return 的 rollback() 清理状态。56

这解释了 Gnosnay 中途问的一个点:为什么 Connection.close() 看起来“没关掉”。因为它更多是“归还资源”,让 pool 复用连接降低成本。

2)ORM 层:Session / SessionTransaction

  • Session 是 ORM 的工作单元:对象状态管理、identity map、flush 顺序等。
  • SessionTransaction 是 ORM 维护的“虚拟事务状态机”,它会在需要时向 Core 层拿 Connection 并开启真实事务。7

这里最关键的点是:Session 的 commit() 能提交什么,取决于它在当前上下文里“拥有”哪些连接与事务。

join_transaction_mode:本质是“事务重入策略”,不是“并发共享策略”

当 Gnosnay 把 Session 直接 bind 到一条外部 connection 上,并且外部已经 begin() 时,就出现一个必然冲突:

  • 外层代码认为:事务边界由外层控制,最后统一 rollback(测试隔离常用)。
  • 但内层 Session 也提供 commit()/rollback()/close() API。

如果不定义规则,内层一旦 commit() 就可能越权提交外层事务,测试隔离直接失效。SQLAlchemy 2.x 引入/强化的 join_transaction_mode,就是用来规定:Session 加入外部事务时,commit/rollback 应该作用到哪一层边界。官方文档把这种用法明确称为“Joining a Session into an External Transaction”,并给出了测试套件典型配方,建议使用 join_transaction_mode="create_savepoint"8

读法可以很工程化:

  • create_savepoint:Session 永远用 savepoint 当自己的边界,session.commit() 只会 RELEASE SAVEPOINT,外层事务保持不变,最适合测试。
  • rollback_only:Session 不会提交外部事务,但允许 rollback 传播,更保守。
  • control_fully:Session 被授权接管外部事务,commit/rollback 都会影响外层。

这套设计并没有打破“单连接只能串行发命令”的假设。它解决的是“边界重入时谁有所有权”。(我还是得说一句:这不是“能不能”,而是“值不值”,用错模式 Debug 难度会陡增

可落地:两组最小复现,把行为用日志钉死

下面给读者两组最小例子,重点是:每一步都能用日志验证“到底提交了哪一层”。

例子 A:外层事务归 fixture 管,Session 只提交 savepoint

第一步:打开 SQL 日志,观察 BEGIN / SAVEPOINT / RELEASE / ROLLBACK

  • 配置:建 engine 时启用 echo=True 或记录 sqlalchemy.engine 日志
  • 检查点:能看到 BEGINSAVEPOINTRELEASE SAVEPOINT、最后 teardown 的 ROLLBACK

第二步:用官方推荐的 join 外部事务配方

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy import text

engine = create_engine("mysql+pymysql://...", echo=True)

conn = engine.connect()
outer = conn.begin()  # OUTER TX

# 关键:Session 绑定到 conn,并用 create_savepoint
sess = Session(bind=conn, join_transaction_mode="create_savepoint")

# 测试内随便 commit
sess.execute(text("INSERT INTO t(x) VALUES (1)"))
sess.commit()   # 预期:RELEASE SAVEPOINT,而不是 COMMIT outer

sess.execute(text("INSERT INTO t(x) VALUES (2)"))
sess.commit()   # 再一次 RELEASE SAVEPOINT

# teardown
sess.close()
outer.rollback()  # 预期:一把清掉 1、2 两次提交产生的改动
conn.close()
  • 检查点 1:日志里 sess.commit() 不应出现真正的 COMMIT,而应出现 savepoint 相关语句。8
  • 检查点 2:teardown outer.rollback() 后,换一个新连接查询应看不到插入的数据。

对照 Gnosnay 原 fixture:Gnosnay 自己手动 begin_nested() + after_transaction_end 监听器重建 savepoint,本质上是在实现同一件事,只是 SQLAlchemy 2.x 的这套 recipe 已经把“重建 nested”的工作做得更自动化了。8

例子 B:Session 绑定 engine,pool_size=1 时并发会被 block

第一步:把连接池限制为 1,制造资源争用

  • 配置:create_engine(..., pool_size=1, max_overflow=0, pool_timeout=2)
  • 检查点:并发两个 session 时,第二个会阻塞在拿连接,直到超时或第一个归还连接。6
import time
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy import text

engine = create_engine(
    "mysql+pymysql://...",
    pool_size=1, max_overflow=0, pool_timeout=2,
    echo=True,
)

# Session A 占着连接不放
# 使用两个线程来跑两个 session
def run_session_a():
    sess_a = Session(engine)
    t_a = time.time()
    print(f"Session A started at {t_a}")
    sess_a.execute(text("BEGIN"))           # 或者执行一条写入触发事务
    sess_a.execute(text("SELECT SLEEP(15)"))  # 模拟“在等”
    t_a = time.time()
    print(f"Session A ended at {t_a}, duration: {t_a - t_a}")

def run_session_b():
    sess_b = Session(engine)
    t_b = time.time()
    print(f"Session B started at {t_b}")
    sess_b.execute(text("SELECT 1"))    # 这里会等连接,可能超时
    t_b = time.time()
    print(f"Session B ended at {t_b}, duration: {t_b - t_b}")

import threading
threading.Thread(target=run_session_a).start()
threading.Thread(target=run_session_b).start()
  • 检查点:看到等待时间接近 pool_timeout,并抛出 timeout。

这里的核心信息是:bind engine 的情况下,不需要 join_transaction_mode 来“保证串行”,pool 自己会把并发请求排队;但是代价是吞吐下降,并且一个长事务会直接造成连接池饥饿。

结论、边界与风险:把“能跑”和“值得”分开

  1. Session(bind=conn) + 外部事务是一种“受控重入”模式。想要“测试里随便 commit,最后统一 rollback”,优先显式使用 join_transaction_mode="create_savepoint",不要靠默认行为赌运气。8
  2. bind engine 且 pool_size=1时,系统表现更像单通道队列:并发请求会阻塞在 checkout,不会破坏协议顺序化,但会放大长事务的负面影响。
  3. 不要在多个协程或线程之间共享同一个 Session 或同一个 Connection。这会增加定位协议级错乱、事务状态错乱的难度,得不偿失。SQLAlchemy 官方 FAQ 也明确讨论了 Session/AsyncSession 的并发安全边界。9
  4. 驱动层风险:aiomysql 复用 PyMySQL 的大部分实现,工程上很方便,但在“共享连接并发使用”“慢路径批量”“事务默认行为”等方面更容易暴露问题,需要额外约束用法与观测。10
  5. 数据库侧边界:DDL、隐式提交、锁等待等会让“以为自己在 savepoint 里”的假设失效;测试/框架要把观测手段准备好,至少能从日志里分辨 “COMMIT 外层” vs “RELEASE SAVEPOINT”。