(original) (raw)
changeset: 74709:93ac4b12a750 branch: 2.7 parent: 74705:94b69bae61eb user: Petri Lehtinen petri@digip.org date: Wed Feb 01 22:20:12 2012 +0200 files: Lib/sqlite3/test/dbapi.py Lib/sqlite3/test/factory.py Misc/NEWS Modules/_sqlite/cursor.c Modules/_sqlite/statement.c description: sqlite3: Handle strings with embedded zeros correctly Closes #13676. diff -r 94b69bae61eb -r 93ac4b12a750 Lib/sqlite3/test/dbapi.py --- a/Lib/sqlite3/test/dbapi.py Wed Feb 01 08:55:21 2012 -0800 +++ b/Lib/sqlite3/test/dbapi.py Wed Feb 01 22:20:12 2012 +0200 @@ -203,6 +203,13 @@ def CheckExecuteArgString(self): self.cu.execute("insert into test(name) values (?)", ("Hugo",)) + def CheckExecuteArgStringWithZeroByte(self): + self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) + + self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) + row = self.cu.fetchone() + self.assertEqual(row[0], "Hu\x00go") + def CheckExecuteWrongNoOfArgs1(self): # too many parameters try: diff -r 94b69bae61eb -r 93ac4b12a750 Lib/sqlite3/test/factory.py --- a/Lib/sqlite3/test/factory.py Wed Feb 01 08:55:21 2012 -0800 +++ b/Lib/sqlite3/test/factory.py Wed Feb 01 22:20:12 2012 +0200 @@ -189,13 +189,52 @@ def tearDown(self): self.con.close() +class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase): + def setUp(self): + self.con = sqlite.connect(":memory:") + self.con.execute("create table test (value text)") + self.con.execute("insert into test (value) values (?)", ("a\x00b",)) + + def CheckString(self): + # text_factory defaults to unicode + row = self.con.execute("select value from test").fetchone() + self.assertIs(type(row[0]), unicode) + self.assertEqual(row[0], "a\x00b") + + def CheckCustom(self): + # A custom factory should receive an str argument + self.con.text_factory = lambda x: x + row = self.con.execute("select value from test").fetchone() + self.assertIs(type(row[0]), str) + self.assertEqual(row[0], "a\x00b") + + def CheckOptimizedUnicodeAsString(self): + # ASCII -> str argument + self.con.text_factory = sqlite.OptimizedUnicode + row = self.con.execute("select value from test").fetchone() + self.assertIs(type(row[0]), str) + self.assertEqual(row[0], "a\x00b") + + def CheckOptimizedUnicodeAsUnicode(self): + # Non-ASCII -> unicode argument + self.con.text_factory = sqlite.OptimizedUnicode + self.con.execute("delete from test") + self.con.execute("insert into test (value) values (?)", (u'�\0�',)) + row = self.con.execute("select value from test").fetchone() + self.assertIs(type(row[0]), unicode) + self.assertEqual(row[0], u"�\x00�") + + def tearDown(self): + self.con.close() + def suite(): connection_suite = unittest.makeSuite(ConnectionFactoryTests, "Check") cursor_suite = unittest.makeSuite(CursorFactoryTests, "Check") row_suite_compat = unittest.makeSuite(RowFactoryTestsBackwardsCompat, "Check") row_suite = unittest.makeSuite(RowFactoryTests, "Check") text_suite = unittest.makeSuite(TextFactoryTests, "Check") - return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite)) + text_zero_bytes_suite = unittest.makeSuite(TextFactoryTestsWithEmbeddedZeroBytes, "Check") + return unittest.TestSuite((connection_suite, cursor_suite, row_suite_compat, row_suite, text_suite, text_zero_bytes_suite)) def test(): runner = unittest.TextTestRunner() diff -r 94b69bae61eb -r 93ac4b12a750 Misc/NEWS --- a/Misc/NEWS Wed Feb 01 08:55:21 2012 -0800 +++ b/Misc/NEWS Wed Feb 01 22:20:12 2012 +0200 @@ -90,6 +90,8 @@ Library ------- +- Issue #13676: Handle strings with embedded zeros correctly in sqlite3. + - Issue #13506: Add '' to path for IDLE Shell when started and restarted with Restart Shell. Original patches by Marco Scataglini and Roger Serwy. diff -r 94b69bae61eb -r 93ac4b12a750 Modules/_sqlite/cursor.c --- a/Modules/_sqlite/cursor.c Wed Feb 01 08:55:21 2012 -0800 +++ b/Modules/_sqlite/cursor.c Wed Feb 01 22:20:12 2012 +0200 @@ -268,16 +268,17 @@ } } -PyObject* pysqlite_unicode_from_string(const char* val_str, int optimize) +PyObject* pysqlite_unicode_from_string(const char* val_str, Py_ssize_t size, int optimize) { const char* check; + Py_ssize_t pos; int is_ascii = 0; if (optimize) { is_ascii = 1; check = val_str; - while (*check) { + for (pos = 0; pos < size; pos++) { if (*check & 0x80) { is_ascii = 0; break; @@ -288,9 +289,9 @@ } if (is_ascii) { - return PyString_FromString(val_str); + return PyString_FromStringAndSize(val_str, size); } else { - return PyUnicode_DecodeUTF8(val_str, strlen(val_str), NULL); + return PyUnicode_DecodeUTF8(val_str, size, NULL); } } @@ -375,10 +376,11 @@ converted = PyFloat_FromDouble(sqlite3_column_double(self->statement->st, i)); } else if (coltype == SQLITE_TEXT) { val_str = (const char*)sqlite3_column_text(self->statement->st, i); + nbytes = sqlite3_column_bytes(self->statement->st, i); if ((self->connection->text_factory == (PyObject*)&PyUnicode_Type) || (self->connection->text_factory == pysqlite_OptimizedUnicode)) { - converted = pysqlite_unicode_from_string(val_str, + converted = pysqlite_unicode_from_string(val_str, nbytes, self->connection->text_factory == pysqlite_OptimizedUnicode ? 1 : 0); if (!converted) { @@ -391,9 +393,9 @@ PyErr_SetString(pysqlite_OperationalError, buf); } } else if (self->connection->text_factory == (PyObject*)&PyString_Type) { - converted = PyString_FromString(val_str); + converted = PyString_FromStringAndSize(val_str, nbytes); } else { - converted = PyObject_CallFunction(self->connection->text_factory, "s", val_str); + converted = PyObject_CallFunction(self->connection->text_factory, "s#", val_str, nbytes); } } else { /* coltype == SQLITE_BLOB */ diff -r 94b69bae61eb -r 93ac4b12a750 Modules/_sqlite/statement.c --- a/Modules/_sqlite/statement.c Wed Feb 01 08:55:21 2012 -0800 +++ b/Modules/_sqlite/statement.c Wed Feb 01 22:20:12 2012 +0200 @@ -166,13 +166,13 @@ rc = sqlite3_bind_double(self->st, pos, PyFloat_AsDouble(parameter)); break; case TYPE_STRING: - string = PyString_AS_STRING(parameter); - rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT); + PyString_AsStringAndSize(parameter, &string, &buflen); + rc = sqlite3_bind_text(self->st, pos, string, buflen, SQLITE_TRANSIENT); break; case TYPE_UNICODE: stringval = PyUnicode_AsUTF8String(parameter); - string = PyString_AsString(stringval); - rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT); + PyString_AsStringAndSize(stringval, &string, &buflen); + rc = sqlite3_bind_text(self->st, pos, string, buflen, SQLITE_TRANSIENT); Py_DECREF(stringval); break; case TYPE_BUFFER: /petri@digip.org