Make pandas/io/sql.py work with sqlalchemy 2.0 by cdcadman · Pull Request #48576 · pandas-dev/pandas
I'm planning to make some changes to this PR. First, I noticed that `pandas.io.sql.execute` is documented, right above this line: https://pandas.pydata.org/docs/user_guide/io.html?highlight=sql%20execute#engine-connection-examples . As it stands, my PR would make it return a context manager instead of an iterable of results. I don't think that change is needed, so I will revert it.
I plan to make `SQLDatabase` accept only a SQLAlchemy `Connection` and not an `Engine`. I would change `pandasSQL_builder` into a generator function, decorated by `contextlib.contextmanager`, so that it can dispose of the `Engine` that is created if the connectable is a string. A new argument, `need_txn`, will be set to `True` by `to_sql` and will otherwise be `False`. An advantage of this approach is that I can begin a transaction if the connectable is a `Connection` which is not already in a transaction.
```python
@contextmanager
def pandasSQL_builder(
    con,
    schema: str | None = None,
    need_txn: bool = False,
) -> Iterator[SQLDatabase] | Iterator[SQLiteDatabase]:
    """
    Convenience function to return the correct PandasSQL subclass based on the
    provided parameters. Also creates a sqlalchemy connection and transaction
    if necessary.
    """
    import sqlite3
    import warnings

    if isinstance(con, sqlite3.Connection) or con is None:
        yield SQLiteDatabase(con)
    else:
        sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")

        if sqlalchemy is not None and isinstance(con, (str, sqlalchemy.engine.Connectable)):
            with _sqlalchemy_con(con, need_txn) as con:
                yield SQLDatabase(con, schema=schema)
        elif isinstance(con, str) and sqlalchemy is None:
            raise ImportError("Using URI string without sqlalchemy installed.")
        else:
            warnings.warn(
                "pandas only supports SQLAlchemy connectable (engine/connection) or "
                "database string URI or sqlite3 DBAPI2 connection. "
                "Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.",
                UserWarning,
                stacklevel=find_stack_level(),
            )
            yield SQLiteDatabase(con)


@contextmanager
def _sqlalchemy_con(connectable, need_txn: bool):
    """Create a sqlalchemy connection and a transaction if necessary."""
    import sqlalchemy

    if isinstance(connectable, str):
        engine = sqlalchemy.create_engine(connectable)
        try:
            with engine.connect() as con:
                if need_txn:
                    with con.begin():
                        yield con
                else:
                    yield con
        finally:
            engine.dispose()
    elif isinstance(connectable, sqlalchemy.engine.Engine):
        with connectable.connect() as con:
            if need_txn:
                with con.begin():
                    yield con
            else:
                yield con
    else:
        if need_txn and not connectable.in_transaction():
            with connectable.begin():
                yield connectable
        else:
            yield connectable
```
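To illustrate the ownership rule that the proposal relies on (clean up only what the helper itself created, and only open a transaction if the caller has not already started one), here is a minimal standalone sketch that uses the standard-library `sqlite3` module instead of SQLAlchemy. The name `open_connection` is hypothetical and is not part of pandas or of this PR; a path string stands in for the URI string, and an existing `sqlite3.Connection` stands in for a caller-supplied SQLAlchemy `Connection`.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator, Union


@contextmanager
def open_connection(
    target: Union[str, sqlite3.Connection], need_txn: bool = False
) -> Iterator[sqlite3.Connection]:
    """Hypothetical stand-in for the proposed _sqlalchemy_con, using sqlite3."""
    # We only own (and later close) a connection that we created from a string.
    owns_connection = isinstance(target, str)
    if owns_connection:
        # isolation_level=None makes transaction boundaries explicit.
        con = sqlite3.connect(target, isolation_level=None)
    else:
        con = target

    # Start a transaction only if one is requested and none is already open.
    start_txn = need_txn and not con.in_transaction
    try:
        if start_txn:
            con.execute("BEGIN")
        try:
            yield con
        except BaseException:
            if start_txn:
                con.rollback()  # undo only the transaction we started
            raise
        else:
            if start_txn:
                con.commit()
    finally:
        if owns_connection:
            con.close()


# Usage: the string case creates, commits, and closes everything itself.
with open_connection(":memory:", need_txn=True) as con:
    con.execute("create table test (A integer, B integer)")
    con.execute("insert into test values (2, 4)")
```

When the caller passes in a connection that is already inside a transaction, the sketch neither commits nor rolls back, which mirrors the final `else` branch of `_sqlalchemy_con` above: the caller stays in charge of the transaction.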
In `test_sql.py`, I will take out the test classes which pass `Engine` objects to `SQLDatabase`, which will reduce the number of tests.
As I was looking over the tests, I noticed this interesting behavior related to transactions. I like having the ability to roll back a `DataFrame.to_sql` call to help maintain data integrity, but if a `sqlite3.Connection` is used, pandas commits the transaction. If a `sqlalchemy.engine.Connection` is used, then pandas does not commit the transaction. Maybe this is fine, because someone who wants the sqlalchemy behavior with a sqlite database can just make a sqlalchemy connection to the database. Here is an example script:
```python
import sqlite3

from pandas import DataFrame
from sqlalchemy import create_engine

# Plain sqlite3: the caller controls the transaction, so rollback works.
with sqlite3.connect(":memory:") as con:
    con.execute("create table test (A integer, B integer)")
    row_count = con.execute("insert into test values (2, 4), (5, 10)").rowcount
    if row_count > 1:
        con.rollback()
    print(con.execute("select count(*) from test").fetchall()[0][0])  # prints 0

# sqlite3 connection passed to pandas: to_sql commits before we can roll back.
with sqlite3.connect(":memory:") as con:
    con.execute("create table test (A integer, B integer)")
    row_count = DataFrame({'A': [2, 5], 'B': [4, 10]}).to_sql('test', con, if_exists='append', index=False)
    if row_count > 1:
        con.rollback()  # does nothing, because pandas already committed the transaction.
    print(con.execute("select count(*) from test").fetchall()[0][0])  # prints 2

# SQLAlchemy connection: pandas does not commit, so the enclosing transaction rolls back.
with create_engine("sqlite:///:memory:").connect() as con:
    with con.begin():
        con.exec_driver_sql("create table test (A integer, B integer)")
    try:
        with con.begin():
            row_count = DataFrame({'A': [2, 5], 'B': [4, 10]}).to_sql('test', con, if_exists='append', index=False)
            assert row_count < 2
    except AssertionError:
        pass
    # exec_driver_sql is used because SQLAlchemy 2.0 does not accept a plain string in execute().
    print(con.exec_driver_sql("select count(*) from test").fetchall()[0][0])  # prints 0
```
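The difference between the first two cases comes down to whether a commit happens before the caller gets a chance to roll back. The following sketch isolates that with the standard library only, so it runs without pandas or SQLAlchemy. The helper `insert_and_commit` is hypothetical: it only mimics the commit described above and is not pandas code.

```python
import sqlite3


def insert_and_commit(connection: sqlite3.Connection) -> None:
    """Hypothetical stand-in for a library call that commits on the caller's behalf."""
    connection.execute("insert into test values (2, 4), (5, 10)")
    connection.commit()


con = sqlite3.connect(":memory:")
con.execute("create table test (A integer, B integer)")

# Case 1: the caller still owns the transaction, so rollback undoes the insert.
con.execute("insert into test values (2, 4), (5, 10)")
open_before_rollback = con.in_transaction  # the insert opened a transaction
con.rollback()
count_after_rollback = con.execute("select count(*) from test").fetchone()[0]

# Case 2: the helper has already committed, so there is nothing left to roll back.
insert_and_commit(con)
open_after_commit = con.in_transaction
con.rollback()  # no-op: no transaction is open
count_after_commit = con.execute("select count(*) from test").fetchone()[0]

print(open_before_rollback, count_after_rollback)  # True 0
print(open_after_commit, count_after_commit)       # False 2
con.close()
```

Checking `in_transaction` right after a call is a quick way to tell whether that call left the transaction open for the caller to commit or roll back.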