diff --git a/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py b/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py
index 520b0691..5e93d817 100644
--- a/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py
+++ b/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py
@@ -126,6 +126,18 @@ def visit_primary_key_constraint(self, constraint):
         """
         return None
 
+    def visit_unique_constraint(self, constraint):
+        """Unique constraints in Spanner are defined with indexes:
+        https://cloud.google.com/spanner/docs/secondary-indexes#unique-indexes
+
+        The method throws an exception to notify the user that in
+        Spanner unique constraints are created with unique indexes.
+        """
+        raise spanner_dbapi.exceptions.ProgrammingError(
+            "Spanner doesn't support direct UNIQUE constraints creation. "
+            "Create UNIQUE indexes instead."
+        )
+
     def post_create_table(self, table):
         """Build statements to be executed after CREATE TABLE.
 
diff --git a/test/test_suite.py b/test/test_suite.py
index bebfd01b..17c57bae 100644
--- a/test/test_suite.py
+++ b/test/test_suite.py
@@ -14,16 +14,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import operator
 import pytest
 
-from sqlalchemy.testing import config, db
+import sqlalchemy
+from sqlalchemy import inspect
+from sqlalchemy import testing
+from sqlalchemy import ForeignKey
+from sqlalchemy import MetaData
+from sqlalchemy.schema import DDL
+from sqlalchemy.testing import config
+from sqlalchemy.testing import db
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import provide_metadata
+from sqlalchemy.testing.provision import temp_table_keyword_args
 from sqlalchemy.testing.schema import Column
 from sqlalchemy.testing.schema import Table
+from sqlalchemy import bindparam
+from sqlalchemy import case
+from sqlalchemy import literal
 from sqlalchemy import literal_column
-
-from sqlalchemy import bindparam, case, literal, select, util
+from sqlalchemy import select
+from sqlalchemy import util
+from sqlalchemy import event
 from sqlalchemy import exists
 from sqlalchemy import Boolean
 from sqlalchemy import String
@@ -32,8 +45,10 @@
 
 from google.api_core.datetime_helpers import DatetimeWithNanoseconds
 
-from sqlalchemy.testing.suite.test_ddl import *  # noqa: F401, F403
+from google.cloud import spanner_dbapi
+
 from sqlalchemy.testing.suite.test_cte import *  # noqa: F401, F403
+from sqlalchemy.testing.suite.test_ddl import *  # noqa: F401, F403
 from sqlalchemy.testing.suite.test_dialect import *  # noqa: F401, F403
 from sqlalchemy.testing.suite.test_update_delete import *  # noqa: F401, F403
 
@@ -42,8 +57,10 @@
 from sqlalchemy.testing.suite.test_ddl import (
     LongNameBlowoutTest as _LongNameBlowoutTest,
 )
-
 from sqlalchemy.testing.suite.test_dialect import EscapingTest as _EscapingTest
+from sqlalchemy.testing.suite.test_reflection import (
+    ComponentReflectionTest as _ComponentReflectionTest,
+)
 from sqlalchemy.testing.suite.test_select import ExistsTest as _ExistsTest
 from sqlalchemy.testing.suite.test_types import BooleanTest as _BooleanTest
 from sqlalchemy.testing.suite.test_types import IntegerTest as _IntegerTest
@@ -102,6 +119,29 @@ def test_percent_sign_round_trip(self):
 
 
 class CTETest(_CTETest):
+    @classmethod
+    def define_tables(cls, metadata):
+        """
+        The original method creates a foreign key without a name,
+        which causes trouble during test cleanup. Overriding the
+        method to explicitly set a foreign key name.
+ """ + Table( + "some_table", + metadata, + Column("id", Integer, primary_key=True), + Column("data", String(50)), + Column("parent_id", ForeignKey("some_table.id", name="fk_some_table")), + ) + + Table( + "some_other_table", + metadata, + Column("id", Integer, primary_key=True), + Column("data", String(50)), + Column("parent_id", Integer), + ) + @pytest.mark.skip("INSERT from WITH subquery is not supported") def test_insert_from_select_round_trip(self): """ @@ -509,3 +549,136 @@ def _literal_round_trip(self, type_, input_, output, filter_=None): if filter_ is not None: value = filter_(value) assert value in output + + +class ComponentReflectionTest(_ComponentReflectionTest): + @classmethod + def define_temp_tables(cls, metadata): + """ + SPANNER OVERRIDE: + + In Cloud Spanner unique indexes are used instead of directly + creating unique constraints. Overriding the test to replace + constraints with indexes in testing data. + """ + kw = temp_table_keyword_args(config, config.db) + user_tmp = Table( + "user_tmp", + metadata, + Column("id", sqlalchemy.INT, primary_key=True), + Column("name", sqlalchemy.VARCHAR(50)), + Column("foo", sqlalchemy.INT), + sqlalchemy.Index("user_tmp_uq", "name", unique=True), + sqlalchemy.Index("user_tmp_ix", "foo"), + **kw + ) + if ( + testing.requires.view_reflection.enabled + and testing.requires.temporary_views.enabled + ): + event.listen( + user_tmp, + "after_create", + DDL("create temporary view user_tmp_v as " "select * from user_tmp"), + ) + event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v")) + + @testing.provide_metadata + def _test_get_unique_constraints(self, schema=None): + """ + SPANNER OVERRIDE: + + In Cloud Spanner unique indexes are used instead of directly + creating unique constraints. Overriding the test to replace + constraints with indexes in testing data. + """ + # SQLite dialect needs to parse the names of the constraints + # separately from what it gets from PRAGMA index_list(), and + # then matches them up. so same set of column_names in two + # constraints will confuse it. Perhaps we should no longer + # bother with index_list() here since we have the whole + # CREATE TABLE? 
+        uniques = sorted(
+            [
+                {"name": "unique_a", "column_names": ["a"]},
+                {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]},
+                {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]},
+                {"name": "unique_asc_key", "column_names": ["asc", "key"]},
+                {"name": "i.have.dots", "column_names": ["b"]},
+                {"name": "i have spaces", "column_names": ["c"]},
+            ],
+            key=operator.itemgetter("name"),
+        )
+        orig_meta = self.metadata
+        table = Table(
+            "testtbl",
+            orig_meta,
+            Column("a", sqlalchemy.String(20)),
+            Column("b", sqlalchemy.String(30)),
+            Column("c", sqlalchemy.Integer),
+            # reserved identifiers
+            Column("asc", sqlalchemy.String(30)),
+            Column("key", sqlalchemy.String(30)),
+            schema=schema,
+        )
+        for uc in uniques:
+            table.append_constraint(
+                sqlalchemy.Index(uc["name"], *uc["column_names"], unique=True)
+            )
+        orig_meta.create_all()
+
+        inspector = inspect(orig_meta.bind)
+        reflected = sorted(
+            inspector.get_unique_constraints("testtbl", schema=schema),
+            key=operator.itemgetter("name"),
+        )
+
+        names_that_duplicate_index = set()
+
+        for orig, refl in zip(uniques, reflected):
+            # Different dialects handle duplicate indexes and constraints
+            # differently, so ignore this flag
+            dupe = refl.pop("duplicates_index", None)
+            if dupe:
+                names_that_duplicate_index.add(dupe)
+            eq_(orig, refl)
+
+        reflected_metadata = MetaData()
+        reflected = Table(
+            "testtbl", reflected_metadata, autoload_with=orig_meta.bind, schema=schema,
+        )
+
+        # test "deduplicates for index" logic. MySQL and Oracle
+        # "unique constraints" are actually unique indexes (with the possible
+        # exception of a unique that is a dupe of another one in the case
+        # of Oracle). make sure they aren't duplicated.
+        idx_names = set([idx.name for idx in reflected.indexes])
+        uq_names = set(
+            [
+                uq.name
+                for uq in reflected.constraints
+                if isinstance(uq, sqlalchemy.UniqueConstraint)
+            ]
+        ).difference(["unique_c_a_b"])
+
+        assert not idx_names.intersection(uq_names)
+        if names_that_duplicate_index:
+            eq_(names_that_duplicate_index, idx_names)
+            eq_(uq_names, set())
+
+    @testing.provide_metadata
+    def test_unique_constraint_raises(self):
+        """
+        Check that direct unique constraint creation
+        fails with a ProgrammingError.
+        """
+        Table(
+            "user_tmp_failure",
+            self.metadata,
+            Column("id", sqlalchemy.INT, primary_key=True),
+            Column("name", sqlalchemy.VARCHAR(50)),
+            sqlalchemy.UniqueConstraint("name", name="user_tmp_uq"),
+        )
+
+        with pytest.raises(spanner_dbapi.exceptions.ProgrammingError):
+            self.metadata.create_all()
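
Not part of the diff: a minimal sketch of the behaviour the new visit_unique_constraint hook introduces. It assumes the dialect class is importable as SpannerDialect from the module edited above; the table, column, and index names are made up for illustration. No live Spanner database is needed, because the error is raised while the CREATE TABLE DDL is being compiled.

```python
from sqlalchemy import (
    Column, Index, Integer, MetaData, String, Table, UniqueConstraint,
)
from sqlalchemy.schema import CreateTable
from google.cloud import spanner_dbapi
from google.cloud.sqlalchemy_spanner.sqlalchemy_spanner import SpannerDialect

metadata = MetaData()

# Declaring a UNIQUE constraint directly is rejected by the Spanner dialect.
constrained = Table(
    "users_with_constraint",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    UniqueConstraint("name", name="uq_users_name"),
)

try:
    CreateTable(constrained).compile(dialect=SpannerDialect())
except spanner_dbapi.exceptions.ProgrammingError as exc:
    # "Spanner doesn't support direct UNIQUE constraints creation. ..."
    print(exc)

# The supported alternative: declare a unique secondary index instead.
# SQLAlchemy then issues it as a separate CREATE UNIQUE INDEX statement
# when the table is created against a real database.
indexed = Table(
    "users_with_index",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Index("uq_users_name_ix", "name", unique=True),
)
print(CreateTable(indexed).compile(dialect=SpannerDialect()))
```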