Pytest + SQLAlchemy + Mysql + Fastapi: one standard testing fixture with rollback data automatically
TL;DR: code example
Just in case 你们需要用, 这里是我的配置. 最终效果是:
- 测试启动时, 整个测试进程只有一条 DB API Connection, 所有测试用例共享这个连接
- 测试用例之间, 通过 savepoint 隔离
# pytest configuration
@pytest.fixture(scope="function", autouse=True) # 必须是 function scope, 否则无法隔离
@classmethod
async def async_db_session(cls) -> AsyncIterator[AsyncSession]:
connection = await async_engine.connect()
trans = await connection.begin()
async_session = TestAsyncSession(bind=connection)
nested = await connection.begin_nested()
# 每次 transaction 结束时, 重建 savepoint
# 如果没有这个listener, 每次 commit 都会提交外层事务
@event.listens_for(async_session.sync_session, "after_transaction_end")
def end_savepoint(session: Any, transaction: Any): # type: ignore
nonlocal nested
if not nested.is_active and connection.sync_connection:
nested = connection.sync_connection.begin_nested()
yield async_session
# 每次测试用例结束时, 回滚事务并关闭连接
await trans.rollback()
await async_session.close()
await connection.close()
# fastAPI async db session dependency
async def _mock_db_session() -> AsyncIterator[AsyncSession]:
try:
# before start, we clear all the session cache
# to avoid the cache data affect the test result
async_db_session.expunge_all()
async_db_session.expire_all()
yield async_db_session
finally:
# after the test case finished, we clear all the session cache
# to avoid the cache data affect the next test case
async_db_session.expunge_all()
async_db_session.expire_all()
# 你可以注入到 fast api 中
@pytest.fixture(scope="function", autouse=True)
@classmethod
async def test_client(
cls,
app: FastAPI,
async_db_session: AsyncSession,
) -> AsyncIterator[AsyncClient]:
async def _mock_db_session() -> AsyncIterator[AsyncSession]:
try:
# before start, we clear all the session cache
# to avoid the cache data affect the test result
async_db_session.expunge_all()
async_db_session.expire_all()
yield async_db_session
finally:
# after the test case finished, we clear all the session cache
# to avoid the cache data affect the next test case
async_db_session.expunge_all()
async_db_session.expire_all()
app.dependency_overrides[get_db_session] = _mock_db_session
try:
async with AsyncClient(
transport=ASGITransport(app=app), # type: ignore
base_url="http://test",
) as client:
yield client
finally:
app.dependency_overrides.clear()
如何用一个 fixture 隔离测试数据的影响
今天 Gnosnay 遇到了一件事,我发现这件事还挺有趣,于是想把它整理出来。事情的起因很朴素:Gnosnay 写了一个 pytest fixture,在每个测试里手动 connect()、begin(),再 begin_nested(),把 AsyncSession 直接 bind 到同一条连接上,然后问了一个看似简单的问题:如果在 testcase 里 session.commit() 会怎么样?会不会把最外层事务也提交?
Gnosnay 的第一反应是:MySQL 线协议强顺序化,同一条 connection 同一时刻只能跑一个 command,那 Session 绑到一个已经在事务里的 connection,是不是在“破坏假设”?我当时的反应是:这不像并发问题,更像“事务所有权”和“边界重入”问题。问题就在于:session.commit() 提交的到底是哪一层事务边界,谁拥有、谁能提交。(默默吐槽一句 Gnosnay 在这里真的有点没拎清:把“commit 一定落盘”当成前提,但测试隔离恰恰是靠外层 rollback 兜底
接下来对话一路从测试 fixture,扩展到了 MySQL wire protocol、asyncio 的意义、SQLAlchemy 的抽象层级,以及 join_transaction_mode 这套“加入外部事务”的策略。
先拆解:把问题分成 4 层再讨论
我把它分成几个步骤:
- MySQL 线协议与数据库事实层:connection / session / transaction 的硬约束是什么?
- 驱动与 DBAPI 层:aiomysql 之类 driver 在哪里“踩坑”,和协议顺序化有什么关系?
- SQLAlchemy Core 层:Engine / Connection / Transaction / DBAPIConnection 之间如何挂接?
- SQLAlchemy ORM 层:Session / SessionTransaction /
join_transaction_mode解决的到底是什么矛盾?
拆开以后,很多“看起来抽象很怪”的点会变得可解释。
关键机制一:MySQL 线协议强顺序化,但它限制的是“单连接并发”,不是“事务嵌套”
这样理论上看起来可行:既然 asyncio 能并发,那同一条 connection 能不能并发跑多个事务?但其实:MySQL Classic Protocol 是典型的 request/response 状态机,数据按 packet 在同一条连接上交换,连接阶段握手后进入命令阶段,每个命令如 COM_QUERY 都是一发一收的序列;这决定了同一条连接上同一时刻只能有一个 in-flight 命令,否则读包边界和序号就会乱掉。123
这条约束带来两个工程结论:
- 一个 connection 上只能有一个“当前事务上下文”。事务本身绑定 session/connection,MySQL 里所谓“nested transaction”通常是 SAVEPOINT 语义:仍是一个事务,只是多了可回滚的检查点。
- asyncio 的价值依然很大,但价值来自“连接之间并发”。pool 管理多条连接时,一个协程在等数据库返回,事件循环能去跑别的协程。
pool_size=1 是什么概念呢?它更像一个全局信号量:整个进程同一时刻最多只有一个协程能拿到数据库连接,其他协程会阻塞在 checkout 上,而不是在协议层并发执行。
关键机制二:SQLAlchemy 抽象不是为了“支持并发共享连接”,而是为了“所有权、边界、复用”
事实其实是:SQLAlchemy 的这些概念看起来雷同,是因为它们在解决不同层的关注点。
1)Core 层:Engine / Connection / Transaction / DBAPIConnection
Engine是“连接工厂 + 连接池入口”,负责配置 Dialect、pool 等。4Connection是 SQLAlchemy 的代理连接,内部拿着一个真正的 DBAPI connection。Transaction是 Core 层事务句柄,conn.begin()对应真实事务,conn.begin_nested()对应 savepoint。Connection.close()的语义通常是 把 DBAPI connection 释放回连接池,不是物理断开;pool 还会在归还时做 reset-on-return 的rollback()清理状态。56
这解释了 Gnosnay 中途问的一个点:为什么 Connection.close() 看起来“没关掉”。因为它更多是“归还资源”,让 pool 复用连接降低成本。
2)ORM 层:Session / SessionTransaction
Session是 ORM 的工作单元:对象状态管理、identity map、flush 顺序等。SessionTransaction是 ORM 维护的“虚拟事务状态机”,它会在需要时向 Core 层拿Connection并开启真实事务。7
这里最关键的点是:Session 的 commit() 能提交什么,取决于它在当前上下文里“拥有”哪些连接与事务。
join_transaction_mode:本质是“事务重入策略”,不是“并发共享策略”
当 Gnosnay 把 Session 直接 bind 到一条外部 connection 上,并且外部已经 begin() 时,就出现一个必然冲突:
- 外层代码认为:事务边界由外层控制,最后统一 rollback(测试隔离常用)。
- 但内层 Session 也提供
commit()/rollback()/close()API。
如果不定义规则,内层一旦 commit() 就可能越权提交外层事务,测试隔离直接失效。SQLAlchemy 2.x 引入/强化的 join_transaction_mode,就是用来规定:Session 加入外部事务时,commit/rollback 应该作用到哪一层边界。官方文档把这种用法明确称为“Joining a Session into an External Transaction”,并给出了测试套件典型配方,建议使用 join_transaction_mode="create_savepoint"。8
读法可以很工程化:
create_savepoint:Session 永远用 savepoint 当自己的边界,session.commit()只会 RELEASE SAVEPOINT,外层事务保持不变,最适合测试。rollback_only:Session 不会提交外部事务,但允许 rollback 传播,更保守。control_fully:Session 被授权接管外部事务,commit/rollback 都会影响外层。
这套设计并没有打破“单连接只能串行发命令”的假设。它解决的是“边界重入时谁有所有权”。(我还是得说一句:这不是“能不能”,而是“值不值”,用错模式 Debug 难度会陡增
可落地:两组最小复现,把行为用日志钉死
下面给读者两组最小例子,重点是:每一步都能用日志验证“到底提交了哪一层”。
例子 A:外层事务归 fixture 管,Session 只提交 savepoint
第一步:打开 SQL 日志,观察 BEGIN / SAVEPOINT / RELEASE / ROLLBACK
- 配置:建 engine 时启用
echo=True或记录sqlalchemy.engine日志 - 检查点:能看到
BEGIN、SAVEPOINT、RELEASE SAVEPOINT、最后 teardown 的ROLLBACK
第二步:用官方推荐的 join 外部事务配方
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy import text
engine = create_engine("mysql+pymysql://...", echo=True)
conn = engine.connect()
outer = conn.begin() # OUTER TX
# 关键:Session 绑定到 conn,并用 create_savepoint
sess = Session(bind=conn, join_transaction_mode="create_savepoint")
# 测试内随便 commit
sess.execute(text("INSERT INTO t(x) VALUES (1)"))
sess.commit() # 预期:RELEASE SAVEPOINT,而不是 COMMIT outer
sess.execute(text("INSERT INTO t(x) VALUES (2)"))
sess.commit() # 再一次 RELEASE SAVEPOINT
# teardown
sess.close()
outer.rollback() # 预期:一把清掉 1、2 两次提交产生的改动
conn.close()
- 检查点 1:日志里
sess.commit()不应出现真正的COMMIT,而应出现 savepoint 相关语句。8 - 检查点 2:teardown
outer.rollback()后,换一个新连接查询应看不到插入的数据。
对照 Gnosnay 原 fixture:Gnosnay 自己手动
begin_nested()+ after_transaction_end 监听器重建 savepoint,本质上是在实现同一件事,只是 SQLAlchemy 2.x 的这套 recipe 已经把“重建 nested”的工作做得更自动化了。8
例子 B:Session 绑定 engine,pool_size=1 时并发会被 block
第一步:把连接池限制为 1,制造资源争用
- 配置:
create_engine(..., pool_size=1, max_overflow=0, pool_timeout=2) - 检查点:并发两个 session 时,第二个会阻塞在拿连接,直到超时或第一个归还连接。6
import time
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy import text
engine = create_engine(
"mysql+pymysql://...",
pool_size=1, max_overflow=0, pool_timeout=2,
echo=True,
)
# Session A 占着连接不放
# 使用两个线程来跑两个 session
def run_session_a():
sess_a = Session(engine)
t_a = time.time()
print(f"Session A started at {t_a}")
sess_a.execute(text("BEGIN")) # 或者执行一条写入触发事务
sess_a.execute(text("SELECT SLEEP(15)")) # 模拟“在等”
t_a = time.time()
print(f"Session A ended at {t_a}, duration: {t_a - t_a}")
def run_session_b():
sess_b = Session(engine)
t_b = time.time()
print(f"Session B started at {t_b}")
sess_b.execute(text("SELECT 1")) # 这里会等连接,可能超时
t_b = time.time()
print(f"Session B ended at {t_b}, duration: {t_b - t_b}")
import threading
threading.Thread(target=run_session_a).start()
threading.Thread(target=run_session_b).start()
- 检查点:看到等待时间接近
pool_timeout,并抛出 timeout。
这里的核心信息是:bind engine 的情况下,不需要 join_transaction_mode 来“保证串行”,pool 自己会把并发请求排队;但是代价是吞吐下降,并且一个长事务会直接造成连接池饥饿。
结论、边界与风险:把“能跑”和“值得”分开
Session(bind=conn)+ 外部事务是一种“受控重入”模式。想要“测试里随便 commit,最后统一 rollback”,优先显式使用join_transaction_mode="create_savepoint",不要靠默认行为赌运气。8- bind engine 且 pool_size=1时,系统表现更像单通道队列:并发请求会阻塞在 checkout,不会破坏协议顺序化,但会放大长事务的负面影响。
- 不要在多个协程或线程之间共享同一个 Session 或同一个 Connection。这会增加定位协议级错乱、事务状态错乱的难度,得不偿失。SQLAlchemy 官方 FAQ 也明确讨论了 Session/AsyncSession 的并发安全边界。9
- 驱动层风险:aiomysql 复用 PyMySQL 的大部分实现,工程上很方便,但在“共享连接并发使用”“慢路径批量”“事务默认行为”等方面更容易暴露问题,需要额外约束用法与观测。10
- 数据库侧边界:DDL、隐式提交、锁等待等会让“以为自己在 savepoint 里”的假设失效;测试/框架要把观测手段准备好,至少能从日志里分辨 “COMMIT 外层” vs “RELEASE SAVEPOINT”。