examples.asyncio.async_orm — SQLAlchemy 2.0 Documentation (original) (raw)

Source code for examples.asyncio.async_orm

"""Illustrates use of the sqlalchemy.ext.asyncio.AsyncSession object for asynchronous ORM use.

"""

from future import annotations

import asyncio import datetime from typing import List from typing import Optional

from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.future import select from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from sqlalchemy.orm import relationship from sqlalchemy.orm import selectinload

class Base(AsyncAttrs, DeclarativeBase): pass

class A(Base): tablename = "a"

id: Mapped[int] = mapped_column(primary_key=True)
data: Mapped[Optional[str]]
create_date: Mapped[datetime.datetime] = mapped_column(
    server_default=func.now()
)
bs: Mapped[List[B]] = relationship()

class B(Base): tablename = "b" id: Mapped[int] = mapped_column(primary_key=True) a_id: Mapped[int] = mapped_column(ForeignKey("a.id")) data: Mapped[Optional[str]]

async def async_main(): """Main program function."""

engine = create_async_engine(
    "postgresql+asyncpg://scott:tiger@localhost/test",
    echo=True,
)

async with engine.begin() as conn:
    await conn.run_sync(Base.metadata.drop_all)
async with engine.begin() as conn:
    await conn.run_sync(Base.metadata.create_all)

# expire_on_commit=False will prevent attributes from being expired
# after commit.
async_session = async_sessionmaker(engine, expire_on_commit=False)

async with async_session() as session:
    async with session.begin():
        session.add_all(
            [
                A(bs=[B(), B()], data="a1"),
                A(bs=[B()], data="a2"),
                A(bs=[B(), B()], data="a3"),
            ]
        )

    # for relationship loading, eager loading should be applied.
    stmt = select(A).options(selectinload(A.bs))

    # AsyncSession.execute() is used for 2.0 style ORM execution
    # (same as the synchronous API).
    result = await session.scalars(stmt)

    # result is a buffered Result object.
    for a1 in result:
        print(a1)
        print(f"created at: {a1.create_date}")
        for b1 in a1.bs:
            print(b1)

    # for streaming ORM results, AsyncSession.stream() may be used.
    result = await session.stream(stmt)

    # result is a streaming AsyncResult object.
    async for a1 in result.scalars():
        print(a1)
        for b1 in a1.bs:
            print(b1)

    result = await session.scalars(select(A).order_by(A.id))

    a1 = result.first()

    a1.data = "new data"

    await session.commit()

    # use the AsyncAttrs interface to accommodate for a lazy load
    for b1 in await a1.awaitable_attrs.bs:
        print(b1)

asyncio.run(async_main())

© Copyright 2007-2025, the SQLAlchemy authors and contributors.

flambé! the dragon and The Alchemist image designs created and generously donated by Rotem Yaari.

Created using Sphinx 7.2.6. Documentation last generated: Mon 05 May 2025 05:39:51 PM EDT