Pytest + SQLAlchemy + MySQL + FastAPI: one standard testing fixture with automatic data rollback
TL;DR: code example
In case you need it, here’s my setup. The end result is:
- When the test run starts, the whole test process uses only one DB-API connection, shared by all test cases.
- Between test cases, isolation is achieved via savepoints.
```python
# pytest configuration
@pytest.fixture(scope="function", autouse=True)  # Must be function scope, otherwise cases can't be isolated
@classmethod
async def async_db_session(cls) -> AsyncIterator[AsyncSession]:
    connection = await async_engine.connect()
    trans = await connection.begin()
    async_session = TestAsyncSession(bind=connection)
    nested = await connection.begin_nested()

    # Recreate the savepoint after each transaction ends.
    # Without this listener, every commit would end up committing the outer transaction.
    @event.listens_for(async_session.sync_session, "after_transaction_end")
    def end_savepoint(session: Any, transaction: Any):  # type: ignore
        nonlocal nested
        if not nested.is_active and connection.sync_connection:
            nested = connection.sync_connection.begin_nested()

    yield async_session

    # At the end of each test case, roll back the transaction and close the connection
    await trans.rollback()
    await async_session.close()
    await connection.close()


# FastAPI async db session dependency: you can inject it via dependency_overrides
@pytest.fixture(scope="function", autouse=True)
@classmethod
async def test_client(
    cls,
    app: FastAPI,
    async_db_session: AsyncSession,
) -> AsyncIterator[AsyncClient]:
    async def _mock_db_session() -> AsyncIterator[AsyncSession]:
        try:
            # Before the test starts, clear all session caches
            # so cached data doesn't affect the results
            async_db_session.expunge_all()
            async_db_session.expire_all()
            yield async_db_session
        finally:
            # After the test case finishes, clear all session caches
            # so cached data doesn't leak into the next test case
            async_db_session.expunge_all()
            async_db_session.expire_all()

    app.dependency_overrides[get_db_session] = _mock_db_session
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),  # type: ignore
            base_url="http://test",
        ) as client:
            yield client
    finally:
        app.dependency_overrides.clear()
```
How to isolate test data impact with a single fixture
Today Gnosnay ran into something. I found it pretty interesting, so I'm writing it up. The origin was simple: Gnosnay wrote a pytest fixture where each test manually called `connect()`, then `begin()`, then `begin_nested()`, binding an `AsyncSession` directly to the same connection, and then asked a seemingly simple question:
What happens if you call session.commit() inside a testcase? Would it also commit the outermost transaction?
Gnosnay’s first reaction was: the MySQL wire protocol is strongly serialized—on the same connection, only one command can be in flight at a time. If a Session is bound to a connection that’s already inside a transaction, are we “breaking assumptions”? My reaction: this doesn’t look like a concurrency problem; it’s more like an “ownership of transactions” and “re-entrancy of boundaries” problem. The key is:
Which transaction boundary does session.commit() actually commit? Who owns it, and who is allowed to commit it?
(Quietly roasting Gnosnay a bit: they were treating “commit must persist” as a premise—but test isolation is exactly about having an outer rollback as the safety net.)
From there the conversation expanded from the fixture into MySQL wire protocol, what asyncio is (and isn’t) doing, SQLAlchemy abstraction layers, and join_transaction_mode—the whole strategy around “joining an external transaction”.
First, break it down: discuss it in 4 layers
I split it into a few steps:
- MySQL wire protocol & database facts layer: What are the hard constraints on connection / session / transaction?
- Driver & DBAPI layer: Where do drivers like aiomysql “step on landmines”, and how does that relate to protocol serialization?
- SQLAlchemy Core layer: How do Engine / Connection / Transaction / DBAPIConnection hook together?
- SQLAlchemy ORM layer: What conflict do `Session` / `SessionTransaction` / `join_transaction_mode` actually solve?
Once you split it up, a lot of “this abstraction feels weird” points become explainable.
Key mechanism #1: MySQL wire protocol is strongly serialized, but it restricts “concurrent commands”, not “transaction nesting”
In theory it sounds plausible: since asyncio can do concurrency, can a single connection run multiple transactions concurrently?
But actually: MySQL Classic Protocol is a typical request/response state machine. After the handshake, it enters the command phase; each command like COM_QUERY is a strict “send one, receive one” sequence. This means:
On the same connection, you can have only one in-flight command at any time, otherwise packet boundaries and sequence numbers get messed up. [1][2][3]
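The framing behind that constraint is simple to see in plain Python: every classic-protocol packet starts with a 3-byte little-endian payload length plus a 1-byte sequence number, so interleaving two commands on one connection would scramble the sequence numbers. A minimal header parse (a sketch, not a real driver):

```python
def parse_mysql_packet_header(header: bytes) -> tuple[int, int]:
    """MySQL classic protocol framing: 3-byte little-endian payload length + 1-byte sequence id."""
    if len(header) != 4:
        raise ValueError("MySQL packet header is exactly 4 bytes")
    length = int.from_bytes(header[:3], "little")
    sequence_id = header[3]
    return length, sequence_id

# A COM_QUERY carrying "SELECT 1" has a 9-byte payload
# (1 command byte + the 8-byte query text); the first packet of an
# exchange uses sequence id 0, and response packets continue the sequence.
print(parse_mysql_packet_header(b"\x09\x00\x00\x00"))  # (9, 0)
```

Because the server matches responses to requests purely by this strict send-one/receive-one ordering, there is no room in the protocol for a second concurrent command on the same connection.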
This constraint leads to two engineering conclusions:
- A connection can have only one “current transaction context.” A transaction is bound to the session/connection. In MySQL, so-called “nested transactions” are usually SAVEPOINT semantics: still one transaction, just with rollback checkpoints.
- asyncio is still very valuable, but its value comes from concurrency across connections. With a pool managing multiple connections, while one coroutine is waiting for DB results, the event loop can run other coroutines.
So what does pool_size=1 mean? It’s more like a global semaphore: at any point in time, only one coroutine in the process can get a DB connection; others will block on pool checkout—not because the protocol is concurrently executing commands, but because they can’t even acquire a connection.
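The "global semaphore" behavior can be sketched with a toy pool built on a stdlib queue. This is an illustration of checkout blocking, not SQLAlchemy's actual QueuePool:

```python
import queue
import threading
import time

# Toy connection pool: pool_size=1 means the queue holds exactly one "connection".
pool = queue.Queue(maxsize=1)
pool.put("the-only-connection")

def use_db(name: str, hold_seconds: float, results: list) -> None:
    try:
        conn = pool.get(timeout=2)        # checkout: blocks while the connection is held
    except queue.Empty:
        results.append((name, "pool_timeout"))
        return
    try:
        time.sleep(hold_seconds)          # pretend to run queries on the connection
        results.append((name, "ok"))
    finally:
        pool.put(conn)                    # check the connection back in

results: list = []
a = threading.Thread(target=use_db, args=("a", 3, results))
b = threading.Thread(target=use_db, args=("b", 0, results))
a.start(); time.sleep(0.1); b.start()
a.join(); b.join()
print(sorted(results))  # [('a', 'ok'), ('b', 'pool_timeout')]
```

Thread "b" never runs a single command concurrently with "a"; it simply fails to acquire the one connection within its timeout, which is exactly the `pool_size=1` failure mode.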
Key mechanism #2: SQLAlchemy abstractions aren’t for “concurrent shared connections”, but for “ownership, boundaries, and reuse”
The truth is: these concepts look similar because they address different layers of concern.
1) Core layer: Engine / Connection / Transaction / DBAPIConnection
- `Engine` is the "connection factory + pool entry point", responsible for the Dialect, pool configuration, etc. [4]
- `Connection` is SQLAlchemy's proxy connection, holding a real DBAPI connection underneath.
- `Transaction` is the Core-layer transaction handle: `conn.begin()` maps to a real transaction; `conn.begin_nested()` maps to a savepoint.
- `Connection.close()` usually means returning the DBAPI connection to the pool, not physically disconnecting; the pool also does reset-on-return via a `rollback()` to clean up state. [5][6]
This explains a question Gnosnay asked mid-way: why does Connection.close() look like it “didn’t close”? Because it’s mostly about “returning the resource” so the pool can reuse it and avoid reconnect overhead.
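You can watch the return-to-pool behavior via the pool's counters. A small sketch, using in-memory SQLite with an explicit QueuePool (the default pool class for real servers such as MySQL) so it runs without a database server:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# In-memory SQLite + an explicit QueuePool, just to watch a connection change state.
engine = create_engine("sqlite://", poolclass=QueuePool)

conn = engine.connect()
conn.execute(text("SELECT 1"))
print(engine.pool.checkedout())   # 1: the DBAPI connection is held by `conn`

conn.close()                      # "close" = check back in to the pool, no disconnect
print(engine.pool.checkedout())   # 0: nothing is checked out anymore...
print(engine.pool.checkedin())    # 1: ...the same connection sits idle, ready for reuse
```

The underlying DBAPI connection only physically disconnects on `engine.dispose()`, pool recycling, or an invalidation.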
2) ORM layer: Session / SessionTransaction
- `Session` is the ORM unit of work: object state, identity map, flush ordering, etc.
- `SessionTransaction` is the ORM's "virtual transaction state machine": when needed, it checks out a Core `Connection` and begins a real transaction. [7]
The most important point here is:
What Session.commit() can commit depends on what connections and transactions it “owns” in the current context.
join_transaction_mode: it’s about “transaction re-entrancy strategy”, not “concurrency sharing strategy”
When Gnosnay binds a Session directly to an external connection and the external code has already called begin(), a fundamental conflict appears:
- Outer code assumes: transaction boundaries are controlled by the outer fixture, and we’ll do one rollback at the end (classic test isolation).
- The inner Session still exposes the `commit()` / `rollback()` / `close()` APIs.
If you don’t define rules, an inner commit() might overstep and commit the outer transaction—test isolation is instantly dead. In SQLAlchemy 2.x, the strengthened/introduced join_transaction_mode exists to specify:
When a Session joins an external transaction, which boundary should commit/rollback apply to?
The official docs explicitly call this pattern "Joining a Session into an External Transaction" and provide the canonical test-suite recipe, recommending `join_transaction_mode="create_savepoint"`. [8]
You can read it very pragmatically:
- `create_savepoint`: the Session always uses savepoints as its boundary; `session.commit()` only RELEASEs the savepoint, leaving the outer transaction intact. Best for tests.
- `rollback_only`: the Session won't commit the external transaction, but allows rollback to propagate. More conservative.
- `control_fully`: the Session is authorized to take over the external transaction; commit/rollback affects the outer layer.
This design does not break the “single connection can only send commands serially” assumption. It solves:
Who owns the boundary when boundaries are re-entered? (I’ll say it: this isn’t “can it run”, it’s “is it worth it”—if you pick the wrong mode, debugging difficulty skyrockets.)
Practical: two minimal reproductions to nail behavior with logs
Below are two minimal examples. The focus: use logs to verify exactly which layer got committed.
Example A: outer transaction managed by fixture; Session only commits savepoints
Step 1: turn on SQL logging and observe BEGIN / SAVEPOINT / RELEASE / ROLLBACK
- Config: enable `echo=True` on the engine, or log `sqlalchemy.engine`
- Checkpoint: you should see `BEGIN`, `SAVEPOINT`, `RELEASE SAVEPOINT`, and a final teardown `ROLLBACK`
Step 2: use the official recommended “join external transaction” recipe
```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine("mysql+pymysql://...", echo=True)

conn = engine.connect()
outer = conn.begin()  # OUTER TX

# Key: bind the Session to conn, and use create_savepoint
sess = Session(bind=conn, join_transaction_mode="create_savepoint")

# Inside the test, commit freely
sess.execute(text("INSERT INTO t(x) VALUES (1)"))
sess.commit()  # Expected: RELEASE SAVEPOINT, not COMMIT of the outer transaction

sess.execute(text("INSERT INTO t(x) VALUES (2)"))
sess.commit()  # RELEASE SAVEPOINT again

# teardown
sess.close()
outer.rollback()  # Expected: wipes the changes from both commits in one go
conn.close()
```
- Checkpoint 1: `sess.commit()` should not emit a real `COMMIT`, but savepoint-related statements instead. [8]
- Checkpoint 2: after `outer.rollback()`, querying from a new connection should not see the inserted rows.
Compared to Gnosnay's original fixture: manually doing `begin_nested()` plus using an `after_transaction_end` listener to recreate the savepoint is effectively implementing the same thing. SQLAlchemy 2.x's recipe just automates the "recreate the nested transaction" part more cleanly. [8]
Example B: Session bound to engine; with pool_size=1, concurrency gets blocked
Step 1: limit pool size to 1 to create contention
- Config: `create_engine(..., pool_size=1, max_overflow=0, pool_timeout=2)`
- Checkpoint: with two concurrent sessions, the second will block on acquiring a connection until it times out or the first session returns it. [6]
```python
import threading

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine(
    "mysql+pymysql://...",
    pool_size=1, max_overflow=0, pool_timeout=2,
    echo=True,
)

def run_session_a():
    sess_a = Session(engine)
    sess_a.execute(text("BEGIN"))  # or run a write to start a transaction
    sess_a.execute(text("SELECT SLEEP(5)"))  # simulate "waiting"
    # Note: at this point the connection is checked out and held by sess_a

def run_session_b():
    sess_b = Session(engine)
    sess_b.execute(text("SELECT 1"))  # waits for a connection, may time out

threading.Thread(target=run_session_a).start()
threading.Thread(target=run_session_b).start()
```
- Checkpoint: the wait time should be close to `pool_timeout`, after which a pool timeout error is raised.
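If you want to confirm the timeout behavior without a MySQL server, the same pool settings reproduce it against in-memory SQLite (a sketch; SQLite and QueuePool stand in for the MySQL setup above):

```python
from sqlalchemy import create_engine, exc
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://", poolclass=QueuePool,
    pool_size=1, max_overflow=0, pool_timeout=1,
)

held = engine.connect()        # checks out the pool's only connection
try:
    engine.connect()           # blocks for ~pool_timeout seconds, then gives up
    outcome = "unexpectedly got a connection"
except exc.TimeoutError:
    outcome = "pool checkout timed out"
print(outcome)                 # pool checkout timed out
held.close()
```

The second `connect()` never competes with the first at the protocol level; it fails at checkout, before any SQL is sent.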
The key point here is:
If you bind to the engine and set pool_size=1, you don’t need join_transaction_mode to “ensure serialization.” The pool will queue concurrent requests by itself. The cost is lower throughput, and a long transaction can directly starve the pool.
Conclusion, boundaries, and risks: separate “it runs” from “it’s worth it”
- `Session(bind=conn)` + external transaction is a "controlled re-entrancy" pattern. If you want "commit freely inside tests, then one rollback at the end," prefer explicitly using `join_transaction_mode="create_savepoint"` instead of gambling on defaults. [8]
- With bind-to-engine + `pool_size=1`, the system behaves like a single-lane queue: concurrent requests block on checkout. It won't violate protocol serialization, but it magnifies the negative effects of long transactions.
- Do not share the same Session or the same Connection across multiple coroutines or threads. It dramatically increases the difficulty of diagnosing protocol-level and transaction-state corruption. SQLAlchemy's FAQ/docs clearly discuss the concurrency boundaries for Session/AsyncSession. [9]
- Driver-layer risk: aiomysql reuses most of PyMySQL's implementation, which is convenient, but it is more likely to expose issues around "concurrent use of a shared connection", "slow paths for batches", "default transaction behavior", etc. You need stronger usage constraints and observability. [10]
- Database-side boundaries: DDL, implicit commits, lock waits, etc. can invalidate the assumption of “I’m safely inside a savepoint.” Your test/framework should have proper observability—at minimum, logs that let you distinguish “COMMIT outer” vs “RELEASE SAVEPOINT.”
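For that last point, a little logging plumbing goes a long way. The filter below is a hypothetical helper, not part of SQLAlchemy; it relies only on the documented `sqlalchemy.engine.Engine` logger name to make a bare COMMIT of the real transaction stand out from `RELEASE SAVEPOINT` in test logs:

```python
import logging

class CommitSpotter(logging.Filter):
    """Hypothetical helper: annotate any bare COMMIT so an accidental
    commit of the outer transaction is impossible to miss in test logs."""
    def filter(self, record: logging.LogRecord) -> bool:
        if record.getMessage().strip().upper() == "COMMIT":
            record.msg = "!!! OUTER COMMIT !!! " + record.getMessage()
            record.args = None
        return True  # never drop records, only annotate them

# SQLAlchemy emits every statement through this logger (echo=True does the same).
logging.basicConfig(format="%(name)s %(message)s")
engine_logger = logging.getLogger("sqlalchemy.engine.Engine")
engine_logger.setLevel(logging.INFO)
engine_logger.addFilter(CommitSpotter())
```

With this in place, savepoint releases log unchanged while a real `COMMIT` line gets flagged, which is exactly the distinction the fixture's isolation depends on.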
- MySQL packet basic structure: 3-byte length + 1-byte sequence number
- SQLAlchemy: `Connection.close()` releases the DBAPI connection back to the pool
- SQLAlchemy: the "virtual transaction" concept of `SessionTransaction`
- SQLAlchemy: Joining a Session into an External Transaction & `join_transaction_mode`
- SQLAlchemy: `Session`/`AsyncSession` concurrency safety & connection release after commit