From 23b1e1ac012538cf53f3d6140aaca2cdd8a12c3e Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Tue, 16 Sep 2025 15:34:09 -0600 Subject: [PATCH 01/23] feat(api): implement `upsert()` using `MERGE INTO` --- ibis/backends/sql/__init__.py | 113 ++++++++++++++++++++++++++++- ibis/backends/tests/test_client.py | 65 +++++++++++++++++ 2 files changed, 177 insertions(+), 1 deletion(-) diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index c3a03823cb7e..906d424dc40d 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -423,7 +423,7 @@ def insert( Parameters ---------- name - The name of the table to which data needs will be inserted + The name of the table to which data will be inserted obj The source data or expression to insert database @@ -526,6 +526,117 @@ def _build_insert_template( ), ).sql(self.dialect) + def upsert( + self, + name: str, + /, + obj: pd.DataFrame | ir.Table | list | dict, + on: str, + *, + database: str | None = None, + ) -> None: + """Upsert data into a table. + + ::: {.callout-note} + ## Ibis does not use the word `schema` to refer to database hierarchy. + + A collection of `table` is referred to as a `database`. + A collection of `database` is referred to as a `catalog`. + + These terms are mapped onto the corresponding features in each + backend (where available), regardless of whether the backend itself + uses the same terminology. + ::: + + Parameters + ---------- + name + The name of the table to which data will be upserted + obj + The source data or expression to upsert + on + Column name to join on + database + Name of the attached database that the table is located in. + + For backends that support multi-level table hierarchies, you can + pass in a dotted string path like `"catalog.database"` or a tuple of + strings like `("catalog", "database")`. + """ + table_loc = self._to_sqlglot_table(database) + catalog, db = self._to_catalog_db_tuple(table_loc) + + if not isinstance(obj, ir.Table): + obj = ibis.memtable(obj) + + self._run_pre_execute_hooks(obj) + + query = self._build_upsert_from_table( + target=name, source=obj, on=on, db=db, catalog=catalog + ) + + with self._safe_raw_sql(query): + pass + + def _build_upsert_from_table( + self, + *, + target: str, + source, + on: str, + db: str | None = None, + catalog: str | None = None, + ): + compiler = self.compiler + quoted = compiler.quoted + # Compare the columns between the target table and the object to be inserted + # If source is a subset of target, use source columns for insert list + # Otherwise, assume auto-generated column names and use positional ordering. + target_cols = self.get_schema(target, catalog=catalog, database=db).keys() + + columns = ( + source_cols + if (source_cols := source.schema().keys()) <= target_cols + else target_cols + ) + + source_alias = util.gen_name("source") + target_alias = util.gen_name("target") + query = sge.merge( + sge.When( + matched=True, + then=sge.Update( + expressions=[ + sg.column(col, quoted=quoted).eq( + sg.column(col, source_alias, quoted=quoted) + ) + for col in columns + ] + ), + ), + sge.When( + matched=False, + then=sge.Insert( + this=sge.Tuple(expressions=columns), + expression=sge.Tuple( + expressions=[ + sg.column(col, source_alias, quoted=quoted) + for col in columns + ] + ), + ), + ), + into=sg.table(target, db=db, catalog=catalog, quoted=quoted).as_( + target_alias + ), + using=f"({self.compile(source)}) AS {source_alias}", + on=sg.column(on, table=target_alias, quoted=quoted).eq( + sg.column(on, table=source_alias, quoted=quoted) + ), + dialect=compiler.dialect, + ) + return query + def truncate_table(self, name: str, /, *, database: str | None = None) -> None: """Delete all rows from a table. diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 45a8d1af74b5..e700d0b97bff 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -519,6 +519,34 @@ def employee_data_2_temp_table( con.drop_table(temp_table_name, force=True) +@pytest.fixture +def test_employee_data_3(): + import pandas as pd + + df3 = pd.DataFrame( + { + "first_name": ["B", "Y", "Z"], + "last_name": ["A", "B", "C"], + "department_name": ["XX", "YY", "ZZ"], + "salary": [400.0, 500.0, 600.0], + } + ) + + return df3 + + +@pytest.fixture +def employee_data_3_temp_table( + backend, con, test_employee_schema, test_employee_data_3 +): + temp_table_name = gen_name("temp_employee_data_3") + _create_temp_table_with_schema( + backend, con, temp_table_name, test_employee_schema, data=test_employee_data_3 + ) + yield temp_table_name + con.drop_table(temp_table_name, force=True) + + @pytest.mark.notimpl(["polars"], reason="`insert` method not implemented") def test_insert_no_overwrite_from_dataframe( backend, con, test_employee_data_2, employee_empty_temp_table @@ -626,6 +654,43 @@ def _emp(a, b, c, d): assert len(con.table(employee_data_1_temp_table).execute()) == 3 +@pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") +def test_upsert_from_dataframe( + backend, con, employee_data_1_temp_table, test_employee_data_3 +): + temporary = con.table(employee_data_1_temp_table) + df1 = temporary.execute().set_index("first_name") + + con.upsert(employee_data_1_temp_table, obj=test_employee_data_3, on="first_name") + result = temporary.execute() + df2 = test_employee_data_3.set_index("first_name") + expected = pd.concat([df1[~df1.index.isin(df2.index)], df2]).reset_index() + assert len(result) == len(expected) + backend.assert_frame_equal( + result.sort_values("first_name").reset_index(drop=True), + expected.sort_values("first_name").reset_index(drop=True), + ) + + +@pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") +def test_upsert_from_expr( + backend, con, employee_data_1_temp_table, employee_data_3_temp_table +): + temporary = con.table(employee_data_1_temp_table) + from_table = con.table(employee_data_3_temp_table) + df1 = temporary.execute().set_index("first_name") + + con.upsert(employee_data_1_temp_table, obj=from_table, on="first_name") + result = temporary.execute() + df2 = from_table.execute().set_index("first_name") + expected = pd.concat([df1[~df1.index.isin(df2.index)], df2]).reset_index() + assert len(result) == len(expected) + backend.assert_frame_equal( + result.sort_values("first_name").reset_index(drop=True), + expected.sort_values("first_name").reset_index(drop=True), + ) + + @pytest.mark.notimpl( ["polars"], raises=AttributeError, reason="`insert` method not implemented" ) From 14454d1a4ac0dd3f5215b54198377f61fec3f09f Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Wed, 17 Sep 2025 15:06:04 -0600 Subject: [PATCH 02/23] fix(oracle): attempt to support `MERGE` on backend --- ibis/backends/oracle/__init__.py | 5 +++++ ibis/backends/sql/__init__.py | 24 ++++++++++++++++-------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py index 3d23551cc8fe..6b056518181c 100644 --- a/ibis/backends/oracle/__init__.py +++ b/ibis/backends/oracle/__init__.py @@ -274,6 +274,11 @@ def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any: con = self.con cursor = con.cursor() + # TODO(deepyaman): Fix Oracle MERGE query generation in SQLGlot. + if "MERGE" in query: + assert "AS " in query + query = query.replace("AS ", "") + try: cursor.execute(query, **kwargs) except Exception: diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index 906d424dc40d..0a07b520058c 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -607,31 +607,39 @@ def _build_upsert_from_table( matched=True, then=sge.Update( expressions=[ - sg.column(col, quoted=quoted).eq( - sg.column(col, source_alias, quoted=quoted) + sg.column(col, table=target_alias, quoted=quoted).eq( + sg.column(col, table=source_alias, quoted=quoted) ) for col in columns + if col != on ] ), ), sge.When( matched=False, then=sge.Insert( - this=sge.Tuple(expressions=columns), + this=sge.Tuple( + expressions=[ + sg.column(col, table=target_alias, quoted=quoted) + for col in columns + ] + ), expression=sge.Tuple( expressions=[ - sg.column(col, source_alias, quoted=quoted) + sg.column(col, table=source_alias, quoted=quoted) for col in columns ] ), ), ), into=sg.table(target, db=db, catalog=catalog, quoted=quoted).as_( - target_alias + sg.to_identifier(target_alias, quoted=quoted) ), - using=f"({self.compile(source)}) AS {source_alias}", - on=sg.column(on, table=target_alias, quoted=quoted).eq( - sg.column(on, table=source_alias, quoted=quoted) + using=f"({self.compile(source)}) AS {sg.to_identifier(source_alias, quoted=quoted)}", + on=sge.Paren( + this=sg.column(on, table=target_alias, quoted=quoted).eq( + sg.column(on, table=source_alias, quoted=quoted) + ) ), dialect=compiler.dialect, ) From 8f3c7e25e3ab4863eec256b4ba8b01048cf5af64 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Wed, 17 Sep 2025 17:13:03 -0600 Subject: [PATCH 03/23] fix(api): ensure DuckDB still works in "hacky" way --- ibis/backends/sql/__init__.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index 0a07b520058c..bb96ba7d4b77 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -607,9 +607,11 @@ def _build_upsert_from_table( matched=True, then=sge.Update( expressions=[ - sg.column(col, table=target_alias, quoted=quoted).eq( - sg.column(col, table=source_alias, quoted=quoted) - ) + sg.column( + col, + table=target_alias if self.name == "oracle" else None, + quoted=quoted, + ).eq(sg.column(col, table=source_alias, quoted=quoted)) for col in columns if col != on ] @@ -620,7 +622,11 @@ def _build_upsert_from_table( then=sge.Insert( this=sge.Tuple( expressions=[ - sg.column(col, table=target_alias, quoted=quoted) + sg.column( + col, + table=target_alias if self.name == "oracle" else None, + quoted=quoted, + ) for col in columns ] ), From 80d2a9c554bca9d49cdfb608edebd33e710f2cb9 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Wed, 17 Sep 2025 22:34:42 -0600 Subject: [PATCH 04/23] fix(mssql): attempt to support `MERGE` for backend --- ibis/backends/mssql/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ibis/backends/mssql/__init__.py b/ibis/backends/mssql/__init__.py index 4e700b6787e1..f129989f7bbd 100644 --- a/ibis/backends/mssql/__init__.py +++ b/ibis/backends/mssql/__init__.py @@ -424,6 +424,8 @@ def _safe_raw_sql(self, query, *args, **kwargs): with contextlib.suppress(AttributeError): query = query.sql(self.dialect) + query = f"{query};" + with self.begin() as cur: cur.execute(query, *args, **kwargs) yield cur From aceca22303b1144d5e137b8220da1e233d91b3d0 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 18 Sep 2025 09:10:30 -0600 Subject: [PATCH 05/23] chore(oracle): remove hack, fixed in SQLGlot 27.16 --- ibis/backends/oracle/__init__.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py index 6b056518181c..3d23551cc8fe 100644 --- a/ibis/backends/oracle/__init__.py +++ b/ibis/backends/oracle/__init__.py @@ -274,11 +274,6 @@ def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any: con = self.con cursor = con.cursor() - # TODO(deepyaman): Fix Oracle MERGE query generation in SQLGlot. - if "MERGE" in query: - assert "AS " in query - query = query.replace("AS ", "") - try: cursor.execute(query, **kwargs) except Exception: From b452f59e2a9765ef8787649d18cc50797dde236e Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 18 Sep 2025 15:25:22 -0600 Subject: [PATCH 06/23] chore(mssql): only add semicolon for MERGE queries --- ibis/backends/mssql/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ibis/backends/mssql/__init__.py b/ibis/backends/mssql/__init__.py index f129989f7bbd..cbed6c933e2a 100644 --- a/ibis/backends/mssql/__init__.py +++ b/ibis/backends/mssql/__init__.py @@ -424,7 +424,8 @@ def _safe_raw_sql(self, query, *args, **kwargs): with contextlib.suppress(AttributeError): query = query.sql(self.dialect) - query = f"{query};" + if "MERGE" in query: + query = f"{query};" with self.begin() as cur: cur.execute(query, *args, **kwargs) From be616030ac45981128939e752f7d9ab5de428fe9 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sat, 20 Sep 2025 09:53:13 -0600 Subject: [PATCH 07/23] fix(trino): make SQL generation work (but not run) --- ibis/backends/sql/__init__.py | 2 +- ibis/backends/tests/test_client.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index bb96ba7d4b77..29415e312436 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -639,7 +639,7 @@ def _build_upsert_from_table( ), ), into=sg.table(target, db=db, catalog=catalog, quoted=quoted).as_( - sg.to_identifier(target_alias, quoted=quoted) + sg.to_identifier(target_alias, quoted=quoted), table=True ), using=f"({self.compile(source)}) AS {sg.to_identifier(source_alias, quoted=quoted)}", on=sge.Paren( diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index e700d0b97bff..183871670140 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -37,6 +37,7 @@ PyDruidProgrammingError, PyODBCProgrammingError, SnowflakeProgrammingError, + TrinoUserError, ) from ibis.util import gen_name @@ -655,6 +656,11 @@ def _emp(a, b, c, d): @pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") +@pytest.mark.notyet( + ["trino"], + raises=TrinoUserError, + reason="connector does not support modifying table rows", +) def test_upsert_from_dataframe( backend, con, employee_data_1_temp_table, test_employee_data_3 ): @@ -673,6 +679,11 @@ def test_upsert_from_dataframe( @pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") +@pytest.mark.notyet( + ["trino"], + raises=TrinoUserError, + reason="connector does not support modifying table rows", +) def test_upsert_from_expr( backend, con, employee_data_1_temp_table, employee_data_3_temp_table ): From 71984eb454874710b3a1e698678789fffa496ad7 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sat, 20 Sep 2025 10:42:38 -0600 Subject: [PATCH 08/23] test(pyspark): specify expected failures for merge --- ibis/backends/tests/errors.py | 7 ++++++- ibis/backends/tests/test_client.py | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/ibis/backends/tests/errors.py b/ibis/backends/tests/errors.py index 0c06dcbf8d12..a961166c98b3 100644 --- a/ibis/backends/tests/errors.py +++ b/ibis/backends/tests/errors.py @@ -55,13 +55,18 @@ from pyspark.errors.exceptions.base import ParseException as PySparkParseException from pyspark.errors.exceptions.base import PySparkValueError from pyspark.errors.exceptions.base import PythonException as PySparkPythonException + from pyspark.errors.exceptions.base import ( + UnsupportedOperationException as PySparkUnsupportedOperationException, + ) from pyspark.errors.exceptions.connect import ( SparkConnectGrpcException as PySparkConnectGrpcException, ) except ImportError: PySparkParseException = PySparkAnalysisException = PySparkArithmeticException = ( PySparkPythonException - ) = PySparkConnectGrpcException = PySparkValueError = None + ) = PySparkUnsupportedOperationException = PySparkConnectGrpcException = ( + PySparkValueError + ) = None try: from google.api_core.exceptions import BadRequest as GoogleBadRequest diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 183871670140..a3dce7b48f4d 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -36,6 +36,7 @@ PyAthenaDatabaseError, PyDruidProgrammingError, PyODBCProgrammingError, + PySparkUnsupportedOperationException, SnowflakeProgrammingError, TrinoUserError, ) @@ -49,6 +50,7 @@ pd = pytest.importorskip("pandas") pa = pytest.importorskip("pyarrow") ds = pytest.importorskip("pyarrow.dataset") +pyspark = pytest.importorskip("pyspark") @pytest.fixture @@ -656,6 +658,13 @@ def _emp(a, b, c, d): @pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") +@pytest.mark.notyet( + ["pyspark"], + raises=PySparkUnsupportedOperationException + if vparse(pyspark.__version__) >= vparse("3.5") + else Py4JJavaError, + reason="MERGE INTO TABLE is not supported temporarily", +) @pytest.mark.notyet( ["trino"], raises=TrinoUserError, @@ -679,6 +688,13 @@ def test_upsert_from_dataframe( @pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") +@pytest.mark.notyet( + ["pyspark"], + raises=PySparkUnsupportedOperationException + if vparse(pyspark.__version__) >= vparse("3.5") + else Py4JJavaError, + reason="MERGE INTO TABLE is not supported temporarily", +) @pytest.mark.notyet( ["trino"], raises=TrinoUserError, From 7098a8d9dce4642959a9c751130b0aceb7fb23db Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sat, 20 Sep 2025 11:18:28 -0600 Subject: [PATCH 09/23] test(pyspark): don't importorskip for all backends --- ibis/backends/tests/test_client.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index a3dce7b48f4d..5b9c0b0928a3 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -50,7 +50,6 @@ pd = pytest.importorskip("pandas") pa = pytest.importorskip("pyarrow") ds = pytest.importorskip("pyarrow.dataset") -pyspark = pytest.importorskip("pyspark") @pytest.fixture @@ -657,12 +656,22 @@ def _emp(a, b, c, d): assert len(con.table(employee_data_1_temp_table).execute()) == 3 +try: + import pyspark + + pyspark_merge_exception = ( + PySparkUnsupportedOperationException + if vparse(pyspark.__version__) >= vparse("3.5") + else Py4JJavaError + ) +except ImportError: + pyspark_merge_exception = None + + @pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") @pytest.mark.notyet( ["pyspark"], - raises=PySparkUnsupportedOperationException - if vparse(pyspark.__version__) >= vparse("3.5") - else Py4JJavaError, + raises=pyspark_merge_exception, reason="MERGE INTO TABLE is not supported temporarily", ) @pytest.mark.notyet( @@ -690,9 +699,7 @@ def test_upsert_from_dataframe( @pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") @pytest.mark.notyet( ["pyspark"], - raises=PySparkUnsupportedOperationException - if vparse(pyspark.__version__) >= vparse("3.5") - else Py4JJavaError, + raises=pyspark_merge_exception, reason="MERGE INTO TABLE is not supported temporarily", ) @pytest.mark.notyet( From a655828eff41850ad3a851ab6fbbf9419e44782c Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sat, 20 Sep 2025 16:36:15 -0600 Subject: [PATCH 10/23] refactor(backends): make qualifying cols an option --- ibis/backends/__init__.py | 1 + ibis/backends/oracle/__init__.py | 4 ++++ ibis/backends/sql/__init__.py | 5 +++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index a938f076b407..67777156c784 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -977,6 +977,7 @@ def __init__(self, *args, **kwargs): self._con_kwargs: dict[str, Any] = kwargs self._can_reconnect: bool = True self._memtables = weakref.WeakSet() + self._qualify_merge_target_columns = False super().__init__() @property diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py index 3d23551cc8fe..8637134cc713 100644 --- a/ibis/backends/oracle/__init__.py +++ b/ibis/backends/oracle/__init__.py @@ -91,6 +91,10 @@ class Backend( compiler = sc.oracle.compiler supports_temporary_tables = True + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._qualify_merge_target_columns = True + @cached_property def version(self): matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.version) diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index 29415e312436..8954098c0e01 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -588,6 +588,7 @@ def _build_upsert_from_table( catalog: str | None = None, ): compiler = self.compiler + qualify_target_columns = self._qualify_merge_target_columns quoted = compiler.quoted # Compare the columns between the target table and the object to be inserted # If source is a subset of target, use source columns for insert list @@ -609,7 +610,7 @@ def _build_upsert_from_table( expressions=[ sg.column( col, - table=target_alias if self.name == "oracle" else None, + table=target_alias if qualify_target_columns else None, quoted=quoted, ).eq(sg.column(col, table=source_alias, quoted=quoted)) for col in columns @@ -624,7 +625,7 @@ def _build_upsert_from_table( expressions=[ sg.column( col, - table=target_alias if self.name == "oracle" else None, + table=target_alias if qualify_target_columns else None, quoted=quoted, ) for col in columns From 93d3a0e15880dc2d2751b44776fe9c5e6938579b Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sat, 20 Sep 2025 17:09:37 -0600 Subject: [PATCH 11/23] test(backends): xfail where MERGE INTO unsupported --- ibis/backends/tests/test_client.py | 43 ++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 5b9c0b0928a3..c40d4791522d 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -7,6 +7,7 @@ import json import os import re +import sqlite3 import string import subprocess import sys @@ -26,9 +27,11 @@ import ibis.expr.operations as ops from ibis.backends.conftest import ALL_BACKENDS from ibis.backends.tests.errors import ( + ClickHouseDatabaseError, DatabricksServerOperationError, ExaQueryError, ImpalaHiveServer2Error, + MySQLProgrammingError, OracleDatabaseError, PsycoPg2InternalError, PsycoPgUndefinedObject, @@ -668,12 +671,32 @@ def _emp(a, b, c, d): pyspark_merge_exception = None +@pytest.mark.notyet( + ["clickhouse"], raises=ClickHouseDatabaseError, reason="MERGE INTO is not supported" +) +@pytest.mark.notyet(["datafusion"], reason="MERGE INTO is not supported") +@pytest.mark.notyet( + ["impala"], + raises=ImpalaHiveServer2Error, + reason="target table must be an Iceberg table", +) +@pytest.mark.notyet( + ["mysql"], raises=MySQLProgrammingError, reason="MERGE INTO is not supported" +) @pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") @pytest.mark.notyet( ["pyspark"], raises=pyspark_merge_exception, reason="MERGE INTO TABLE is not supported temporarily", ) +@pytest.mark.notyet( + ["risingwave"], + raises=PsycoPg2InternalError, + reason="MERGE INTO is not supported", +) +@pytest.mark.notyet( + ["sqlite"], raises=sqlite3.OperationalError, reason="MERGE INTO is not supported" +) @pytest.mark.notyet( ["trino"], raises=TrinoUserError, @@ -696,12 +719,32 @@ def test_upsert_from_dataframe( ) +@pytest.mark.notyet( + ["clickhouse"], raises=ClickHouseDatabaseError, reason="MERGE INTO is not supported" +) +@pytest.mark.notyet(["datafusion"], reason="MERGE INTO is not supported") +@pytest.mark.notyet( + ["impala"], + raises=ImpalaHiveServer2Error, + reason="target table must be an Iceberg table", +) +@pytest.mark.notyet( + ["mysql"], raises=MySQLProgrammingError, reason="MERGE INTO is not supported" +) @pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") @pytest.mark.notyet( ["pyspark"], raises=pyspark_merge_exception, reason="MERGE INTO TABLE is not supported temporarily", ) +@pytest.mark.notyet( + ["risingwave"], + raises=PsycoPg2InternalError, + reason="MERGE INTO is not supported", +) +@pytest.mark.notyet( + ["sqlite"], raises=sqlite3.OperationalError, reason="MERGE INTO is not supported" +) @pytest.mark.notyet( ["trino"], raises=TrinoUserError, From e864a7ca13ba4fe90314a91bda2dd54a98a202af Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Tue, 14 Oct 2025 20:40:59 -0600 Subject: [PATCH 12/23] refactor(api): move duplicated logic into function --- ibis/backends/sql/__init__.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index 8954098c0e01..00b9d1d2aa62 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -453,22 +453,30 @@ def insert( with self._safe_raw_sql(query): pass - def _build_insert_from_table( + def _get_columns_to_insert( self, *, target: str, source, db: str | None = None, catalog: str | None = None ): - compiler = self.compiler - quoted = compiler.quoted # Compare the columns between the target table and the object to be inserted # If source is a subset of target, use source columns for insert list # Otherwise, assume auto-generated column names and use positional ordering. target_cols = self.get_schema(target, catalog=catalog, database=db).keys() - columns = ( + return ( source_cols if (source_cols := source.schema().keys()) <= target_cols else target_cols ) + def _build_insert_from_table( + self, *, target: str, source, db: str | None = None, catalog: str | None = None + ): + compiler = self.compiler + quoted = compiler.quoted + + columns = self._get_columns_to_insert( + target=target, source=source, db=db, catalog=catalog + ) + query = sge.insert( expression=self.compile(source), into=sg.table(target, db=db, catalog=catalog, quoted=quoted), @@ -590,15 +598,9 @@ def _build_upsert_from_table( compiler = self.compiler qualify_target_columns = self._qualify_merge_target_columns quoted = compiler.quoted - # Compare the columns between the target table and the object to be inserted - # If source is a subset of target, use source columns for insert list - # Otherwise, assume auto-generated column names and use positional ordering. - target_cols = self.get_schema(target, catalog=catalog, database=db).keys() - columns = ( - source_cols - if (source_cols := source.schema().keys()) <= target_cols - else target_cols + columns = self._get_columns_to_insert( + target=target, source=source, db=db, catalog=catalog ) source_alias = util.gen_name("source") From 130a8bbd53d88e1848a4a1365320128827522646 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Tue, 14 Oct 2025 23:50:35 -0600 Subject: [PATCH 13/23] test(api): add upsert from more complex expression --- ibis/backends/tests/test_client.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index c40d4791522d..be216e455588 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -750,11 +750,24 @@ def test_upsert_from_dataframe( raises=TrinoUserError, reason="connector does not support modifying table rows", ) +@pytest.mark.parametrize( + "from_table_modifier", [ + param(lambda x: x, id="simple"), + param( + lambda x: x.filter(ibis._.salary > 0).order_by("first_name"), + id="with_filter_and_order_by" + ), + ], +) def test_upsert_from_expr( - backend, con, employee_data_1_temp_table, employee_data_3_temp_table + backend, + con, + employee_data_1_temp_table, + employee_data_3_temp_table, + from_table_modifier, ): temporary = con.table(employee_data_1_temp_table) - from_table = con.table(employee_data_3_temp_table) + from_table = from_table_modifier(con.table(employee_data_3_temp_table)) df1 = temporary.execute().set_index("first_name") con.upsert(employee_data_1_temp_table, obj=from_table, on="first_name") From 39bc47d7091b60f10b8181ab7385c273e7179043 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Wed, 15 Oct 2025 00:18:28 -0600 Subject: [PATCH 14/23] chore(api): don't qualify `MERGE` target col names --- ibis/backends/__init__.py | 1 - ibis/backends/oracle/__init__.py | 4 ---- ibis/backends/sql/__init__.py | 18 ++++-------------- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py index 67777156c784..a938f076b407 100644 --- a/ibis/backends/__init__.py +++ b/ibis/backends/__init__.py @@ -977,7 +977,6 @@ def __init__(self, *args, **kwargs): self._con_kwargs: dict[str, Any] = kwargs self._can_reconnect: bool = True self._memtables = weakref.WeakSet() - self._qualify_merge_target_columns = False super().__init__() @property diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py index 8637134cc713..3d23551cc8fe 100644 --- a/ibis/backends/oracle/__init__.py +++ b/ibis/backends/oracle/__init__.py @@ -91,10 +91,6 @@ class Backend( compiler = sc.oracle.compiler supports_temporary_tables = True - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._qualify_merge_target_columns = True - @cached_property def version(self): matched = re.search(r"(\d+)\.(\d+)\.(\d+)", self.con.version) diff --git a/ibis/backends/sql/__init__.py b/ibis/backends/sql/__init__.py index 00b9d1d2aa62..6687bdf69337 100644 --- a/ibis/backends/sql/__init__.py +++ b/ibis/backends/sql/__init__.py @@ -596,7 +596,6 @@ def _build_upsert_from_table( catalog: str | None = None, ): compiler = self.compiler - qualify_target_columns = self._qualify_merge_target_columns quoted = compiler.quoted columns = self._get_columns_to_insert( @@ -610,11 +609,9 @@ def _build_upsert_from_table( matched=True, then=sge.Update( expressions=[ - sg.column( - col, - table=target_alias if qualify_target_columns else None, - quoted=quoted, - ).eq(sg.column(col, table=source_alias, quoted=quoted)) + sg.column(col, quoted=quoted).eq( + sg.column(col, table=source_alias, quoted=quoted) + ) for col in columns if col != on ] @@ -624,14 +621,7 @@ def _build_upsert_from_table( matched=False, then=sge.Insert( this=sge.Tuple( - expressions=[ - sg.column( - col, - table=target_alias if qualify_target_columns else None, - quoted=quoted, - ) - for col in columns - ] + expressions=[sg.column(col, quoted=quoted) for col in columns] ), expression=sge.Tuple( expressions=[ From b3c37dcfd89bb8cb8b689a9089181fbebba3cf6c Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Wed, 15 Oct 2025 00:36:27 -0600 Subject: [PATCH 15/23] test(api): expect failure using MSSQL and ORDER BY --- ibis/backends/tests/test_client.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index be216e455588..37b8ad918a20 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -750,24 +750,21 @@ def test_upsert_from_dataframe( raises=TrinoUserError, reason="connector does not support modifying table rows", ) -@pytest.mark.parametrize( - "from_table_modifier", [ - param(lambda x: x, id="simple"), - param( - lambda x: x.filter(ibis._.salary > 0).order_by("first_name"), - id="with_filter_and_order_by" - ), - ], -) +@pytest.mark.parametrize("with_order_by", [True, False]) def test_upsert_from_expr( - backend, - con, - employee_data_1_temp_table, - employee_data_3_temp_table, - from_table_modifier, + backend, con, employee_data_1_temp_table, employee_data_3_temp_table, with_order_by ): temporary = con.table(employee_data_1_temp_table) - from_table = from_table_modifier(con.table(employee_data_3_temp_table)) + from_table = con.table(employee_data_3_temp_table) + if with_order_by: + if backend.name() == "mssql": + pytest.xfail( + "MSSQL doesn't allow ORDER BY in subqueries, unless " + "TOP, OFFSET or FOR XML is also specified" + ) + + from_table = from_table.filter(ibis._.salary > 0).order_by("first_name") + df1 = temporary.execute().set_index("first_name") con.upsert(employee_data_1_temp_table, obj=from_table, on="first_name") From 7f5001f16f49de6cdc25848358aca2a5ba5077d1 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Wed, 15 Oct 2025 20:32:27 -0600 Subject: [PATCH 16/23] refactor(api): combine marks as `NO_MERGE_SUPPORT` --- ibis/backends/tests/conftest.py | 65 ++++++++++++++++++++++- ibis/backends/tests/test_client.py | 82 ++---------------------------- 2 files changed, 66 insertions(+), 81 deletions(-) diff --git a/ibis/backends/tests/conftest.py b/ibis/backends/tests/conftest.py index a9c69b64b57b..6a5e16a94b4b 100644 --- a/ibis/backends/tests/conftest.py +++ b/ibis/backends/tests/conftest.py @@ -1,9 +1,21 @@ from __future__ import annotations +import sqlite3 + import pytest +from packaging.version import parse as vparse import ibis.common.exceptions as com -from ibis.backends.tests.errors import MySQLOperationalError +from ibis.backends.tests.errors import ( + ClickHouseDatabaseError, + ImpalaHiveServer2Error, + MySQLOperationalError, + MySQLProgrammingError, + PsycoPg2InternalError, + Py4JJavaError, + PySparkUnsupportedOperationException, + TrinoUserError, +) def combine_marks(marks: list) -> callable: @@ -50,7 +62,6 @@ def decorator(func): ] NO_ARRAY_SUPPORT = combine_marks(NO_ARRAY_SUPPORT_MARKS) - NO_STRUCT_SUPPORT_MARKS = [ pytest.mark.never(["mysql", "sqlite", "mssql"], reason="No struct support"), pytest.mark.notyet(["impala"]), @@ -78,3 +89,53 @@ def decorator(func): pytest.mark.notimpl(["datafusion", "exasol", "mssql", "druid", "oracle"]), ] NO_JSON_SUPPORT = combine_marks(NO_JSON_SUPPORT_MARKS) + +try: + import pyspark + + pyspark_merge_exception = ( + PySparkUnsupportedOperationException + if vparse(pyspark.__version__) >= vparse("3.5") + else Py4JJavaError + ) +except ImportError: + pyspark_merge_exception = None + +NO_MERGE_SUPPORT_MARKS = [ + pytest.mark.notyet( + ["clickhouse"], + raises=ClickHouseDatabaseError, + reason="MERGE INTO is not supported", + ), + pytest.mark.notyet(["datafusion"], reason="MERGE INTO is not supported"), + pytest.mark.notyet( + ["impala"], + raises=ImpalaHiveServer2Error, + reason="target table must be an Iceberg table", + ), + pytest.mark.notyet( + ["mysql"], raises=MySQLProgrammingError, reason="MERGE INTO is not supported" + ), + pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented"), + pytest.mark.notyet( + ["pyspark"], + raises=pyspark_merge_exception, + reason="MERGE INTO TABLE is not supported temporarily", + ), + pytest.mark.notyet( + ["risingwave"], + raises=PsycoPg2InternalError, + reason="MERGE INTO is not supported", + ), + pytest.mark.notyet( + ["sqlite"], + raises=sqlite3.OperationalError, + reason="MERGE INTO is not supported", + ), + pytest.mark.notyet( + ["trino"], + raises=TrinoUserError, + reason="connector does not support modifying table rows", + ), +] +NO_MERGE_SUPPORT = combine_marks(NO_MERGE_SUPPORT_MARKS) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 37b8ad918a20..3053bdc0ba10 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -7,7 +7,6 @@ import json import os import re -import sqlite3 import string import subprocess import sys @@ -26,12 +25,11 @@ import ibis.expr.datatypes as dt import ibis.expr.operations as ops from ibis.backends.conftest import ALL_BACKENDS +from ibis.backends.tests.conftest import NO_MERGE_SUPPORT from ibis.backends.tests.errors import ( - ClickHouseDatabaseError, DatabricksServerOperationError, ExaQueryError, ImpalaHiveServer2Error, - MySQLProgrammingError, OracleDatabaseError, PsycoPg2InternalError, PsycoPgUndefinedObject, @@ -39,9 +37,7 @@ PyAthenaDatabaseError, PyDruidProgrammingError, PyODBCProgrammingError, - PySparkUnsupportedOperationException, SnowflakeProgrammingError, - TrinoUserError, ) from ibis.util import gen_name @@ -659,49 +655,7 @@ def _emp(a, b, c, d): assert len(con.table(employee_data_1_temp_table).execute()) == 3 -try: - import pyspark - - pyspark_merge_exception = ( - PySparkUnsupportedOperationException - if vparse(pyspark.__version__) >= vparse("3.5") - else Py4JJavaError - ) -except ImportError: - pyspark_merge_exception = None - - -@pytest.mark.notyet( - ["clickhouse"], raises=ClickHouseDatabaseError, reason="MERGE INTO is not supported" -) -@pytest.mark.notyet(["datafusion"], reason="MERGE INTO is not supported") -@pytest.mark.notyet( - ["impala"], - raises=ImpalaHiveServer2Error, - reason="target table must be an Iceberg table", -) -@pytest.mark.notyet( - ["mysql"], raises=MySQLProgrammingError, reason="MERGE INTO is not supported" -) -@pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") -@pytest.mark.notyet( - ["pyspark"], - raises=pyspark_merge_exception, - reason="MERGE INTO TABLE is not supported temporarily", -) -@pytest.mark.notyet( - ["risingwave"], - raises=PsycoPg2InternalError, - reason="MERGE INTO is not supported", -) -@pytest.mark.notyet( - ["sqlite"], raises=sqlite3.OperationalError, reason="MERGE INTO is not supported" -) -@pytest.mark.notyet( - ["trino"], - raises=TrinoUserError, - reason="connector does not support modifying table rows", -) +@NO_MERGE_SUPPORT def test_upsert_from_dataframe( backend, con, employee_data_1_temp_table, test_employee_data_3 ): @@ -719,37 +673,7 @@ def test_upsert_from_dataframe( ) -@pytest.mark.notyet( - ["clickhouse"], raises=ClickHouseDatabaseError, reason="MERGE INTO is not supported" -) -@pytest.mark.notyet(["datafusion"], reason="MERGE INTO is not supported") -@pytest.mark.notyet( - ["impala"], - raises=ImpalaHiveServer2Error, - reason="target table must be an Iceberg table", -) -@pytest.mark.notyet( - ["mysql"], raises=MySQLProgrammingError, reason="MERGE INTO is not supported" -) -@pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented") -@pytest.mark.notyet( - ["pyspark"], - raises=pyspark_merge_exception, - reason="MERGE INTO TABLE is not supported temporarily", -) -@pytest.mark.notyet( - ["risingwave"], - raises=PsycoPg2InternalError, - reason="MERGE INTO is not supported", -) -@pytest.mark.notyet( - ["sqlite"], raises=sqlite3.OperationalError, reason="MERGE INTO is not supported" -) -@pytest.mark.notyet( - ["trino"], - raises=TrinoUserError, - reason="connector does not support modifying table rows", -) +@NO_MERGE_SUPPORT @pytest.mark.parametrize("with_order_by", [True, False]) def test_upsert_from_expr( backend, con, employee_data_1_temp_table, employee_data_3_temp_table, with_order_by From 61c9cafc40b32879e28d152195589b1455391c3d Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sun, 26 Oct 2025 08:41:14 -0600 Subject: [PATCH 17/23] test(api): check upsert from memtable with schemas --- ibis/backends/tests/test_client.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 3053bdc0ba10..ba6cf50c17f3 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -702,6 +702,22 @@ def test_upsert_from_expr( ) +@NO_MERGE_SUPPORT +def test_upsert_from_memtable(con, temp_table): + t1 = ibis.memtable({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0], "z": ["a", "b", "c"]}) + t2 = ibis.memtable({"x": [3, 2, 6], "y": [7.0, 8.0, 9.0], "z": ["d", "e", "f"]}) + table_name = temp_table + con.create_table(table_name, schema=t1.schema()) + con.upsert(table_name, t1, on="x") + con.upsert(table_name, t2, on="x") + + table = con.tables[table_name] + assert len(table.execute()) == 4 + assert con.tables[table_name].schema() == ibis.schema( + {"x": "int64", "y": "float64", "z": "string"} + ) + + @pytest.mark.notimpl( ["polars"], raises=AttributeError, reason="`insert` method not implemented" ) From a61e06a3ac666d7d2d8771e348ea383f5c2e1a4b Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sun, 26 Oct 2025 21:54:27 -0600 Subject: [PATCH 18/23] test(api): check additional input schema scenarios --- ibis/backends/tests/test_client.py | 40 ++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index ba6cf50c17f3..571e1da4643e 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -703,19 +703,43 @@ def test_upsert_from_expr( @NO_MERGE_SUPPORT -def test_upsert_from_memtable(con, temp_table): +@pytest.mark.parametrize( + ("sch", "expectation"), + [ + ({"x": "int64", "y": "float64", "z": "string"}, contextlib.nullcontext()), + ({"z": "!string", "y": "float32", "x": "uint8"}, contextlib.nullcontext()), + ({"x": "int64"}, contextlib.nullcontext()), + ({"x": "int64", "z": "string"}, contextlib.nullcontext()), + ({"z": "string"}, contextlib.nullcontext()), + ], +) +def test_upsert_from_memtable(backend, con, temp_table, sch, expectation): t1 = ibis.memtable({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0], "z": ["a", "b", "c"]}) - t2 = ibis.memtable({"x": [3, 2, 6], "y": [7.0, 8.0, 9.0], "z": ["d", "e", "f"]}) table_name = temp_table + + data = {"x": [3, 2, 6], "y": [7.0, 8.0, 9.0], "z": ["d", "e", "f"]} + t2 = ibis.memtable({k: v for k, v in data.items() if k in sch}, schema=sch) + con.create_table(table_name, schema=t1.schema()) con.upsert(table_name, t1, on="x") - con.upsert(table_name, t2, on="x") - table = con.tables[table_name] - assert len(table.execute()) == 4 - assert con.tables[table_name].schema() == ibis.schema( - {"x": "int64", "y": "float64", "z": "string"} - ) + with expectation: + con.upsert(table_name, t2, on="x") + + result = con.table(table_name).execute() + expected = ( + t2.execute() + .set_index("x") + .combine_first(t1.execute().set_index("x")) + .reset_index() + ) + assert len(result) == len(expected) + assert con.table(table_name).schema() == t1.schema() + backend.assert_frame_equal( + result.sort_values("x").reset_index(drop=True), + expected.sort_values("x").reset_index(drop=True), + check_dtype=False, # Expected schema checked on table above + ) @pytest.mark.notimpl( From 672711cce023ab970672fad0ba4c12ce0c912e5b Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sun, 26 Oct 2025 22:22:35 -0600 Subject: [PATCH 19/23] test(api): add expected errs and fix memtable exec --- ibis/backends/tests/test_client.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 571e1da4643e..8e7e13d82a1a 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -708,37 +708,36 @@ def test_upsert_from_expr( [ ({"x": "int64", "y": "float64", "z": "string"}, contextlib.nullcontext()), ({"z": "!string", "y": "float32", "x": "uint8"}, contextlib.nullcontext()), - ({"x": "int64"}, contextlib.nullcontext()), + ({"x": "int64"}, pytest.raises(Exception)), # No cols to insert ({"x": "int64", "z": "string"}, contextlib.nullcontext()), - ({"z": "string"}, contextlib.nullcontext()), + ({"z": "string"}, pytest.raises(Exception)), # Missing `on` col ], ) def test_upsert_from_memtable(backend, con, temp_table, sch, expectation): t1 = ibis.memtable({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0], "z": ["a", "b", "c"]}) table_name = temp_table - data = {"x": [3, 2, 6], "y": [7.0, 8.0, 9.0], "z": ["d", "e", "f"]} - t2 = ibis.memtable({k: v for k, v in data.items() if k in sch}, schema=sch) + data = { + k: v + for k, v in {"x": [3, 2, 6], "y": [7.0, 8.0, 9.0], "z": ["d", "e", "f"]}.items() + if k in sch + } + t2 = ibis.memtable(data, schema=sch) con.create_table(table_name, schema=t1.schema()) con.upsert(table_name, t1, on="x") + temporary = con.table(table_name) + df1 = temporary.execute().set_index("x") with expectation: con.upsert(table_name, t2, on="x") - result = con.table(table_name).execute() - expected = ( - t2.execute() - .set_index("x") - .combine_first(t1.execute().set_index("x")) - .reset_index() - ) + result = temporary.execute() + expected = pd.DataFrame(data).set_index("x").combine_first(df1).reset_index() assert len(result) == len(expected) - assert con.table(table_name).schema() == t1.schema() backend.assert_frame_equal( result.sort_values("x").reset_index(drop=True), expected.sort_values("x").reset_index(drop=True), - check_dtype=False, # Expected schema checked on table above ) From 886feed10081687e9ea58d852dafff45f63bd589 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sun, 26 Oct 2025 22:58:43 -0600 Subject: [PATCH 20/23] test(api): don't use poorly-supported unsigned int --- ibis/backends/tests/test_client.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 8e7e13d82a1a..eeac3b9d09b7 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -703,11 +703,17 @@ def test_upsert_from_expr( @NO_MERGE_SUPPORT +@pytest.mark.notyet(["druid"], raises=NotImplementedError) +@pytest.mark.notimpl( + ["flink"], + raises=com.IbisError, + reason="`tbl_properties` is required when creating table with schema", +) @pytest.mark.parametrize( ("sch", "expectation"), [ ({"x": "int64", "y": "float64", "z": "string"}, contextlib.nullcontext()), - ({"z": "!string", "y": "float32", "x": "uint8"}, contextlib.nullcontext()), + ({"z": "!string", "y": "float32", "x": "int8"}, contextlib.nullcontext()), ({"x": "int64"}, pytest.raises(Exception)), # No cols to insert ({"x": "int64", "z": "string"}, contextlib.nullcontext()), ({"z": "string"}, pytest.raises(Exception)), # Missing `on` col From 223291e62423cc650190c36bdf4feaabef93336d Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sun, 2 Nov 2025 07:05:28 -0700 Subject: [PATCH 21/23] chore(api): don't reimport pandas in test fixtures --- ibis/backends/tests/test_client.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index eeac3b9d09b7..3be8a93856a4 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -494,8 +494,6 @@ def employee_data_1_temp_table(backend, con, test_employee_schema): @pytest.fixture def test_employee_data_2(): - import pandas as pd - df2 = pd.DataFrame( { "first_name": ["X", "Y", "Z"], @@ -522,8 +520,6 @@ def employee_data_2_temp_table( @pytest.fixture def test_employee_data_3(): - import pandas as pd - df3 = pd.DataFrame( { "first_name": ["B", "Y", "Z"], From c66f0eeabd905d78f28209dd6369fbdb9996bbab Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sun, 2 Nov 2025 07:13:37 -0700 Subject: [PATCH 22/23] chore(mssql): always append semicolon to statement --- ibis/backends/mssql/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ibis/backends/mssql/__init__.py b/ibis/backends/mssql/__init__.py index cbed6c933e2a..4b95f1fe1d3a 100644 --- a/ibis/backends/mssql/__init__.py +++ b/ibis/backends/mssql/__init__.py @@ -424,8 +424,10 @@ def _safe_raw_sql(self, query, *args, **kwargs): with contextlib.suppress(AttributeError): query = query.sql(self.dialect) - if "MERGE" in query: - query = f"{query};" + # Although the semicolon isn't required for most statements, the + # T-SQL docs state that it will be required in a future version. + # https://learn.microsoft.com/en-us/sql/t-sql/language-elements/transact-sql-syntax-conventions-transact-sql?view=sql-server-ver17&tabs=code + query = f"{query};" with self.begin() as cur: cur.execute(query, *args, **kwargs) From 2ec6b058b01eaa50eda24752f09e29a6a734da42 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Fri, 21 Nov 2025 16:44:16 -0700 Subject: [PATCH 23/23] test(api): replace xfail in test body with a param Signed-off-by: Deepyaman Datta --- ibis/backends/tests/test_client.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index 3be8a93856a4..f3d0d015fab6 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -670,19 +670,26 @@ def test_upsert_from_dataframe( @NO_MERGE_SUPPORT -@pytest.mark.parametrize("with_order_by", [True, False]) +@pytest.mark.parametrize( + "with_order_by", + [ + pytest.param( + True, + marks=pytest.mark.notyet( + ["mssql"], + "MSSQL doesn't allow ORDER BY in subqueries, unless " + "TOP, OFFSET or FOR XML is also specified", + ), + ), + False, + ], +) def test_upsert_from_expr( backend, con, employee_data_1_temp_table, employee_data_3_temp_table, with_order_by ): temporary = con.table(employee_data_1_temp_table) from_table = con.table(employee_data_3_temp_table) if with_order_by: - if backend.name() == "mssql": - pytest.xfail( - "MSSQL doesn't allow ORDER BY in subqueries, unless " - "TOP, OFFSET or FOR XML is also specified" - ) - from_table = from_table.filter(ibis._.salary > 0).order_by("first_name") df1 = temporary.execute().set_index("first_name")