Asyncio Support

Peewee provides a thin asyncio compatibility layer that allows existing APIs to be used safely with the asyncio event loop. Unlike other peewee async projects, this implementation uses greenlet to execute Peewee’s internally synchronous code while allowing async database operations to run on the event loop.

When database I/O is required, execution is suspended and control is returned to the event loop. Once the awaitable completes, execution resumes exactly where it left off.

The Peewee asyncio implementation can be found in playhouse.pwasyncio:

from playhouse.pwasyncio import AsyncPostgresqlDatabase
from playhouse.pwasyncio import AsyncMySQLDatabase
from playhouse.pwasyncio import AsyncSqliteDatabase

The run() method is the primary entry point for async execution. It accepts a synchronous callable and arbitrary arguments. When the underlying database driver would block, control is yielded back to the asyncio event-loop.

await db.run(User.create, name='Alice')

If you prefer a more async-native approach, a number of helper methods are available on the async Database classes:

import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

class User(db.Model):
    name = TextField()

async def demo():
    async with db:
        # Asynchronously create table(s).
        await db.acreate_tables([User])

        # Create a new user.
        user = await db.run(User.create, name='Charlie')

        # Retrieve new user from the database.
        user_db = await db.get(User.select().where(User.name == 'Charlie'))
        assert user.name == user_db.name == 'Charlie'

        # Atomicity with async context managers.
        async with db.atomic():
            # Construct a normal Peewee INSERT query.
            iq = (User
                  .insert_many([{'name': 'Alice'}, {'name': 'Bob'}])
                  .returning(User))

            # Execute the query asynchronously, retrieving results.
            users = await db.aexecute(iq)
            print('Added users: %s' % list(users))

        # Retrieve list of users from database.
        for user in await db.list(User.select().order_by(User.name)):
            print(user.name)

    # Close the pool - the connection was released, but it still remains inside
    # the pool, so this ensures we are ready to shutdown completely.
    await db.close_pool()

asyncio.run(demo())

Here is the same example as above demonstrating how run() can be used to wrap synchronous ORM operations to be async:

import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

class User(db.Model):
    name = TextField()

async def demo():
    async with db:
        # Asynchronously create table(s).
        await db.run(db.create_tables, [User])

        # Create a new user.
        user = await db.run(User.create, name='Charlie')

        # Retrieve new user from the database using a callable.
        def get_user():
            return User.select().where(User.name == 'Charlie').get()

        user_db = await db.run(get_user)
        assert user.name == user_db.name == 'Charlie'

        # Atomicity with a normal context manager.
        def bulk_insert():
            with db.atomic():
                iq = (User
                      .insert_many([{'name': 'Alice'}, {'name': 'Bob'}])
                      .returning(User))
                users = iq.execute()
                print('Added users: %s' % list(users))

        await db.run(bulk_insert)

        # Retrieve list of users from database.
        users = await db.run(list, User.select().order_by(User.name))
        for user in users:
            print(user.name)

    # Close the pool - the connection was released, but it still remains inside
    # the pool, so this ensures we are ready to shutdown completely.
    await db.close_pool()

asyncio.run(demo())

When running Peewee ORM code, you can choose between the two execution patterns depending on how explicit you want to be. See Helper Methods for details on the available async-friendly helper methods.

Supported Backends

Database

Driver

Database Class

SQLite

aiosqlite

AsyncSqliteDatabase

MySQL / MariaDB

aiomysql

AsyncMySQLDatabase

PostgreSQL

asyncpg

AsyncPostgresqlDatabase

Installation

This module requires Python 3.8 or newer and depends on Peewee and greenlet, along with whatever async-compatible driver you intend to use.

pip install peewee greenlet
pip install asyncpg  # Postgresql.
pip install aiomysql  # MySQL.
pip install aiosqlite  # Sqlite.

Overview

Async database classes mirror Peewee’s standard database classes and can generally be used as drop-in replacements.

from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db', pragmas={'journal_mode': 'wal'})

Models are defined as expected:

class User(db.Model):
    name = CharField()

Executing Queries

Peewee queries must be executed using the database run() method.

from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

class User(db.Model):
    name = CharField()

async def main():
    await db.acreate_tables([User])

    # Use the db.run() helper with a synchronous callable.
    await db.run(User.create, name='Alice')

    # OR use an awaitable helper method.
    user = await db.get(User.select().where(User.name == 'Alice'))

    def add_users():
        with db.atomic():
            User.create(name='Bob')
            User.create(name='Charlie')

        with db.atomic():
            alice = User.get(User.name == 'Alice')
            alice.name = 'Alyce'  # She spells it fancy now.
            alice.save()

        return list(User.select().order_by(User.name))

    users = await db.run(add_users)
    print([u.name for u in users])

    await db.adrop_tables([User])
    await db.close_pool()

asyncio.run(main())

The run() method ensures synchronous Peewee code is executed safely inside the event loop. Any arbitrary code can be wrapped in a function and sent to the run() method.

Helper Methods

The AsyncDatabaseMixin provides a number of async/await-friendly helpers for common operations.

The most general-purpose helper is the AsyncDatabaseMixin.run() method, which accepts a callable and arbitrary arguments, and runs it in an asynchronous context.

Database connection can be opened and closed asynchronously using the following helpers:

  • aconnect() - acquire a connection from the pool for the current task.

  • aclose() - release the connection back to the pool.

  • close_pool() - close the connection pool.

Several common query patterns are exposed as async helpers:

  • atomic() - can be used as an async context manager for created arbitrarily nested transactions.

  • get() - get a single model instance or result.

  • list() - get the results of a query (0…many).

  • scalar() - get a single, scalar value from a query.

  • aexecute() - execute a Query object.

  • aexecute_sql() - execute a SQL query.

  • acreate_tables() - create one or more tables.

  • adrop_tables() - drop one or more tables.

import asyncio
from peewee import *
from playhouse.pwasyncio import AsyncSqliteDatabase

db = AsyncSqliteDatabase('example.db')

class User(db.Model):
    name = CharField()

async def main():
    async with db:
        await db.acreate_tables([User])

        insert = (User
                  .insert_many([(f'user-{i}',) for i in range(10)])
                  .returning(User))
        for new_user in await db.aexecute(insert):
            print(f'Added user {new_user.name}')

        user = await db.get(User.select().where(User.id == 1))

        users = await db.list(User.select())
        print(f'Found {len(users)} users via select')

        async with db.atomic():
            user = await db.run(User.create, name='Charlie')
            print(f'Added new user with id={user.id}')

        count = await db.scalar(User.select(fn.COUNT(User.id)))
        print(f'COUNT returns {count} users')

        await db.adrop_tables([User])

    # Close all connections and exit cleanly.
    await db.close_pool()

asyncio.run(main())

Connections and Pooling

Like non-async Peewee, which uses a connection-per-thread, each asyncio task maintains its own connection state. This avoids sharing connections across concurrent tasks. Internally, we try to use the driver-provided pools where possible.

Connections can be acquired and released from the pool using the following helpers:

Alternatively, you can use the database class as a context manager:

db = AsyncSqliteDatabase('example.db')

await db.aconnect()
# Do some database work.
await db.aclose()

# Or use the context manager:
async with db:
    # Do some database work.
    ...

SQLite

SQLite uses a single shared connection, as the underlying database does not support concurrent writers.

Generally SQLite is a poor fit for asynchronous workflows where writes may be coming in at any time, and transactions may be interleaved across multiple writers. Furthermore, SQLite does not do any network I/O.

The SQLite implementation is provided mostly for testing and local development.

MySQL and PostgreSQL

MySQL and PostgreSQL use the driver’s native connection pool.

Pool configuration options include:

  • pool_size – Maximum number of connections

  • pool_min_size – Minimum pool size

  • acquire_timeout – Timeout when acquiring a connection

db = AsyncPostgresqlDatabase(
    'peewee_test',
    host='localhost',
    user='postgres',
    pool_size=10,
    pool_min_size=1,
    acquire_timeout=10)

Transactions

Transactions and savepoints are managed using async context managers.

async with db.atomic():
    await db.run(User.create, name='Alice')

    async with db.atomic():
        await db.run(User.create, name='Bob')

Nested atomic blocks behave the same as in synchronous Peewee code.

Implementation Notes

Synchronous ORM code runs inside a greenlet, and async I/O is bridged explicitly by Peewee using two helpers, greenlet_spawn() and await_(). The greenlet_spawn() helper runs synchronous code, but can be suspended and resumed in order to yield to the asyncio event loop. Yielding is done by the await_() helper, which suspends the greenlet and passes control to the asyncio coroutine.

Peewee wraps all this up in a general-purpose AsyncDatabaseMixin.run() method, which is the entrypoint for pretty much all async operations:

from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    def work():
        print(db.execute_sql('select 1').fetchall())
    await db.run(work)

asyncio.run(demo())  # prints [(1,)]

The basic flow goes something like this:

  1. The above code eventually hits the db.run() method. This method calls the greenlet_spawn() function, creating a resumable coroutine wrapping our synchronous code.

  2. The greenlet begins executing the synchronous Peewee code.

  3. We call db.execute_sql('select 1')

  4. The async database implementation calls our special await_() helper, which switches control back to the event loop.

  5. The event-loop awaits the coroutine, e.g. await conn.execute(...), awaiting the results from the cursor before handing them back.

  6. The result cursor is sent back to the greenlet, and the greenlet resumes.

  7. db.execute_sql() returns and the rest of the code continues normally.

  8. We call fetchall() on the result cursor, which returns all the rows loaded during (5).

If we try to run execute_sql() outside of the greenlet helper, Peewee will raise a MissingGreenletBridge exception:

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    print(db.execute_sql('select 1').fetchall())

# MissingGreenletBridge: Attempted query select 1 (None) outside greenlet runner.
asyncio.run(demo())

Peewee provides a number of async-ready helper methods for common operations, so the run() helper can be avoided:

from playhouse.pwasyncio import *

async def demo():
    db = AsyncSqliteDatabase(':memory:')
    curs = await db.aexecute_sql('select 1')
    print(curs.fetchall())

asyncio.run(demo())  # prints [(1,)]

Note

Obtaining the results from the cursor does not happen asynchronously (e.g. we do not call print(await curs.fetchall())). Internally Peewee does await fetching the results from the cursor, but the rows are all loaded before the cursor is returned to the caller. This ensures consistency with existing behavior, though in future versions we may add support for streaming cursor results (via Postgres server-side cursors).

Sharp Corners

There are limitations to what can be achieved with the approach described above. The main one I foresee causing problems is lazy foreign-key resolution. Consider this example:

tweet = await db.get(Tweet.select())
print(tweet.user.name)  # Fails.
# MissingGreenletBridge: Attempted query SELECT ...outside greenlet runner.

This fails because the relationship tweet.user was not explicitly fetched, so Peewee attempts to issue a SELECT query to get the related user. This fails because we are not operating inside the greenlet-bridged environment.

One solution is to resolve foreign keys inside run():

print(await db.run(lambda: tweet.user.name))

Even better is to select the related object explicitly:

query = Tweet.select(Tweet, User).join(User)
tweet = await db.get(query)
print(tweet.user.name)  # OK, no extra SELECT required.

In a similar way, iterating the related objects requires a query:

for tweet in user.tweet_set:
    print(tweet.message)
# MissingGreenletBridge: Attempted query SELECT ... outside greenlet runner.

Like above, there are a few ways you can accomplish this:

# Use the db.run() helper:
tweets = await db.run(list, user.tweet_set)
for tweet in tweets:
    print(tweet.message)

# Use the db.list() helper:
for tweet in await db.list(user.tweet_set):
    print(tweet.message)

# Use prefetch (not a great fit, but just to demonstrate):
user_query = User.select().where(User.id == user.id)
tweet_query = Tweet.select()
user, = await db.run(prefetch, user_query, tweet_query)
for tweet in user.tweet_set:
    print(tweet.message)

Overall these are the main issues I see arising, but as things come up I may expand this section or work to find other solutions to the problems.

API

class AsyncDatabaseMixin(database, pool_size=10, pool_min_size=1, acquire_timeout=10, validate_conn_timeout=2, **kwargs)
Parameters
  • database (str) – Database name or filename for SQLite.

  • pool_size (int) – Maximum size of the driver-managed connection pool (no-op for SQLite).

  • pool_min_size (int) – Minimum size of the driver-managed connection pool (no-op for SQLite).

  • acquire_timeout (float) – Time (in seconds) to wait for a free connection when acquiring from the pool.

  • kwargs – Arbitrary keyword arguments passed to the underlying database driver when creating connections (e.g., user, password, host).

This mixin provides asyncio execution support for Peewee database classes. It is not intended to be used directly, but instead forms the base for the concrete AsyncDatabase implementations.

Each asyncio task maintains its own connection state. Connections are acquired and released back to the pool when the task completes or the database context exits.

async run(fn, *args, **kwargs)
Parameters

fn – A synchronous callable.

Returns

The return value of fn.

Execute a synchronous callable inside a greenlet and return the result asynchronously.

This method is the primary entry point for executing Peewee ORM operations in an asyncio context.

async aconnect()
Returns

A wrapped async connection.

Establish a connection to the database for the currently-running asyncio task, if one is not already open.

async aclose()

Close and release the connection associated with the current asyncio task.

async close_pool()

Close the underlying connection pool and release all active connections.

This method should be called during application shutdown.

async __aenter__()

Enter an async database context, acquiring a connection.

async __aexit__(exc_type, exc, tb)

Exit the async database context, releasing the connection.

async aexecute(query)
Parameters

query (Query) – a Select, Insert, Update or Delete query.

Returns

the normal return-value for the query type.

Execute a query asynchronously.

async get(query)
Parameters

query (Query) – a Select query.

Execute a query and return a single model instance.

async list(query)
Parameters

query (Query) – a Select query, or an Insert, Update or Delete query that utilizes RETURNING.

Execute a query and return a list of results.

async scalar(query)
Parameters

query (Query) – a Select query.

Execute a query and return a scalar value.

atomic()
Returns

An async_atomic instance.

Return an asyncio-aware atomic transaction context manager. Supports use as a synchronous or async context manager.

async acreate_tables(models, **options)

Asynchronously create database tables.

async adrop_tables(models, **options)

Asynchronously drop database tables.

async aexecute_sql(sql, params=None)
Parameters
  • sql (str) – SQL query to execute.

  • params (tuple) – Optional query parameters.

Returns

A CursorAdapter instance.

Execute a SQL query asynchronously using the underlying async driver.

execute_sql(sql, params=None)

Synchronous wrapper around aexecute_sql().

This method may only be called from code executing inside run().

class AsyncSqliteDatabase(database, **kwargs)

Async SQLite database implementation.

Uses aiosqlite and maintains a single shared connection. Pool-related configuration options are ignored.

Inherits from AsyncDatabaseMixin and peewee.SqliteDatabase.

class AsyncMySQLDatabase(database, **kwargs)

Async MySQL / MariaDB database implementation.

Uses aiomysql and the driver’s native connection pool.

Inherits from AsyncDatabaseMixin and peewee.MySQLDatabase.

class AsyncPostgresqlDatabase(database, **kwargs)

Async PostgreSQL database implementation.

Uses asyncpg and the driver’s native connection pool.

Inherits from AsyncDatabaseMixin and peewee.PostgresqlDatabase.

class MissingGreenletBridge(RuntimeError)

Exception that is raised when Peewee attempts to run a blocking operation like execute_sql() outside a greenlet-spawn context. Generally indicates that you have attempted to execute a query outside of db.run() or without using one of the async helper methods.