Async Support¶
Peewee’s async extension provides asyncio-compatible database backends built on
the standard async drivers: aiosqlite, asyncpg and aiomysql.
Queries are dispatched to the driver as a coroutine and awaited on the asyncio
event loop, while Peewee’s query-building and result-processing code runs
unmodified. See How it works for the mechanism.
Example¶
playhouse.pwasyncio contains the async database implementations. Typically
this is the only thing you will need in order to use Peewee with asyncio:
import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase
db = AsyncSqliteDatabase('my_app.db')
class User(db.Model):
    name = TextField()
Queries must be executed through an async execution method. This ensures that
when blocking would occur, control is properly yielded to the event loop. The
database context (async with db) acquires a connection from the pool and
releases it on exit:
async def main():
    async with db:
        await db.acreate_tables([User])

        # Create a new user in a transaction.
        async with db.atomic() as txn:
            user = await db.run(User.create, name='Charlie')

        # Fetch a single row from the database.
        charlie = await db.get(User.select().where(User.name == 'Charlie'))
        assert charlie.name == user.name

        # Execute a query and iterate results.
        for user in await db.list(User.select().order_by(User.name)):
            print(user.name)

        # Async lazy result fetching (uses server-side cursors where
        # available).
        query = User.select().order_by(User.name)
        async for user in db.iterate(query):
            print(user.name)

    await db.close_pool()

asyncio.run(main())
How it works¶
Internally the extension uses greenlet the same way SQLAlchemy’s asyncio
support does: purely as a stack-switching mechanism, so that Peewee’s
synchronous internals can be suspended mid-call while the async driver
performs I/O. Whenever a query executes, control switches to the event loop and
the I/O coroutine is awaited like any other awaitable. Then the original call
resumes with the result.
Note
This is real asyncio, NOT gevent-style concurrency. Nothing is monkey-patched, no sockets are wrapped, and the event loop is the ordinary asyncio loop running the rest of your application.
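The stack-switching idea can be sketched in a few lines. This is an illustrative sketch only, not Peewee's implementation: the names `bridge_run`, `await_` and `fake_query` are hypothetical, and the only third-party dependency assumed is the `greenlet` package.

```python
import asyncio
import sys

from greenlet import getcurrent, greenlet


def await_(awaitable):
    # Called from synchronous code running inside the child greenlet.
    # Hands the awaitable to the parent greenlet (the one running the
    # coroutine) and suspends until the parent switches back with a result.
    return getcurrent().parent.switch(awaitable)


async def bridge_run(fn, *args, **kwargs):
    # Hypothetical stand-in for an entry point such as db.run().
    child = greenlet(fn)
    pending = child.switch(*args, **kwargs)
    while not child.dead:
        try:
            # Ordinary asyncio await: the event loop is free to run
            # other tasks while the I/O is in flight.
            value = await pending
        except BaseException:
            # Re-raise the failure inside the suspended synchronous call.
            pending = child.throw(*sys.exc_info())
        else:
            pending = child.switch(value)
    # Once the child greenlet finishes, the last switch returns fn's result.
    return pending


async def fake_query(sql):
    # Stand-in for a driver call that performs real I/O.
    await asyncio.sleep(0)
    return f'result of {sql}'


def sync_code():
    # Plain synchronous code; it never uses "await" itself.
    first = await_(fake_query('SELECT 1'))
    second = await_(fake_query('SELECT 2'))
    return [first, second]


results = asyncio.run(bridge_run(sync_code))
print(results)
```

The synchronous function is suspended in the middle of `await_()` each time, which is what lets unmodified synchronous code sit on top of an async driver.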
Installation¶
Requires Python 3.8 or newer, greenlet and an async database driver:
pip install peewee greenlet
pip install aiosqlite # SQLite
pip install asyncpg # Postgresql
pip install aiomysql # MySQL / MariaDB
Supported backends:
| Database | Driver | Peewee class |
|---|---|---|
| SQLite | aiosqlite | AsyncSqliteDatabase |
| Postgresql | asyncpg | AsyncPostgresqlDatabase |
| MySQL / MariaDB | aiomysql | AsyncMySQLDatabase |
Execution Methods¶
db.run() - general-purpose entry point¶
run() accepts any callable and runs it inside a
greenlet bridge. The callable can contain arbitrary synchronous Peewee code,
including transactions:
# Single operation:
user = await db.run(User.create, name='Alice')
# Multi-step function:
def register(username, bio):
    with db.atomic():
        user = User.create(name=username)
        Profile.create(user=user, bio=bio)
    return user

user = await db.run(register, 'alice', 'Python developer')
Use db.run() when:
- You have existing synchronous code you want to call from async.
- A single operation involves multiple queries (e.g. a transaction).
Async Helper Methods¶
For single-query operations, the async helpers are more direct:
# Execute any query and get its natural return type.
cursor = await db.aexecute(query)
# Use a transaction:
async with db.atomic() as tx:
    await db.run(User.create, name='Bob')
# SELECT and return one model instance (raises DoesNotExist if none).
user = await db.get(User.select().where(User.name == 'Alice'))
# SELECT and return a list.
users = await db.list(User.select().order_by(User.name))
# SELECT and stream results from the database asynchronously.
users = [user async for user in db.iterate(User.select())]
# SELECT and return a scalar value.
count = await db.scalar(User.select(fn.COUNT(User.id)))
# Or use the shortcut.
count = await db.count(User.select())
# CREATE TABLE / DROP TABLE:
await db.acreate_tables([User, Tweet])
await db.adrop_tables([User, Tweet])
# Raw SQL:
cursor = await db.aexecute_sql('SELECT 1')
print(cursor.fetchall()) # [(1,)]
Transactions¶
Use async with db.atomic() for async-aware transactions:
async with db.atomic():
    await db.run(User.create, name='Alice')
    await db.run(User.create, name='Bob')

    # Nesting and explicit commit/rollback work.
    async with db.atomic() as nested:
        await db.aexecute(User.delete().where(User.name == 'Bob'))
        await nested.arollback()  # Un-delete Bob.

# Both Alice and Bob are in the database.
Or wrap transactional code in db.run():
def create_users():
    with db.atomic():
        User.create(name='Alice')
        User.create(name='Bob')

        with db.atomic() as nested:
            User.delete().where(User.name == 'Bob').execute()
            nested.rollback()  # Un-delete Bob.

await db.run(create_users)
# Both Alice and Bob are in the database.
Both approaches produce the same result. The db.run() form is often simpler
when the transactional logic involves many inter-dependent queries.
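At the SQL level, a nested block behaves like a savepoint inside the outer transaction. The following sketch reproduces the "un-delete Bob" sequence using only the standard-library `sqlite3` module. It illustrates the general savepoint technique and is not the exact SQL Peewee emits; the table layout and the savepoint name `nested` are made up for the example.

```python
import sqlite3

# isolation_level=None puts the stdlib driver in autocommit mode, so the
# BEGIN / SAVEPOINT statements below are issued exactly as written.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE user (id INTEGER PRIMARY KEY, name TEXT)')

conn.execute('BEGIN')                               # outer block begins
conn.execute("INSERT INTO user (name) VALUES ('Alice')")
conn.execute("INSERT INTO user (name) VALUES ('Bob')")

conn.execute('SAVEPOINT nested')                    # nested block begins
conn.execute("DELETE FROM user WHERE name = 'Bob'")
conn.execute('ROLLBACK TO SAVEPOINT nested')        # explicit nested rollback
conn.execute('RELEASE SAVEPOINT nested')            # nested block exits

conn.execute('COMMIT')                              # outer block exits

names = [row[0] for row in
         conn.execute('SELECT name FROM user ORDER BY name')]
print(names)
conn.close()
```

Rolling back to the savepoint undoes only the DELETE, so both rows survive the outer commit.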
Connection Management¶
The database context manager (async with db) is the recommended way to
manage connections. It acquires a connection on entry and releases it on exit:
async with db:
    # Connection is available here.
    pass
# Connection released.
Explicit control is also available:
await db.aconnect() # Acquire connection for the current task.
# ... queries ...
await db.aclose() # Release connection back to pool.
Each asyncio task gets its own connection from the pool, along with its own transaction state. Connections are never shared between tasks, which prevents the bugs that occur when transactions end up interleaved across several running tasks on a shared connection.
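One common way to get per-task state in asyncio is a `contextvars.ContextVar`, because every task runs in its own copy of the context. The sketch below shows the general technique with the standard library only; whether Peewee uses this exact mechanism is an assumption, and `_current_conn` and `worker` are hypothetical names.

```python
import asyncio
import contextvars

# Hypothetical task-local slot; each asyncio task sees its own value.
_current_conn = contextvars.ContextVar('current_conn', default=None)


async def worker(name, seen):
    # Setting the variable inside a task only affects that task's context.
    _current_conn.set(f'conn-for-{name}')
    await asyncio.sleep(0)  # Yield so the other tasks get to run.
    seen[name] = _current_conn.get()


async def main():
    seen = {}
    # gather() wraps each coroutine in its own task.
    await asyncio.gather(*(worker(n, seen) for n in ('a', 'b', 'c')))
    # The parent task never set the variable, so it still sees the default.
    seen['main'] = _current_conn.get()
    return seen


seen = asyncio.run(main())
print(seen)
```

Each worker reads back the value it set itself, even though all three interleave on the same event loop.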
To shut down completely (e.g. during application teardown):
await db.close_pool()
MySQL and Postgresql¶
MySQL and Postgresql use the driver’s native connection pool.
Pool configuration options include:
- pool_size: Maximum number of connections
- pool_min_size: Minimum pool size
- acquire_timeout: Timeout when acquiring a connection
db = AsyncPostgresqlDatabase(
    'peewee_test',
    host='localhost',
    user='postgres',
    pool_size=10,
    pool_min_size=1,
    acquire_timeout=10)
SQLite¶
Peewee provides a simple connection-pooling implementation for SQLite connections.
Pool configuration options include:
- pool_size: Maximum number of connections
- acquire_timeout: Timeout when acquiring a connection
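To make the two options concrete, here is a minimal sketch of how a size-bounded pool with an acquire timeout can behave. It is not Peewee's pool: `TinyPool` and its `connect` callback are hypothetical, and real connections are replaced by strings.

```python
import asyncio


class TinyPool:
    """Illustrative pool: at most pool_size connections, bounded wait."""

    def __init__(self, connect, pool_size=2, acquire_timeout=0.1):
        self._connect = connect          # Coroutine function creating a connection.
        self._free = asyncio.Queue()     # Idle connections ready for reuse.
        self._created = 0
        self._pool_size = pool_size
        self._acquire_timeout = acquire_timeout

    async def acquire(self):
        if self._free.empty() and self._created < self._pool_size:
            self._created += 1
            return await self._connect()
        # Pool exhausted: wait for a release, but no longer than the timeout.
        return await asyncio.wait_for(self._free.get(), self._acquire_timeout)

    def release(self, conn):
        self._free.put_nowait(conn)


async def main():
    counter = 0

    async def connect():
        nonlocal counter
        counter += 1
        return f'conn-{counter}'

    pool = TinyPool(connect, pool_size=2, acquire_timeout=0.05)
    first = await pool.acquire()
    second = await pool.acquire()
    try:
        await pool.acquire()             # Both connections are checked out.
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    pool.release(first)
    reused = await pool.acquire()        # The released connection is handed out again.
    return first, second, timed_out, reused


result = asyncio.run(main())
print(result)
```

The third acquire times out because both connections are in use; once one is released, the next acquire succeeds immediately.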
SQLite operates on local disk storage, so queries typically execute extremely
quickly. aiosqlite dispatches each query to a background thread and wraps it
in a coroutine, which adds latency to every query: a closure must be created,
a future allocated, a queue written to, a call_soon_threadsafe() issued on the
loop, and two context switches made.
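The per-query steps listed above can be seen in a simplified stand-in for a thread-dispatching wrapper. This is a sketch of the general pattern using only the standard library, not aiosqlite's actual code; the `ThreadedSqlite` class is hypothetical.

```python
import asyncio
import queue
import sqlite3
import threading


class ThreadedSqlite:
    """Illustrative wrapper: one worker thread owns the connection."""

    def __init__(self, path):
        self._path = path
        self._jobs = queue.Queue()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self):
        conn = sqlite3.connect(self._path)   # Created and used on this thread only.
        while True:
            job = self._jobs.get()
            if job is None:                  # Shutdown sentinel.
                break
            fn, future, loop = job
            try:
                result = fn(conn)
            except Exception as exc:
                loop.call_soon_threadsafe(future.set_exception, exc)
            else:
                # Hand the result back to the event loop thread.
                loop.call_soon_threadsafe(future.set_result, result)
        conn.close()

    async def execute(self, sql, params=()):
        loop = asyncio.get_running_loop()
        future = loop.create_future()                           # Future allocated.
        fn = lambda conn: conn.execute(sql, params).fetchall()  # Closure created.
        self._jobs.put((fn, future, loop))                      # Queue written to.
        return await future              # Switch away, then back once the result arrives.

    def close(self):
        self._jobs.put(None)
        self._thread.join()


async def main():
    db = ThreadedSqlite(':memory:')
    try:
        await db.execute('CREATE TABLE kv (k TEXT, v INTEGER)')
        await db.execute('INSERT INTO kv VALUES (?, ?)', ('answer', 42))
        return await db.execute('SELECT k, v FROM kv')
    finally:
        db.close()


rows = asyncio.run(main())
print(rows)
```

Every call to `execute()` pays for the closure, the future, the queue hand-off and the thread wake-ups, regardless of how cheap the SQL itself is.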
Additionally, SQLite only allows one writer at a time. An async wrapper may keep the application responsive while a task waits to obtain the write lock, but writes will not complete any faster: the bottleneck has merely been moved. Conversely, if the load is light, the async wrapper adds complexity and overhead for no measurable benefit.
To use SQLite in an async environment anyway, it is strongly recommended to use WAL-mode at a minimum, which allows multiple readers to co-exist with a single writer:
db = AsyncSqliteDatabase('app.db', pragmas={'journal_mode': 'wal'})
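The effect of the pragma can be checked with the standard-library `sqlite3` module alone. This sketch uses a temporary file (the path is made up for the example) and shows that the journal mode is stored in the database file, so later connections see it as well.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'app.db')

    conn = sqlite3.connect(path)
    conn.execute('CREATE TABLE user (name TEXT)')
    # The pragma returns the journal mode that is now in effect.
    after = conn.execute('PRAGMA journal_mode=wal').fetchone()[0]
    conn.close()

    # WAL is persistent: a fresh connection to the same file reports it too.
    conn = sqlite3.connect(path)
    persisted = conn.execute('PRAGMA journal_mode').fetchone()[0]
    conn.close()

print(after, persisted)
```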
Note
In-memory databases (':memory:') always use a single connection
regardless of pool_size - pooled in-memory connections would each be
a separate, empty database.
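The reason is easy to reproduce with the standard-library `sqlite3` module: two connections opened on ':memory:' do not see each other's tables. The helper `table_names` below is only for the demonstration.

```python
import sqlite3

first = sqlite3.connect(':memory:')
second = sqlite3.connect(':memory:')

# Create a table through the first connection only.
first.execute('CREATE TABLE user (name TEXT)')


def table_names(conn):
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


first_tables = table_names(first)
second_tables = table_names(second)
print(first_tables, second_tables)

first.close()
second.close()
```

The second connection reports no tables at all, which is why a pool of in-memory connections would not behave like a single database.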
API Reference¶
- class AsyncDatabaseMixin(database, pool_size=10, pool_min_size=1, acquire_timeout=10, **kwargs)¶
- Parameters
database (str) – Database name or filename for SQLite.
pool_size (int) – Maximum size of the connection pool.
pool_min_size (int) – Minimum size of the connection pool (ignored for SQLite, which always creates pool_size connections).
acquire_timeout (float) – Time (in seconds) to wait for a free connection when acquiring from the pool.
kwargs – Arbitrary keyword arguments passed to the underlying database driver when creating connections (e.g., user, password, host).
Mixin class providing asyncio execution support. Use a driver-specific subclass in application code: AsyncSqliteDatabase, AsyncPostgresqlDatabase or AsyncMySQLDatabase.
Each asyncio task maintains its own connection state and transaction stack. Connections are acquired from the pool and released back to it when the task completes or the database context exits.
- async run(fn, *args, **kwargs)¶
- Parameters
fn – A synchronous callable.
- Returns
The return value of fn(*args, **kwargs).
Execute a synchronous callable inside a greenlet and return the result. This is the primary entry point for executing Peewee ORM code in an async context.
When database I/O or blocking would occur, control is yielded to the event-loop automatically.
Example:
db = AsyncSqliteDatabase(':memory:')

class User(db.Model):
    username = TextField()

def setup_app():
    # Ensure table exists and admin user is present at startup.
    with db:
        db.create_tables([User])

        # Create admin user if does not exist.
        try:
            with db.atomic():
                User.create(username='admin')
        except IntegrityError:
            pass

async def main():
    await db.run(setup_app)

    # We can pass arguments to the synchronous callable and get
    # return values as well.
    admin_user = await db.run(User.get, User.username == 'admin')
- async aconnect()¶
- Returns
A wrapped async connection.
Acquire a connection from the pool for the current task. Typically the connection is not used directly, since the connection will be bound to the task using a task-local.
Example:
# Acquire a connection from the pool which will be used for the
# current asyncio task.
await db.aconnect()

# Run some queries.
users = await db.list(User.select().order_by(User.username))
for user in users:
    print(user.username)

# Close connection, which releases it back to the pool.
await db.aclose()
Typically applications should prefer to use the async context-manager for connection management, e.g.:
db = AsyncSqliteDatabase(':memory:')

async with db:
    # Connection is obtained from the pool and used for this task.
    await db.acreate_tables([User, Tweet])

# Context block exits, connection is released back to pool.
- async aclose()¶
Release the current task’s connection back to the pool. Like synchronous close(), raises OperationalError if called while a transaction is open. Connections reclaimed from tasks that exited uncleanly have any open transaction rolled back, so the next acquirer always sees a clean connection.
- async close_pool()¶
Close the underlying connection pool and release all active connections.
This method should be called during application shutdown.
Connections orphaned by tasks that exited without closing them are reclaimed as well, with any open transaction rolled back.
- async __aenter__()¶
- async __aexit__(exc_type, exc, tb)¶
Async database context, acquiring a connection for the current task for the duration of the wrapped block.
db = AsyncSqliteDatabase(':memory:')

async with db:
    # Connection is obtained from the pool and used for this task.
    await db.acreate_tables([User, Tweet])

# Context block exits, connection is released back to pool.
- async aexecute(query)¶
- Parameters
query (Query) – a Select, Insert, Update or Delete query.
- Returns
the normal return-value for the query type.
Execute any Peewee query object and return its result.
Example:
insert = User.insert(username='Huey')
pk = await db.aexecute(insert)

update = (Tweet
          .update(is_published=True)
          .where(Tweet.timestamp <= datetime.now()))
nrows = await db.aexecute(update)

spammers = (User
            .delete()
            .where(User.username.contains('billing'))
            .returning(User.username))
for u in await db.aexecute(spammers):
    print(f'Deleted: {u.username}')

The query is bound to this database before executing. The convenience methods (get(), list(), scalar(), count() and exists()) execute the query against whatever database it is already bound to.
- async get(query)¶
- Parameters
query (Query) – a Select query.
Execute a SELECT query and return a single model instance. Raises DoesNotExist if no row matches.
Example:
huey = await db.get(User.select().where(User.username == 'Huey'))

# Fetch a model and a relation in single query.
query = Tweet.select(Tweet, User).join(User).where(Tweet.id == 123)
tweet = await db.get(query)
print(tweet.user.username, '->', tweet.content)
- async list(query)¶
- Parameters
query (Query) – a Select query, or an Insert, Update or Delete query that utilizes RETURNING.
Execute a SELECT (or INSERT/UPDATE/DELETE with RETURNING) and return a list of results.
Example:
query = User.select().order_by(User.username)
for user in await db.list(query):
    print(user.username)
- async iterate(query, buffer_size=None)¶
- Parameters
query (Query) – a Select query to stream results from using an async generator.
buffer_size (int) – Number of rows fetched per round-trip (default 100).
The iterate() method uses server-side cursors (MySQL and Postgres) to efficiently stream large result-sets.
Example:
query = User.select().order_by(User.username)
async for user in db.iterate(query):
    print(user.username)
Note
While streaming, the iterator holds the task’s connection. Another query on the same connection - including a second iterate() - waits briefly for an abandoned iterator to finalize (e.g. after breaking out of the loop early), then raises InterfaceError. The grace period is the connection wrapper’s streaming_timeout attribute (default 5 seconds). To release the connection promptly after a partial iteration, await the generator’s aclose() method.
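The behavior of a partially consumed async generator can be seen with plain asyncio. In this sketch `stream_rows` is a hypothetical stand-in for a streaming query; the 'acquire' and 'release' events mark where a real iterator would hold and free its connection.

```python
import asyncio

events = []


async def stream_rows(n):
    # Stand-in for a streaming query that holds a connection while active.
    events.append('acquire')
    try:
        for i in range(n):
            yield i
    finally:
        # Runs when the generator is exhausted or explicitly closed.
        events.append('release')


async def main():
    rows = stream_rows(1000)
    async for row in rows:
        if row == 2:
            break                      # Stop early; the generator is only suspended.
    events.append('after break')       # Cleanup has not run yet.
    await rows.aclose()                # Finalize now instead of waiting for GC.
    events.append('after aclose')


asyncio.run(main())
print(events)
```

Breaking out of the loop leaves the generator suspended, so its cleanup only runs once `aclose()` is awaited.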
- async scalar(query)¶
- Parameters
query (Query) – a Select query.
Execute a SELECT and return the first column of the first row.
Example:
max_id = await db.scalar(User.select(fn.MAX(User.id)))
- async count(query)¶
- Parameters
query (Query) – a Select query.
Wrap the query in a SELECT COUNT(…) and return the count of rows.
Example:
tweets = await db.count(Tweet.select().where(Tweet.is_published))
- async exists(query)¶
- Parameters
query (Query) – a Select query.
Return boolean whether the query contains any results.
- async aprefetch(query, *subqueries)¶
- Parameters
query (Query) – Query to use as starting-point.
subqueries – One or more models or ModelSelect queries to eagerly fetch.
- Returns
a list of models with selected relations prefetched.
Eagerly fetch related objects, allowing efficient querying of multiple tables when a 1-to-many relationship exists.
users = User.select().order_by(User.username)
tweets = Tweet.select().order_by(Tweet.timestamp)
for user in await db.aprefetch(users, tweets):
    print(user.username)
    for tweet in user.tweets:
        print('  ', tweet.content)
- atomic()¶
Return an async-aware atomic context manager. Supports both async with and with.
Example of async usage:
async def transfer_funds(src, dest, amount):
    async with db.atomic() as txn:
        await db.aexecute(
            Account
            .update(balance=Account.balance - amount)
            .where(Account.id == src.id))
        await db.aexecute(
            Account
            .update(balance=Account.balance + amount)
            .where(Account.id == dest.id))

async def main():
    await transfer_funds(user1, user2, 100.)
Example of sync usage:
def transfer_funds(src, dest, amount):
    with db.atomic() as txn:
        (Account
         .update(balance=Account.balance - amount)
         .where(Account.id == src.id)
         .execute())
        (Account
         .update(balance=Account.balance + amount)
         .where(Account.id == dest.id)
         .execute())

async def main():
    await db.run(transfer_funds, user1, user2, 100.)
- transaction()¶
- savepoint()¶
Like atomic(), async-aware wrappers of peewee’s transaction and savepoint context-managers, supporting both async with and with. Transaction objects additionally provide acommit() and arollback() coroutines, mirroring peewee’s commit() and rollback().
Note
On Postgresql, atomic(), transaction() and savepoint() all return a transaction manager built directly on asyncpg: arguments are forwarded to asyncpg’s Connection.transaction() (e.g. isolation=, readonly=), and nested blocks are implemented as savepoints by asyncpg’s transaction nesting.
- async acreate_tables(models, **options)¶
- Parameters
models (list) – A list of Model classes.
options – Options to specify when calling Model.create_table().
Create tables, indexes and associated constraints for the given list of models.
Dependencies are resolved so that tables are created in the appropriate order.
Example:
class User(db.Model):
    ...

class Tweet(db.Model):
    ...

async def setup_hook():
    async with db:
        await db.acreate_tables([User, Tweet])
- async adrop_tables(models, **options)¶
- Parameters
models (list) – A list of Model classes.
options – Options to specify when calling Model.drop_table().
Drop tables, indexes and constraints for the given list of models.
- async aexecute_sql(sql, params=None)¶
- Parameters
sql (str) – SQL query to execute.
params (tuple) – Optional query parameters.
- Returns
A CursorAdapter instance.
Execute SQL asynchronously. Returns a cursor-like object whose rows are already fetched (call .fetchall() synchronously). For result streaming, see iterate().
- class AsyncSqliteDatabase(database, **kwargs)¶
Async SQLite database implementation.
Uses aiosqlite with a simple pool of pool_size connections (pool_min_size is ignored).
Inherits from AsyncDatabaseMixin and SqliteDatabase.
- class AsyncPostgresqlDatabase(database, **kwargs)¶
Async Postgresql database implementation.
Uses asyncpg and the driver’s native connection pool. Affected-row counts for UPDATE and DELETE are derived from the command status reported by the server.
A connection URL may be given as the database argument ('postgresql://...'), and isolation_level accepts a level name (e.g. 'SERIALIZABLE') which is applied to each pooled connection.
Inherits from AsyncDatabaseMixin and PostgresqlDatabase.
Note
Model.bulk_update() is not supported with asyncpg: the CASE expression’s untyped parameters are resolved as text by the server, which fails for non-text columns.
- class AsyncMySQLDatabase(database, **kwargs)¶
Async MySQL / MariaDB database implementation.
Uses aiomysql and the driver’s native connection pool. The server version - used, e.g., to distinguish MySQL from MariaDB when generating JSONField SQL - is detected when the first connection is acquired.
Inherits from AsyncDatabaseMixin and MySQLDatabase.
- class MissingGreenletBridge(RuntimeError)¶
Raised when Peewee attempts to execute a query outside a greenlet context. This indicates that a query was triggered outside of db.run() or an async helper call.