27 commits
23b1e1a
feat(api): implement `upsert()` using `MERGE INTO`
deepyaman Sep 16, 2025
14454d1
fix(oracle): attempt to support `MERGE` on backend
deepyaman Sep 17, 2025
8f3c7e2
fix(api): ensure DuckDB still works in "hacky" way
deepyaman Sep 17, 2025
80d2a9c
fix(mssql): attempt to support `MERGE` for backend
deepyaman Sep 18, 2025
aceca22
chore(oracle): remove hack, fixed in SQLGlot 27.16
deepyaman Sep 18, 2025
b452f59
chore(mssql): only add semicolon for MERGE queries
deepyaman Sep 18, 2025
be61603
fix(trino): make SQL generation work (but not run)
deepyaman Sep 20, 2025
71984eb
test(pyspark): specify expected failures for merge
deepyaman Sep 20, 2025
7098a8d
test(pyspark): don't importorskip for all backends
deepyaman Sep 20, 2025
a655828
refactor(backends): make qualifying cols an option
deepyaman Sep 20, 2025
93d3a0e
test(backends): xfail where MERGE INTO unsupported
deepyaman Sep 20, 2025
e864a7c
refactor(api): move duplicated logic into function
deepyaman Oct 15, 2025
130a8bb
test(api): add upsert from more complex expression
deepyaman Oct 15, 2025
39bc47d
chore(api): don't qualify `MERGE` target col names
deepyaman Oct 15, 2025
b3c37dc
test(api): expect failure using MSSQL and ORDER BY
deepyaman Oct 15, 2025
7f5001f
refactor(api): combine marks as `NO_MERGE_SUPPORT`
deepyaman Oct 16, 2025
61c9caf
test(api): check upsert from memtable with schemas
deepyaman Oct 26, 2025
a61e06a
test(api): check additional input schema scenarios
deepyaman Oct 27, 2025
672711c
test(api): add expected errs and fix memtable exec
deepyaman Oct 27, 2025
886feed
test(api): don't use poorly-supported unsigned int
deepyaman Oct 27, 2025
223291e
chore(api): don't reimport pandas in test fixtures
deepyaman Nov 2, 2025
c66f0ee
chore(mssql): always append semicolon to statement
deepyaman Nov 2, 2025
856b4d6
Merge branch 'main' into feat/api/backend-upsert
deepyaman Nov 2, 2025
7d65486
Merge branch 'main' into feat/api/backend-upsert
deepyaman Nov 14, 2025
9624f44
Merge branch 'main' into feat/api/backend-upsert
deepyaman Nov 17, 2025
2ec6b05
test(api): replace xfail in test body with a param
deepyaman Nov 21, 2025
47af4fb
Merge branch 'main' into feat/api/backend-upsert
deepyaman Nov 22, 2025
128 changes: 123 additions & 5 deletions ibis/backends/sql/__init__.py
@@ -423,7 +423,7 @@ def insert(
Parameters
----------
name
The name of the table to which data needs will be inserted
The name of the table to which data will be inserted
obj
The source data or expression to insert
database
@@ -453,22 +453,30 @@
with self._safe_raw_sql(query):
pass

def _build_insert_from_table(
def _get_columns_to_insert(
self, *, target: str, source, db: str | None = None, catalog: str | None = None
):
compiler = self.compiler
quoted = compiler.quoted
# Compare the columns between the target table and the object to be inserted
# If source is a subset of target, use source columns for insert list
# Otherwise, assume auto-generated column names and use positional ordering.
target_cols = self.get_schema(target, catalog=catalog, database=db).keys()

columns = (
return (
source_cols
if (source_cols := source.schema().keys()) <= target_cols
else target_cols
)
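
The subset test above leans on the fact that schema key views behave like sets. A minimal standalone sketch of the same rule, using plain dicts in place of ibis schemas (the column names are hypothetical):

```python
# Plain dicts stand in for the target and source ibis schemas here;
# the column names are made up for illustration.
target_schema = {"id": "int64", "name": "string", "ts": "timestamp"}
source_schema = {"id": "int64", "name": "string"}

# Dict key views are set-like, so `<=` asks "is source a subset of target?"
if source_schema.keys() <= target_schema.keys():
    columns = source_schema.keys()  # insert by name
else:
    columns = target_schema.keys()  # fall back to positional ordering

print(list(columns))  # ['id', 'name']
```
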

def _build_insert_from_table(
self, *, target: str, source, db: str | None = None, catalog: str | None = None
):
compiler = self.compiler
quoted = compiler.quoted

columns = self._get_columns_to_insert(
target=target, source=source, db=db, catalog=catalog
)

query = sge.insert(
expression=self.compile(source),
into=sg.table(target, db=db, catalog=catalog, quoted=quoted),
@@ -526,6 +534,116 @@ def _build_insert_template(
),
).sql(self.dialect)

def upsert(
self,
name: str,
/,
obj: pd.DataFrame | ir.Table | list | dict,
on: str,
*,
database: str | None = None,
) -> None:
"""Upsert data into a table.

::: {.callout-note}
## Ibis does not use the word `schema` to refer to database hierarchy.

A collection of `table` is referred to as a `database`.
A collection of `database` is referred to as a `catalog`.

These terms are mapped onto the corresponding features in each
backend (where available), regardless of whether the backend itself
uses the same terminology.
:::

Parameters
----------
name
The name of the table to which data will be upserted
obj
The source data or expression to upsert
on
Column name to join on
database
Name of the attached database that the table is located in.

For backends that support multi-level table hierarchies, you can
pass in a dotted string path like `"catalog.database"` or a tuple of
strings like `("catalog", "database")`.
"""
table_loc = self._to_sqlglot_table(database)
catalog, db = self._to_catalog_db_tuple(table_loc)

if not isinstance(obj, ir.Table):
obj = ibis.memtable(obj)

self._run_pre_execute_hooks(obj)

query = self._build_upsert_from_table(
target=name, source=obj, on=on, db=db, catalog=catalog
)

with self._safe_raw_sql(query):
pass
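
For reference, a hypothetical round trip with the new method, assuming a DuckDB version with `MERGE INTO` support; the connection, table name, and data below are made up for illustration and are not from the PR's test suite:

```python
import ibis

con = ibis.duckdb.connect()  # in-memory database
con.create_table("users", ibis.memtable({"id": [1, 2], "name": ["a", "b"]}))

# Rows whose `id` matches an existing row are updated; the rest are inserted.
con.upsert("users", ibis.memtable({"id": [2, 3], "name": ["B", "c"]}), on="id")

print(con.table("users").order_by("id").to_pandas())
#    id name
# 0   1    a
# 1   2    B
# 2   3    c
```
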

def _build_upsert_from_table(
self,
*,
target: str,
source,
on: str,
db: str | None = None,
catalog: str | None = None,
):
compiler = self.compiler
quoted = compiler.quoted

columns = self._get_columns_to_insert(
target=target, source=source, db=db, catalog=catalog
)

source_alias = util.gen_name("source")
target_alias = util.gen_name("target")
query = sge.merge(
sge.When(
matched=True,
then=sge.Update(
expressions=[
sg.column(col, quoted=quoted).eq(
sg.column(col, table=source_alias, quoted=quoted)
)
for col in columns
if col != on
]
),
),
sge.When(
matched=False,
then=sge.Insert(
this=sge.Tuple(
expressions=[sg.column(col, quoted=quoted) for col in columns]
),
expression=sge.Tuple(
expressions=[
sg.column(col, table=source_alias, quoted=quoted)
for col in columns
]
),
),
),
into=sg.table(target, db=db, catalog=catalog, quoted=quoted).as_(
sg.to_identifier(target_alias, quoted=quoted), table=True
),
using=f"({self.compile(source)}) AS {sg.to_identifier(source_alias, quoted=quoted)}",
on=sge.Paren(
this=sg.column(on, table=target_alias, quoted=quoted).eq(
sg.column(on, table=source_alias, quoted=quoted)
)
),
dialect=compiler.dialect,
)
return query
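
A condensed sketch of the statement this builder produces, built with sqlglot directly; the `users`/`t`/`s` names and the one-row source subquery are stand-ins for the generated aliases and compiled source above:

```python
import sqlglot as sg
import sqlglot.expressions as sge

query = sge.merge(
    sge.When(
        matched=True,
        then=sge.Update(
            expressions=[sg.column("name").eq(sg.column("name", table="s"))]
        ),
    ),
    sge.When(
        matched=False,
        then=sge.Insert(
            this=sge.Tuple(expressions=[sg.column("id"), sg.column("name")]),
            expression=sge.Tuple(
                expressions=[
                    sg.column("id", table="s"),
                    sg.column("name", table="s"),
                ]
            ),
        ),
    ),
    into=sg.table("users").as_("t", table=True),
    using="(SELECT 2 AS id, 'B' AS name) AS s",
    on="t.id = s.id",
    dialect="duckdb",
)
print(query.sql("duckdb"))
# Roughly:
# MERGE INTO users AS t USING (SELECT 2 AS id, 'B' AS name) AS s ON t.id = s.id
# WHEN MATCHED THEN UPDATE SET name = s.name
# WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)
```
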

def truncate_table(self, name: str, /, *, database: str | None = None) -> None:
"""Delete all rows from a table.

65 changes: 63 additions & 2 deletions ibis/backends/tests/conftest.py
@@ -1,9 +1,21 @@
from __future__ import annotations

import sqlite3

import pytest
from packaging.version import parse as vparse

import ibis.common.exceptions as com
from ibis.backends.tests.errors import MySQLOperationalError
from ibis.backends.tests.errors import (
ClickHouseDatabaseError,
ImpalaHiveServer2Error,
MySQLOperationalError,
MySQLProgrammingError,
PsycoPg2InternalError,
Py4JJavaError,
PySparkUnsupportedOperationException,
TrinoUserError,
)


def combine_marks(marks: list) -> callable:
@@ -50,7 +62,6 @@ def decorator(func):
]
NO_ARRAY_SUPPORT = combine_marks(NO_ARRAY_SUPPORT_MARKS)


NO_STRUCT_SUPPORT_MARKS = [
pytest.mark.never(["mysql", "sqlite", "mssql"], reason="No struct support"),
pytest.mark.notyet(["impala"]),
@@ -78,3 +89,53 @@ def decorator(func):
pytest.mark.notimpl(["datafusion", "exasol", "mssql", "druid", "oracle"]),
]
NO_JSON_SUPPORT = combine_marks(NO_JSON_SUPPORT_MARKS)

try:
import pyspark

pyspark_merge_exception = (
PySparkUnsupportedOperationException
if vparse(pyspark.__version__) >= vparse("3.5")
else Py4JJavaError
)
except ImportError:
pyspark_merge_exception = None

NO_MERGE_SUPPORT_MARKS = [
pytest.mark.notyet(
["clickhouse"],
raises=ClickHouseDatabaseError,
reason="MERGE INTO is not supported",
),
pytest.mark.notyet(["datafusion"], reason="MERGE INTO is not supported"),
pytest.mark.notyet(
["impala"],
raises=ImpalaHiveServer2Error,
reason="target table must be an Iceberg table",
),
pytest.mark.notyet(
["mysql"], raises=MySQLProgrammingError, reason="MERGE INTO is not supported"
),
pytest.mark.notimpl(["polars"], reason="`upsert` method not implemented"),
pytest.mark.notyet(
["pyspark"],
raises=pyspark_merge_exception,
reason="MERGE INTO TABLE is not supported temporarily",
),
pytest.mark.notyet(
["risingwave"],
raises=PsycoPg2InternalError,
reason="MERGE INTO is not supported",
),
pytest.mark.notyet(
["sqlite"],
raises=sqlite3.OperationalError,
reason="MERGE INTO is not supported",
),
pytest.mark.notyet(
["trino"],
raises=TrinoUserError,
reason="connector does not support modifying table rows",
),
]
NO_MERGE_SUPPORT = combine_marks(NO_MERGE_SUPPORT_MARKS)
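
`combine_marks` (its body is elided in the diff above) presumably just stacks the pytest marks onto a test function. A sketch of that decorator under that assumption, along with a hypothetical consumer of the combined mark (`con` is the standard backend fixture in this suite):

```python
# Assumed shape of the elided helper: apply every mark in the list to `func`.
def combine_marks(marks: list) -> callable:
    def decorator(func):
        for mark in reversed(marks):
            func = mark(func)
        return func

    return decorator


# Hypothetical test; backends without MERGE support are xfailed/skipped
# by the marks bundled into NO_MERGE_SUPPORT.
@NO_MERGE_SUPPORT
def test_upsert_roundtrip(con):
    ...
```
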
7 changes: 6 additions & 1 deletion ibis/backends/tests/errors.py
@@ -55,13 +55,18 @@
from pyspark.errors.exceptions.base import ParseException as PySparkParseException
from pyspark.errors.exceptions.base import PySparkValueError
from pyspark.errors.exceptions.base import PythonException as PySparkPythonException
from pyspark.errors.exceptions.base import (
UnsupportedOperationException as PySparkUnsupportedOperationException,
)
from pyspark.errors.exceptions.connect import (
SparkConnectGrpcException as PySparkConnectGrpcException,
)
except ImportError:
PySparkParseException = PySparkAnalysisException = PySparkArithmeticException = (
PySparkPythonException
) = PySparkConnectGrpcException = PySparkValueError = None
) = PySparkUnsupportedOperationException = PySparkConnectGrpcException = (
PySparkValueError
) = None

try:
from google.api_core.exceptions import BadRequest as GoogleBadRequest