Make pandas/io/sql.py work with sqlalchemy 2.0 by cdcadman · Pull Request #48576 · pandas-dev/pandas (original) (raw)

I'm planning to make some changes to this PR. Firstly, I noticed that pandas.io.sql.execute is documented, right above this line: https://pandas.pydata.org/docs/user_guide/io.html?highlight=sql%20execute#engine-connection-examples . As it stands, my PR would make this return a context manager instead of a Results Iterable, and I don't think I need to make this change, so I will change it back.

I plan to make SQLDatabase accept only a SQLAlchemy Connection and not an Engine. I would change pandasSQL_builder into a generating function, decorated by contextlib.contextmanager, so that it can dispose of the Engine that is created if the connectable is a string. A new argument, need_txn will be set to True by to_sql, and otherwise be False. An advantage of this approach is that I can begin a transaction if the connectable is a Connection which is not already in a transaction.

@contextmanager
def pandasSQL_builder(
    con,
    schema: str | None = None,
    need_txn: bool = False,
) -> Iterator[SQLDatabase] | Iterator[SQLiteDatabase]:
    """
    Convenience function to return the correct PandasSQL subclass based on the
    provided parameters.  Also creates a sqlalchemy connection and transaction
    if necessary.
    """
    import sqlite3
    import warnings

    if isinstance(con, sqlite3.Connection) or con is None:
        yield SQLiteDatabase(con)
    else:
        sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")

        if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)):
            with _sqlalchemy_con(con, need_txn) as con:
                yield SQLDatabase(con, schema=schema)
        elif isinstance(con, str) and sqlalchemy is None:
            raise ImportError("Using URI string without sqlalchemy installed.")
        else:

            warnings.warn(
                "pandas only supports SQLAlchemy connectable (engine/connection) or "
                "database string URI or sqlite3 DBAPI2 connection. "
                "Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.",
                UserWarning,
                stacklevel=find_stack_level(),
            )
            yield SQLiteDatabase(con)


@contextmanager
def _sqlalchemy_con(connectable, need_txn: bool):
    """Create a sqlalchemy connection and a transaction if necessary."""
    import sqlalchemy

    if isinstance(connectable, str):
        engine = sqlalchemy.create_engine(connectable)
        try:
            with engine.connect() as con:
                if need_txn:
                    with con.begin():
                        yield con
                else:
                    yield con
        finally:
            engine.dispose()
    elif isinstance(connectable, sqlalchemy.engine.Engine):
        with connectable.connect() as con:
            if need_txn:
                with con.begin():
                    yield con
            else:
                yield con
    else:
        if need_txn and not connectable.in_transaction():
            with connectable.begin():
                yield connectable
        else:
            yield connectable

In test_sql.py, I will take out the test classes which pass Engines to SQLDatabase, which will reduce the number of tests.

As I was looking over the tests, I noticed this interesting behavior related to transactions. I like having the ability to rollback a DataFrame.to_sql call to help maintain data integrity, but if a sqlite3.Connection is used, pandas commits the transaction. If a sqlalchemy.engine.Connection is used, then pandas does not commit the transaction. Maybe this is fine, because someone who wants the sqlalchemy behavior with a sqlite database can just make a sqlalchemy connection to the database. Here is an example script:

import sqlite3
from pandas import DataFrame
from sqlalchemy import create_engine

with sqlite3.connect(":memory:") as con:
    con.execute("create table test (A integer, B integer)")
    row_count = con.execute("insert into test values (2, 4), (5, 10)").rowcount
    if row_count > 1:
        con.rollback()
    print(con.execute("select count(*) from test").fetchall()[0][0]) # prints 0

with sqlite3.connect(":memory:") as con:
    con.execute("create table test (A integer, B integer)")
    row_count = DataFrame({'A': [2, 5], 'B': [4, 10]}).to_sql('test', con, if_exists='append', index=False)
    if row_count > 1:
        con.rollback() # does nothing, because pandas already committed the transaction.
    print(con.execute("select count(*) from test").fetchall()[0][0]) # prints 2
    
with create_engine("sqlite:///:memory:").connect() as con:
    with con.begin():
        con.exec_driver_sql("create table test (A integer, B integer)")
    try:
        with con.begin():
            row_count = DataFrame({'A': [2, 5], 'B': [4, 10]}).to_sql('test', con, if_exists='append', index=False)
            assert row_count < 2
    except AssertionError:
        pass
    print(con.execute("select count(*) from test").fetchall()[0][0]) # prints 0