Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 40 additions & 214 deletions test/test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@
from sqlalchemy.schema import DDL
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import db
from sqlalchemy.testing import eq_
from sqlalchemy.testing import provide_metadata, emits_warning
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.provision import temp_table_keyword_args
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy import bindparam
from sqlalchemy import case
from sqlalchemy import literal
from sqlalchemy import literal_column
from sqlalchemy import select
from sqlalchemy import util
Expand Down Expand Up @@ -89,6 +85,7 @@
from sqlalchemy.testing.suite.test_types import ( # noqa: F401, F403
BooleanTest as _BooleanTest,
DateTest as _DateTest,
_DateFixture as _DateFixtureTest,
DateTimeHistoricTest,
DateTimeCoercedToDateTimeTest as _DateTimeCoercedToDateTimeTest,
DateTimeMicrosecondsTest as _DateTimeMicrosecondsTest,
Expand Down Expand Up @@ -299,160 +296,53 @@ class LongNameBlowoutTest(_LongNameBlowoutTest):
pass


class DateTest(_DateTest):
def test_round_trip(self):
"""
SPANNER OVERRIDE:

Cloud Spanner supports tables with an empty primary key, but only one
row can be inserted into such a table - following insertions will fail
with `400 id must not be NULL in table date_table`.
Overriding the tests to add a manual primary key value to avoid the same
failures.
"""
date_table = self.tables.date_table

config.db.execute(date_table.insert(), {"id": 1, "date_data": self.data})

row = config.db.execute(select([date_table.c.date_data])).first()

compare = self.compare or self.data
eq_(row, (compare,))
assert isinstance(row[0], type(compare))

def test_null(self):
"""
SPANNER OVERRIDE:

Cloud Spanner supports tables with an empty primary key, but only one
row can be inserted into such a table - following insertions will fail
with `400 id must not be NULL in table date_table`.
Overriding the tests to add a manual primary key value to avoid the same
failures.
"""
date_table = self.tables.date_table

config.db.execute(date_table.insert(), {"id": 1, "date_data": None})

row = config.db.execute(select([date_table.c.date_data])).first()
eq_(row, (None,))

@requires.standalone_null_binds_whereclause
def test_null_bound_comparison(self):
class DateFixtureTest(_DateFixtureTest):
@classmethod
def define_tables(cls, metadata):
"""
SPANNER OVERRIDE:

Cloud Spanner supports tables with an empty primary key, but only one
row can be inserted into such a table - following insertions will fail
with `400 id must not be NULL in table date_table`.
Overriding the tests to add a manual primary key value to avoid the same
failures.
Cloud Spanner doesn't support auto incrementing ids feature,
which is used by the original test. Overriding the test data
creation method to disable autoincrement and make id column
nullable.
"""

# this test is based on an Oracle issue observed in #4886.
# passing NULL for an expression that needs to be interpreted as
# a certain type, does the DBAPI have the info it needs to do this.
date_table = self.tables.date_table
with config.db.connect() as conn:
result = conn.execute(
date_table.insert(), {"id": 1, "date_data": self.data}
)
id_ = result.inserted_primary_key[0]
stmt = select([date_table.c.id]).where(
case(
[
(
bindparam("foo", type_=self.datatype)
!= None, # noqa: E711,
bindparam("foo", type_=self.datatype),
)
],
else_=date_table.c.date_data,
)
== date_table.c.date_data
)

row = conn.execute(stmt, {"foo": None}).first()
eq_(row[0], id_)
Table(
"date_table",
metadata,
Column("id", Integer, primary_key=True, nullable=True),
Column("date_data", cls.datatype),
)


class DateTimeMicrosecondsTest(_DateTimeMicrosecondsTest):
def test_null(self):
"""
SPANNER OVERRIDE:
class DateTest(DateFixtureTest, _DateTest):
"""
SPANNER OVERRIDE:

Cloud Spanner supports tables with an empty primary key, but only one
row can be inserted into such a table - following insertions will fail
with `400 id must not be NULL in table datetime_table`.
Overriding the tests to add a manual primary key value to avoid the same
failures.
"""
date_table = self.tables.date_table
DateTest tests used same class method to create table, so to avoid those failures
and maintain DRY concept just inherit the class to run tests successfully.
"""

config.db.execute(date_table.insert(), {"id": 1, "date_data": None})
pass

row = config.db.execute(select([date_table.c.date_data])).first()
eq_(row, (None,))

class DateTimeMicrosecondsTest(_DateTimeMicrosecondsTest, DateTest):
def test_round_trip(self):
"""
SPANNER OVERRIDE:

Cloud Spanner supports tables with an empty primary key, but only one
row can be inserted into such a table - following insertions will fail
with `400 id must not be NULL in table datetime_table`.
Overriding the tests to add a manual primary key value to avoid the same
failures.

Spanner converts timestamp into `%Y-%m-%dT%H:%M:%S.%fZ` format, so to avoid
assert failures convert datetime input to the desire timestamp format.
"""
date_table = self.tables.date_table
config.db.execute(date_table.insert(), {"id": 1, "date_data": self.data})
config.db.execute(date_table.insert(), {"date_data": self.data})

row = config.db.execute(select([date_table.c.date_data])).first()
compare = self.compare or self.data
compare = compare.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
eq_(row[0].rfc3339(), compare)
assert isinstance(row[0], DatetimeWithNanoseconds)

@requires.standalone_null_binds_whereclause
def test_null_bound_comparison(self):
"""
SPANNER OVERRIDE:

Cloud Spanner supports tables with an empty primary key, but only one
row can be inserted into such a table - following insertions will fail
with `400 id must not be NULL in table datetime_table`.
Overriding the tests to add a manual primary key value to avoid the same
failures.
"""
# this test is based on an Oracle issue observed in #4886.
# passing NULL for an expression that needs to be interpreted as
# a certain type, does the DBAPI have the info it needs to do this.
date_table = self.tables.date_table
with config.db.connect() as conn:
result = conn.execute(
date_table.insert(), {"id": 1, "date_data": self.data}
)
id_ = result.inserted_primary_key[0]
stmt = select([date_table.c.id]).where(
case(
[
(
bindparam("foo", type_=self.datatype)
!= None, # noqa: E711,
bindparam("foo", type_=self.datatype),
)
],
else_=date_table.c.date_data,
)
== date_table.c.date_data
)

row = conn.execute(stmt, {"foo": None}).first()
eq_(row[0], id_)


class DateTimeTest(_DateTimeTest, DateTimeMicrosecondsTest):
"""
Expand Down Expand Up @@ -511,81 +401,33 @@ def _round_trip(self, datatype, data):
else:
assert isinstance(row[0], (long, int)) # noqa

@provide_metadata
def _literal_round_trip(self, type_, input_, output, filter_=None):
"""
SPANNER OVERRIDE:

Spanner is not able cleanup data and drop the table correctly,
table was already exists after related tests finished, so it doesn't
create a new table and when started tests for other data type following
insertions will fail with `400 Duplicate name in schema: t.
Overriding the tests to create a new table for test and drop table manually
before it creates a new table to avoid the same failures."""

# for literal, we test the literal render in an INSERT
# into a typed column. we can then SELECT it back as its
# official type; ideally we'd be able to use CAST here
# but MySQL in particular can't CAST fully

t = Table("int_t", self.metadata, Column("x", type_))
t.create()

with db.connect() as conn:
for value in input_:
ins = (
t.insert()
.values(x=literal(value))
.compile(
dialect=db.dialect, compile_kwargs=dict(literal_binds=True),
)
)
conn.execute(ins)

if self.supports_whereclause:
stmt = t.select().where(t.c.x == literal(value))
else:
stmt = t.select()

stmt = stmt.compile(
dialect=db.dialect, compile_kwargs=dict(literal_binds=True),
)
for row in conn.execute(stmt):
value = row[0]
if filter_ is not None:
value = filter_(value)
assert value in output


class UnicodeFixtureTest(_UnicodeFixtureTest):
def test_round_trip(self):
@classmethod
def define_tables(cls, metadata):
"""
SPANNER OVERRIDE:

Cloud Spanner doesn't support the tables with an empty primary key
when column has defined NOT NULL - following insertions will fail with
`400 id must not be NULL in table date_table`.
Overriding the tests and adding a manual primary key value to avoid the same
failures and deleting the table at the end.
Cloud Spanner doesn't support auto incrementing ids feature,
which is used by the original test. Overriding the test data
creation method to disable autoincrement and make id column
nullable.
"""
unicode_table = self.tables.unicode_table

config.db.execute(unicode_table.insert(), {"id": 1, "unicode_data": self.data})

row = config.db.execute(select([unicode_table.c.unicode_data])).first()

eq_(row, (self.data,))
assert isinstance(row[0], util.text_type)
Table(
"unicode_table",
metadata,
Column("id", Integer, primary_key=True, nullable=True),
Column("unicode_data", cls.datatype),
)

def test_round_trip_executemany(self):
"""
SPANNER OVERRIDE:
SPANNER OVERRIDE

Cloud Spanner doesn't support the tables with an empty primary key
when column has defined NOT NULL - following insertions will fail with
`400 id must not be NULL in table date_table`.
Overriding the tests and adding a manual primary key value to avoid the same
failures and deleting the table at the end.
Cloud Spanner supports tables with empty primary key, but
only single one row can be inserted into such a table -
following insertions will fail with `Row [] already exists".
Overriding the test to avoid the same failure.
"""
unicode_table = self.tables.unicode_table

Expand All @@ -599,22 +441,6 @@ def test_round_trip_executemany(self):
for row in rows:
assert isinstance(row[0], util.text_type)

def _test_null_strings(self, connection):
unicode_table = self.tables.unicode_table

connection.execute(unicode_table.insert(), {"id": 1, "unicode_data": None})
row = connection.execute(select([unicode_table.c.unicode_data])).first()
eq_(row, (None,))

def _test_empty_strings(self, connection):
unicode_table = self.tables.unicode_table

connection.execute(
unicode_table.insert(), {"id": 1, "unicode_data": util.u("")}
)
row = connection.execute(select([unicode_table.c.unicode_data])).first()
eq_(row, (util.u(""),))

@pytest.mark.skip("Spanner doesn't support non-ascii characters")
def test_literal(self):
pass
Expand Down