diff --git a/spark3/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java b/spark3/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java
new file mode 100644
index 000000000000..a27d06e7a1d7
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * An implementation of StagedTable that mimics the behavior of Spark's non-atomic CTAS and RTAS.
+ *
+ * A Spark catalog can implement StagingTableCatalog to support atomic operations by producing a StagedTable. But if a
+ * catalog implements StagingTableCatalog, Spark expects the catalog to be able to produce a StagedTable for any table
+ * loaded by the catalog. This assumption doesn't always hold, as in the case of {@link SparkSessionCatalog}, which
+ * supports atomic operations and can produce a StagedTable for Iceberg tables, but wraps the session catalog and
+ * cannot necessarily produce a working StagedTable implementation for the other tables that it loads.
+ *
+ * The work-around is this class, which implements the StagedTable interface but does not have atomic behavior.
+ * Instead, the StagedTable interface is used to implement the behavior of the non-atomic SQL plans: create the table,
+ * write to it, and drop the table to roll back on failure.
+ *
+ * This StagedTable implements SupportsRead, SupportsWrite, and SupportsDelete by passing the calls to the real table.
+ * Implementing those interfaces is safe because Spark will not use them unless the table supports them and returns the
+ * corresponding capabilities from {@link #capabilities()}.
+ */
+public class RollbackStagedTable implements StagedTable, SupportsRead, SupportsWrite, SupportsDelete {
+ private final TableCatalog catalog;
+ private final Identifier ident;
+ private final Table table;
+
+ public RollbackStagedTable(TableCatalog catalog, Identifier ident, Table table) {
+ this.catalog = catalog;
+ this.ident = ident;
+ this.table = table;
+ }
+
+ @Override
+ public void commitStagedChanges() {
+ // the changes have already been committed to the table at the end of the write
+ }
+
+ @Override
+ public void abortStagedChanges() {
+ // roll back changes by dropping the table
+ catalog.dropTable(ident);
+ }
+
+ @Override
+ public String name() {
+ return table.name();
+ }
+
+ @Override
+ public StructType schema() {
+ return table.schema();
+ }
+
+ @Override
+ public Transform[] partitioning() {
+ return table.partitioning();
+ }
+
+ @Override
+  public Map<String, String> properties() {
+ return table.properties();
+ }
+
+ @Override
+  public Set<TableCapability> capabilities() {
+ return table.capabilities();
+ }
+
+ @Override
+ public void deleteWhere(Filter[] filters) {
+ call(SupportsDelete.class, t -> t.deleteWhere(filters));
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ return callReturning(SupportsRead.class, t -> t.newScanBuilder(options));
+ }
+
+ @Override
+ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+ return callReturning(SupportsWrite.class, t -> t.newWriteBuilder(info));
+ }
+
+  private <T> void call(Class<? extends T> requiredClass, Consumer<T> task) {
+ callReturning(requiredClass, inst -> {
+ task.accept(inst);
+ return null;
+ });
+ }
+
+  private <T, R> R callReturning(Class<? extends T> requiredClass, Function<T, R> task) {
+ if (requiredClass.isInstance(table)) {
+ return task.apply(requiredClass.cast(table));
+ } else {
+ throw new UnsupportedOperationException(String.format(
+ "Table does not implement %s: %s (%s)",
+ requiredClass.getSimpleName(), table.name(), table.getClass().getName()));
+ }
+ }
+}
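
For orientation, a rough sketch of how Spark's atomic CTAS/RTAS flow drives a StagedTable and where RollbackStagedTable's drop-to-roll-back fits in. The AtomicCtasSketch class and the writeQueryOutput helper are hypothetical stand-ins for Spark's actual planner and v2 write path, not code from this change:

```java
import java.util.Map;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;

class AtomicCtasSketch {
  // Hypothetical outline of the flow Spark's planner drives for an atomic CTAS.
  StagedTable runCtas(StagingTableCatalog catalog, Identifier ident, StructType schema,
                      Transform[] partitions, Map<String, String> properties) throws Exception {
    StagedTable staged = catalog.stageCreate(ident, schema, partitions, properties);
    try {
      writeQueryOutput(staged);      // hypothetical: write the query result to the staged table
      staged.commitStagedChanges();  // no-op for RollbackStagedTable: the write already committed
      return staged;
    } catch (Exception e) {
      staged.abortStagedChanges();   // RollbackStagedTable rolls back here by dropping the table
      throw e;
    }
  }

  private void writeQueryOutput(StagedTable staged) {
    // stand-in for Spark's v2 write path (newWriteBuilder on the staged table)
  }
}
```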
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 9079fae57ec8..ea0f0d940b30 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -28,6 +28,8 @@
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -42,14 +44,16 @@
* @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
*/
public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
- implements TableCatalog, SupportsNamespaces, CatalogExtension {
+ implements StagingTableCatalog, SupportsNamespaces, CatalogExtension {
private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
private String catalogName = null;
private TableCatalog icebergCatalog = null;
+ private StagingTableCatalog asStagingCatalog = null;
private T sessionCatalog = null;
private boolean createParquetAsIceberg = false;
private boolean createAvroAsIceberg = false;
+ private boolean createOrcAsIceberg = false;
/**
* Build a {@link SparkCatalog} to be used for Iceberg operations.
@@ -121,18 +125,90 @@ public Table createTable(Identifier ident, StructType schema, Transform[] partit
Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
String provider = properties.get("provider");
- if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
+ if (useIceberg(provider)) {
return icebergCatalog.createTable(ident, schema, partitions, properties);
+ } else {
+ // delegate to the session catalog
+ return sessionCatalog.createTable(ident, schema, partitions, properties);
+ }
+ }
- } else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
- return icebergCatalog.createTable(ident, schema, partitions, properties);
+ @Override
+ public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] partitions,
+      Map<String, String> properties)
+ throws TableAlreadyExistsException, NoSuchNamespaceException {
+ String provider = properties.get("provider");
+ TableCatalog catalog;
+ if (useIceberg(provider)) {
+ if (asStagingCatalog != null) {
+ return asStagingCatalog.stageCreate(ident, schema, partitions, properties);
+ }
+ catalog = icebergCatalog;
+ } else {
+ catalog = sessionCatalog;
+ }
- } else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
- return icebergCatalog.createTable(ident, schema, partitions, properties);
+    // create the table with the chosen catalog, then wrap it in a staged table that will drop it to roll back
+ Table table = catalog.createTable(ident, schema, partitions, properties);
+ return new RollbackStagedTable(catalog, ident, table);
+ }
+ @Override
+ public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] partitions,
+      Map<String, String> properties)
+ throws NoSuchNamespaceException, NoSuchTableException {
+ String provider = properties.get("provider");
+ TableCatalog catalog;
+ if (useIceberg(provider)) {
+ if (asStagingCatalog != null) {
+ return asStagingCatalog.stageReplace(ident, schema, partitions, properties);
+ }
+ catalog = icebergCatalog;
} else {
- // delegate to the session catalog
- return sessionCatalog.createTable(ident, schema, partitions, properties);
+ catalog = sessionCatalog;
+ }
+
+ // attempt to drop the table and fail if it doesn't exist
+ if (!catalog.dropTable(ident)) {
+ throw new NoSuchTableException(ident);
+ }
+
+ try {
+      // create the table with the chosen catalog, then wrap it in a staged table that will drop it to roll back
+ Table table = catalog.createTable(ident, schema, partitions, properties);
+ return new RollbackStagedTable(catalog, ident, table);
+
+ } catch (TableAlreadyExistsException e) {
+      // the table was dropped, but it already exists again; retry the replace
+ return stageReplace(ident, schema, partitions, properties);
+ }
+ }
+
+ @Override
+ public StagedTable stageCreateOrReplace(Identifier ident, StructType schema, Transform[] partitions,
+      Map<String, String> properties) throws NoSuchNamespaceException {
+ String provider = properties.get("provider");
+ TableCatalog catalog;
+ if (useIceberg(provider)) {
+ if (asStagingCatalog != null) {
+ return asStagingCatalog.stageCreateOrReplace(ident, schema, partitions, properties);
+ }
+ catalog = icebergCatalog;
+ } else {
+ catalog = sessionCatalog;
+ }
+
+ // drop the table if it exists
+ catalog.dropTable(ident);
+
+ try {
+      // create the table with the chosen catalog, then wrap it in a staged table that will drop it to roll back
+      Table table = catalog.createTable(ident, schema, partitions, properties);
+      return new RollbackStagedTable(catalog, ident, table);
+
+ } catch (TableAlreadyExistsException e) {
+      // the table was dropped, but it already exists again; retry the create-or-replace
+ return stageCreateOrReplace(ident, schema, partitions, properties);
}
}
@@ -167,8 +243,13 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept
public final void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
this.icebergCatalog = buildSparkCatalog(name, options);
+ if (icebergCatalog instanceof StagingTableCatalog) {
+ this.asStagingCatalog = (StagingTableCatalog) icebergCatalog;
+ }
+
this.createParquetAsIceberg = options.getBoolean("parquet-enabled", createParquetAsIceberg);
this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg);
+ this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg);
}
@Override
@@ -185,4 +266,18 @@ public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
public String name() {
return catalogName;
}
+
+ private boolean useIceberg(String provider) {
+ if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
+ return true;
+ } else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
+ return true;
+ } else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
+ return true;
+ } else if (createOrcAsIceberg && "orc".equalsIgnoreCase(provider)) {
+ return true;
+ }
+
+ return false;
+ }
}
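
For reference, a configuration sketch showing how the flags read in initialize(), including the new orc-enabled option, would be set. Assumptions: the standard spark.sql.catalog.&lt;name&gt; property convention, and the options for the underlying Iceberg catalog built by buildSparkCatalog are omitted:

```java
import org.apache.spark.sql.SparkSession;

class SessionCatalogConfigSketch {
  public static void main(String[] args) {
    // Registers SparkSessionCatalog as the v2 session catalog and enables the
    // format flags read in initialize(); underlying catalog options are omitted here.
    SparkSession spark = SparkSession.builder()
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
        .config("spark.sql.catalog.spark_catalog.parquet-enabled", "true")
        .config("spark.sql.catalog.spark_catalog.avro-enabled", "true")
        .config("spark.sql.catalog.spark_catalog.orc-enabled", "true")
        .getOrCreate();

    // With these flags, CREATE TABLE ... USING parquet/avro/orc is handled by the Iceberg
    // catalog (useIceberg returns true); any other provider falls back to the session catalog.
    spark.stop();
  }
}
```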
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
index 2bde262bad9e..d02b852bc993 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -108,20 +108,15 @@ public void testRTAS() {
"SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
"FROM %s ORDER BY 3, 1", tableName, sourceName);
- // spark_catalog does not use an atomic replace, so the table history and old spec is dropped
- // the other catalogs do use atomic replace, so the spec id is incremented
- boolean isAtomic = !"spark_catalog".equals(catalogName);
-
Schema expectedSchema = new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "part", Types.StringType.get())
);
- int specId = isAtomic ? 1 : 0;
PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
.identity("part")
- .withSpecId(specId)
+ .withSpecId(1)
.build();
Table rtasTable = validationCatalog.loadTable(tableIdent);
@@ -138,7 +133,7 @@ public void testRTAS() {
sql("SELECT * FROM %s ORDER BY id", tableName));
Assert.assertEquals("Table should have expected snapshots",
- isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+ 2, Iterables.size(rtasTable.snapshots()));
}
@Test
@@ -156,9 +151,6 @@ public void testCreateRTAS() {
"SELECT 2 * id as id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
"FROM %s ORDER BY 3, 1", tableName, sourceName);
- // spark_catalog does not use an atomic replace, so the table history is dropped
- boolean isAtomic = !"spark_catalog".equals(catalogName);
-
Schema expectedSchema = new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
@@ -184,7 +176,7 @@ public void testCreateRTAS() {
sql("SELECT * FROM %s ORDER BY id", tableName));
Assert.assertEquals("Table should have expected snapshots",
- isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+ 2, Iterables.size(rtasTable.snapshots()));
}
@Test
@@ -226,20 +218,15 @@ public void testDataFrameV2Replace() throws Exception {
.using("iceberg")
.replace();
- // spark_catalog does not use an atomic replace, so the table history and old spec is dropped
- // the other catalogs do use atomic replace, so the spec id is incremented
- boolean isAtomic = !"spark_catalog".equals(catalogName);
-
Schema expectedSchema = new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "part", Types.StringType.get())
);
- int specId = isAtomic ? 1 : 0;
PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
.identity("part")
- .withSpecId(specId)
+ .withSpecId(1)
.build();
Table rtasTable = validationCatalog.loadTable(tableIdent);
@@ -256,7 +243,7 @@ public void testDataFrameV2Replace() throws Exception {
sql("SELECT * FROM %s ORDER BY id", tableName));
Assert.assertEquals("Table should have expected snapshots",
- isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+ 2, Iterables.size(rtasTable.snapshots()));
}
@Test
@@ -289,9 +276,6 @@ public void testDataFrameV2CreateOrReplace() {
.using("iceberg")
.createOrReplace();
- // spark_catalog does not use an atomic replace, so the table history is dropped
- boolean isAtomic = !"spark_catalog".equals(catalogName);
-
Schema expectedSchema = new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
@@ -317,6 +301,6 @@ public void testDataFrameV2CreateOrReplace() {
sql("SELECT * FROM %s ORDER BY id", tableName));
Assert.assertEquals("Table should have expected snapshots",
- isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+ 2, Iterables.size(rtasTable.snapshots()));
}
}
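
The test changes above rely on SparkSessionCatalog now implementing StagingTableCatalog: an Iceberg replace in spark_catalog is delegated to the wrapped staging catalog, so table history is preserved and the partition spec id increments. A small usage sketch of the DataFrameWriterV2 path the tests exercise, with hypothetical table names:

```java
import org.apache.spark.sql.SparkSession;

class ReplaceBehaviorSketch {
  // Hypothetical table names; mirrors the writeTo(...).replace() path used in the tests.
  static void replaceTable(SparkSession spark) throws Exception {
    spark.table("db.source")
        .writeTo("db.target")
        .using("iceberg")
        .replace();
    // With atomic replace, the target keeps its previous snapshot (two snapshots total)
    // and the new partition spec is added with id 1, which the tests now assert for
    // every catalog, including spark_catalog.
  }
}
```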