aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRonan Lamy <ronan.lamy@gmail.com>2018-11-29 05:29:36 +0000
committerRonan Lamy <ronan.lamy@gmail.com>2018-11-29 05:29:36 +0000
commit67450df5dd12ac9631f5cab6c671c71c8dd4b6b5 (patch)
tree3092b0e036b645f30da6bd030ed07c34cf857ab7 /extra_tests
parenthg merge default (diff)
downloadpypy-67450df5dd12ac9631f5cab6c671c71c8dd4b6b5.tar.gz
pypy-67450df5dd12ac9631f5cab6c671c71c8dd4b6b5.tar.bz2
pypy-67450df5dd12ac9631f5cab6c671c71c8dd4b6b5.zip
Move test_sqlite3.py to extra_tests/
Diffstat (limited to 'extra_tests')
-rw-r--r--extra_tests/test_sqlite3.py325
1 files changed, 325 insertions, 0 deletions
diff --git a/extra_tests/test_sqlite3.py b/extra_tests/test_sqlite3.py
new file mode 100644
index 0000000000..ac1e4d078b
--- /dev/null
+++ b/extra_tests/test_sqlite3.py
@@ -0,0 +1,325 @@
+# -*- coding: utf-8 -*-
+"""Tests for _sqlite3.py"""
+
+from __future__ import absolute_import
+import pytest
+import sys
+
+_sqlite3 = pytest.importorskip('_sqlite3')
+
+pypy_only = pytest.mark.skipif('__pypy__' not in sys.builtin_module_names,
+ reason="PyPy-only test")
+
+
+@pytest.yield_fixture
+def con():
+ con = _sqlite3.connect(':memory:')
+ yield con
+ con.close()
+
+
+def test_list_ddl(con):
+ """From issue996. Mostly just looking for lack of exceptions."""
+ cursor = con.cursor()
+ cursor.execute('CREATE TABLE foo (bar INTEGER)')
+ result = list(cursor)
+ assert result == []
+ cursor.execute('INSERT INTO foo (bar) VALUES (42)')
+ result = list(cursor)
+ assert result == []
+ cursor.execute('SELECT * FROM foo')
+ result = list(cursor)
+ assert result == [(42,)]
+
+@pypy_only
+def test_connect_takes_same_positional_args_as_Connection(con):
+ from inspect import getargspec
+ clsargs = getargspec(_sqlite3.Connection.__init__).args[1:] # ignore self
+ conargs = getargspec(_sqlite3.connect).args
+ assert clsargs == conargs
+
+def test_total_changes_after_close(con):
+ con.close()
+ with pytest.raises(_sqlite3.ProgrammingError):
+ con.total_changes
+
+def test_connection_check_init():
+ class Connection(_sqlite3.Connection):
+ def __init__(self, name):
+ pass
+
+ con = Connection(":memory:")
+ with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
+ con.cursor()
+ assert '__init__' in excinfo.value.message
+
+
+def test_cursor_check_init(con):
+ class Cursor(_sqlite3.Cursor):
+ def __init__(self, name):
+ pass
+
+ cur = Cursor(con)
+ with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
+ cur.execute('select 1')
+ assert '__init__' in excinfo.value.message
+
+def test_connection_after_close(con):
+ with pytest.raises(TypeError):
+ con()
+ con.close()
+ # raises ProgrammingError because should check closed before check args
+ with pytest.raises(_sqlite3.ProgrammingError):
+ con()
+
+def test_cursor_iter(con):
+ cur = con.cursor()
+ with pytest.raises(StopIteration):
+ next(cur)
+
+ cur.execute('select 1')
+ next(cur)
+ with pytest.raises(StopIteration):
+ next(cur)
+
+ cur.execute('select 1')
+ con.commit()
+ next(cur)
+ with pytest.raises(StopIteration):
+ next(cur)
+
+ with pytest.raises(_sqlite3.ProgrammingError):
+ cur.executemany('select 1', [])
+ with pytest.raises(StopIteration):
+ next(cur)
+
+ cur.execute('select 1')
+ cur.execute('create table test(ing)')
+ with pytest.raises(StopIteration):
+ next(cur)
+
+ cur.execute('select 1')
+ cur.execute('insert into test values(1)')
+ con.commit()
+ with pytest.raises(StopIteration):
+ next(cur)
+
+def test_cursor_after_close(con):
+ cur = con.execute('select 1')
+ cur.close()
+ con.close()
+ with pytest.raises(_sqlite3.ProgrammingError):
+ cur.close()
+ # raises ProgrammingError because should check closed before check args
+ with pytest.raises(_sqlite3.ProgrammingError):
+ cur.execute(1,2,3,4,5)
+ with pytest.raises(_sqlite3.ProgrammingError):
+ cur.executemany(1,2,3,4,5)
+
+@pypy_only
+def test_connection_del(tmpdir):
+ """For issue1325."""
+ import os
+ import gc
+ resource = pytest.importorskip('resource')
+
+ limit = resource.getrlimit(resource.RLIMIT_NOFILE)
+ try:
+ fds = 0
+ while True:
+ fds += 1
+ resource.setrlimit(resource.RLIMIT_NOFILE, (fds, limit[1]))
+ try:
+ for p in os.pipe(): os.close(p)
+ except OSError:
+ assert fds < 100
+ else:
+ break
+
+ def open_many(cleanup):
+ con = []
+ for i in range(3):
+ con.append(_sqlite3.connect(str(tmpdir.join('test.db'))))
+ if cleanup:
+ con[i] = None
+ gc.collect(); gc.collect()
+
+ with pytest.raises(_sqlite3.OperationalError):
+ open_many(False)
+ sys.exc_clear()
+ gc.collect(); gc.collect()
+ open_many(True)
+ finally:
+ resource.setrlimit(resource.RLIMIT_NOFILE, limit)
+
+def test_on_conflict_rollback_executemany(con):
+ major, minor, micro = _sqlite3.sqlite_version.split('.')[:3]
+ if (int(major), int(minor), int(micro)) < (3, 2, 2):
+ pytest.skip("requires sqlite3 version >= 3.2.2")
+ con.execute("create table foo(x, unique(x) on conflict rollback)")
+ con.execute("insert into foo(x) values (1)")
+ try:
+ con.executemany("insert into foo(x) values (?)", [[1]])
+ except _sqlite3.DatabaseError:
+ pass
+ con.execute("insert into foo(x) values (2)")
+ try:
+ con.commit()
+ except _sqlite3.OperationalError:
+ pytest.fail("_sqlite3 knew nothing about the implicit ROLLBACK")
+
+def test_statement_arg_checking(con):
+ with pytest.raises(_sqlite3.Warning) as e:
+ con(123)
+ assert str(e.value) == 'SQL is of wrong type. Must be string or unicode.'
+ with pytest.raises(ValueError) as e:
+ con.execute(123)
+ assert str(e.value) == 'operation parameter must be str or unicode'
+ with pytest.raises(ValueError) as e:
+ con.executemany(123, 123)
+ assert str(e.value) == 'operation parameter must be str or unicode'
+ with pytest.raises(ValueError) as e:
+ con.executescript(123)
+ assert str(e.value) == 'script argument must be unicode or string.'
+
+def test_statement_param_checking(con):
+ con.execute('create table foo(x)')
+ con.execute('insert into foo(x) values (?)', [2])
+ con.execute('insert into foo(x) values (?)', (2,))
+ class seq(object):
+ def __len__(self):
+ return 1
+ def __getitem__(self, key):
+ return 2
+ con.execute('insert into foo(x) values (?)', seq())
+ del seq.__len__
+ with pytest.raises(_sqlite3.ProgrammingError):
+ con.execute('insert into foo(x) values (?)', seq())
+ with pytest.raises(_sqlite3.ProgrammingError):
+ con.execute('insert into foo(x) values (?)', {2:2})
+ with pytest.raises(ValueError) as e:
+ con.execute('insert into foo(x) values (?)', 2)
+ assert str(e.value) == 'parameters are of unsupported type'
+
+def test_explicit_begin(con):
+ con.execute('BEGIN')
+ con.execute('BEGIN ')
+ con.execute('BEGIN')
+ con.commit()
+ con.execute('BEGIN')
+ con.commit()
+
+def test_row_factory_use(con):
+ con.row_factory = 42
+ con.execute('select 1')
+
+def test_returning_blob_must_own_memory(con):
+ import gc
+ con.create_function("returnblob", 0, lambda: buffer("blob"))
+ cur = con.execute("select returnblob()")
+ val = cur.fetchone()[0]
+ for i in range(5):
+ gc.collect()
+ got = (val[0], val[1], val[2], val[3])
+ assert got == ('b', 'l', 'o', 'b')
+ # in theory 'val' should be a read-write buffer
+ # but it's not right now
+ if not hasattr(_sqlite3, '_ffi'):
+ val[1] = 'X'
+ got = (val[0], val[1], val[2], val[3])
+ assert got == ('b', 'X', 'o', 'b')
+
+def test_description_after_fetchall(con):
+ cur = con.cursor()
+ assert cur.description is None
+ cur.execute("select 42").fetchall()
+ assert cur.description is not None
+
+def test_executemany_lastrowid(con):
+ cur = con.cursor()
+ cur.execute("create table test(a)")
+ cur.executemany("insert into test values (?)", [[1], [2], [3]])
+ assert cur.lastrowid is None
+ # issue 2682
+ cur.execute('''insert
+ into test
+ values (?)
+ ''', (1, ))
+ assert cur.lastrowid is not None
+ cur.execute('''insert\t into test values (?) ''', (1, ))
+ assert cur.lastrowid is not None
+
+def test_authorizer_bad_value(con):
+ def authorizer_cb(action, arg1, arg2, dbname, source):
+ return 42
+ con.set_authorizer(authorizer_cb)
+ with pytest.raises(_sqlite3.OperationalError) as e:
+ con.execute('select 123')
+ major, minor, micro = _sqlite3.sqlite_version.split('.')[:3]
+ if (int(major), int(minor), int(micro)) >= (3, 6, 14):
+ assert str(e.value) == 'authorizer malfunction'
+ else:
+ assert str(e.value) == \
+ ("illegal return value (1) from the authorization function - "
+ "should be SQLITE_OK, SQLITE_IGNORE, or SQLITE_DENY")
+
+def test_issue1573(con):
+ cur = con.cursor()
+ cur.execute(u'SELECT 1 as méil')
+ assert cur.description[0][0] == u"méil".encode('utf-8')
+
+def test_adapter_exception(con):
+ def cast(obj):
+ raise ZeroDivisionError
+
+ _sqlite3.register_adapter(int, cast)
+ try:
+ cur = con.cursor()
+ cur.execute("select ?", (4,))
+ val = cur.fetchone()[0]
+ # Adapter error is ignored, and parameter is passed as is.
+ assert val == 4
+ assert type(val) is int
+ finally:
+ del _sqlite3.adapters[(int, _sqlite3.PrepareProtocol)]
+
+def test_null_character(con):
+ if not hasattr(_sqlite3, '_ffi') and sys.version_info < (2, 7, 9):
+ pytest.skip("_sqlite3 too old")
+ with pytest.raises(ValueError) as excinfo:
+ con("\0select 1")
+ assert str(excinfo.value) == "the query contains a null character"
+ with pytest.raises(ValueError) as excinfo:
+ con("select 1\0")
+ assert str(excinfo.value) == "the query contains a null character"
+ cur = con.cursor()
+ with pytest.raises(ValueError) as excinfo:
+ cur.execute("\0select 2")
+ assert str(excinfo.value) == "the query contains a null character"
+ with pytest.raises(ValueError) as excinfo:
+ cur.execute("select 2\0")
+ assert str(excinfo.value) == "the query contains a null character"
+
+def test_close_in_del_ordering():
+ import gc
+ class SQLiteBackend(object):
+ success = False
+ def __init__(self):
+ self.connection = _sqlite3.connect(":memory:")
+ def close(self):
+ self.connection.close()
+ def __del__(self):
+ self.close()
+ SQLiteBackend.success = True
+ def create_db_if_needed(self):
+ conn = self.connection
+ cursor = conn.cursor()
+ cursor.execute("""
+ create table if not exists nameoftable(value text)
+ """)
+ cursor.close()
+ conn.commit()
+ SQLiteBackend().create_db_if_needed()
+ gc.collect()
+ gc.collect()
+ assert SQLiteBackend.success