137 changes: 137 additions & 0 deletions spark3/src/main/java/org/apache/iceberg/spark/RollbackStagedTable.java
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark;

import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* An implementation of StagedTable that mimics the behavior of Spark's non-atomic CTAS and RTAS.
* <p>
* A Spark catalog can implement StagingTableCatalog to support atomic operations by producing a StagedTable. But if a
* catalog implements StagingTableCatalog, Spark expects the catalog to be able to produce a StagedTable for any table
* loaded by the catalog. This assumption doesn't always hold, as in the case of {@link SparkSessionCatalog}, which
* can produce a StagedTable for Iceberg tables to support atomic operations, but wraps the session catalog and cannot
* necessarily produce a working StagedTable implementation for other tables that it loads.
* <p>
* The work-around is this class, which implements the StagedTable interface but does not have atomic behavior.
* Instead, the StagedTable interface is used to implement the behavior of the non-atomic SQL plans that create a
* table, write to it, and drop it to roll back.
* <p>
* This StagedTable implements SupportsRead, SupportsWrite, and SupportsDelete by passing the calls to the real table.
* Implementing those interfaces is safe because Spark will not use them unless the table supports them and returns the
* corresponding capabilities from {@link #capabilities()}.
*/
public class RollbackStagedTable implements StagedTable, SupportsRead, SupportsWrite, SupportsDelete {
private final TableCatalog catalog;
private final Identifier ident;
private final Table table;

public RollbackStagedTable(TableCatalog catalog, Identifier ident, Table table) {
this.catalog = catalog;
this.ident = ident;
this.table = table;
}

@Override
public void commitStagedChanges() {
// the changes have already been committed to the table at the end of the write
}

@Override
public void abortStagedChanges() {
// roll back changes by dropping the table
catalog.dropTable(ident);
}

@Override
public String name() {
return table.name();
}

@Override
public StructType schema() {
return table.schema();
}

@Override
public Transform[] partitioning() {
return table.partitioning();
}

@Override
public Map<String, String> properties() {
return table.properties();
}

@Override
public Set<TableCapability> capabilities() {
return table.capabilities();
}

@Override
public void deleteWhere(Filter[] filters) {
call(SupportsDelete.class, t -> t.deleteWhere(filters));
}

@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return callReturning(SupportsRead.class, t -> t.newScanBuilder(options));
}

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
return callReturning(SupportsWrite.class, t -> t.newWriteBuilder(info));
}

private <T> void call(Class<? extends T> requiredClass, Consumer<T> task) {
callReturning(requiredClass, inst -> {
task.accept(inst);
return null;
});
}

private <T, R> R callReturning(Class<? extends T> requiredClass, Function<T, R> task) {
if (requiredClass.isInstance(table)) {
return task.apply(requiredClass.cast(table));
} else {
throw new UnsupportedOperationException(String.format(
"Table does not implement %s: %s (%s)",
requiredClass.getSimpleName(), table.name(), table.getClass().getName()));
}
}
}
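For readers unfamiliar with the DSv2 staging contract, the following is a minimal sketch (not part of this change) of how Spark drives the StagedTable lifecycle that RollbackStagedTable implements. The helper method, its parameters, and the empty partitioning and properties are illustrative assumptions; only the stageCreate, commitStagedChanges, and abortStagedChanges calls come from the Spark API.

import java.util.Collections;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;

public class StagedCreateSketch {
  // Illustrative helper: "stagingCatalog" is assumed to be an already-initialized
  // StagingTableCatalog, for example a SparkSessionCatalog instance.
  static void ctas(StagingTableCatalog stagingCatalog, Identifier ident, StructType schema) throws Exception {
    StagedTable staged = stagingCatalog.stageCreate(
        ident, schema, new Transform[0], Collections.emptyMap());
    try {
      // Spark performs the CTAS write through the staged table (SupportsWrite) here.
      staged.commitStagedChanges();   // for RollbackStagedTable this is a no-op: the write already committed
    } catch (Exception e) {
      staged.abortStagedChanges();    // for RollbackStagedTable this drops the table to roll back
      throw e;
    }
  }
}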
111 changes: 103 additions & 8 deletions spark3/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -28,6 +28,8 @@
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -42,14 +44,16 @@
* @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
*/
public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
implements TableCatalog, SupportsNamespaces, CatalogExtension {
implements StagingTableCatalog, SupportsNamespaces, CatalogExtension {
private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};

private String catalogName = null;
private TableCatalog icebergCatalog = null;
private StagingTableCatalog asStagingCatalog = null;
private T sessionCatalog = null;
private boolean createParquetAsIceberg = false;
private boolean createAvroAsIceberg = false;
private boolean createOrcAsIceberg = false;

/**
* Build a {@link SparkCatalog} to be used for Iceberg operations.
@@ -121,18 +125,90 @@ public Table createTable(Identifier ident, StructType schema, Transform[] partit
Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
String provider = properties.get("provider");
if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
if (useIceberg(provider)) {
return icebergCatalog.createTable(ident, schema, partitions, properties);
} else {
// delegate to the session catalog
return sessionCatalog.createTable(ident, schema, partitions, properties);
}
}

} else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
return icebergCatalog.createTable(ident, schema, partitions, properties);
@Override
public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] partitions,
Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
String provider = properties.get("provider");
TableCatalog catalog;
if (useIceberg(provider)) {
if (asStagingCatalog != null) {
return asStagingCatalog.stageCreate(ident, schema, partitions, properties);
}
catalog = icebergCatalog;
} else {
catalog = sessionCatalog;
}

} else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
return icebergCatalog.createTable(ident, schema, partitions, properties);
// create the table with the selected catalog, then wrap it in a staged table that will drop it to roll back
Table table = catalog.createTable(ident, schema, partitions, properties);
return new RollbackStagedTable(catalog, ident, table);
}

@Override
public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] partitions,
Map<String, String> properties)
throws NoSuchNamespaceException, NoSuchTableException {
String provider = properties.get("provider");
TableCatalog catalog;
if (useIceberg(provider)) {
if (asStagingCatalog != null) {
return asStagingCatalog.stageReplace(ident, schema, partitions, properties);
}
catalog = icebergCatalog;
} else {
// delegate to the session catalog
return sessionCatalog.createTable(ident, schema, partitions, properties);
catalog = sessionCatalog;
}

// attempt to drop the table and fail if it doesn't exist
if (!catalog.dropTable(ident)) {
throw new NoSuchTableException(ident);
}

try {
// create the table with the selected catalog, then wrap it in a staged table that will drop it to roll back
Table table = catalog.createTable(ident, schema, partitions, properties);
return new RollbackStagedTable(catalog, ident, table);

} catch (TableAlreadyExistsException e) {
// the table was dropped, but now exists again; retry the replace
return stageReplace(ident, schema, partitions, properties);
}
}

@Override
public StagedTable stageCreateOrReplace(Identifier ident, StructType schema, Transform[] partitions,
Map<String, String> properties) throws NoSuchNamespaceException {
String provider = properties.get("provider");
TableCatalog catalog;
if (useIceberg(provider)) {
if (asStagingCatalog != null) {
return asStagingCatalog.stageCreateOrReplace(ident, schema, partitions, properties);
}
catalog = icebergCatalog;
} else {
catalog = sessionCatalog;
}

// drop the table if it exists
catalog.dropTable(ident);

try {
// create the table with the selected catalog, then wrap it in a staged table that will drop it to roll back
Table sessionCatalogTable = catalog.createTable(ident, schema, partitions, properties);
return new RollbackStagedTable(catalog, ident, sessionCatalogTable);

} catch (TableAlreadyExistsException e) {
// the table was dropped, but now exists again; retry the create-or-replace
return stageCreateOrReplace(ident, schema, partitions, properties);
}
}

@@ -167,8 +243,13 @@ public void renameTable(Identifier from, Identifier to) throws NoSuchTableExcept
public final void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
this.icebergCatalog = buildSparkCatalog(name, options);
if (icebergCatalog instanceof StagingTableCatalog) {
this.asStagingCatalog = (StagingTableCatalog) icebergCatalog;
}

this.createParquetAsIceberg = options.getBoolean("parquet-enabled", createParquetAsIceberg);
this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg);
this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg);
}

@Override
@@ -185,4 +266,18 @@ public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
public String name() {
return catalogName;
}

private boolean useIceberg(String provider) {
if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
return true;
} else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
return true;
} else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
return true;
} else if (createOrcAsIceberg && "orc".equalsIgnoreCase(provider)) {
return true;
}

return false;
}
}
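To make the new provider routing concrete, here is a hedged configuration sketch. The option keys (parquet-enabled, avro-enabled, orc-enabled) and the catalog class come from this diff; the master, catalog type, and table and query names are placeholders and not something this change prescribes.

import org.apache.spark.sql.SparkSession;

public class SessionCatalogConfigSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")  // placeholder
        // Replace the built-in session catalog with the Iceberg-aware wrapper.
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
        .config("spark.sql.catalog.spark_catalog.type", "hive")  // assumed catalog type
        // Route CREATE TABLE ... USING parquet/avro/orc through the Iceberg catalog (see useIceberg()).
        .config("spark.sql.catalog.spark_catalog.parquet-enabled", "true")
        .config("spark.sql.catalog.spark_catalog.avro-enabled", "true")
        .config("spark.sql.catalog.spark_catalog.orc-enabled", "true")
        .enableHiveSupport()
        .getOrCreate();

    // Hypothetical CTAS: with parquet-enabled=true, the "parquet" provider now creates an Iceberg table.
    spark.sql("CREATE TABLE default.ctas_demo USING parquet AS SELECT 1 AS id, 'a' AS data");
  }
}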
@@ -108,20 +108,15 @@ public void testRTAS() {
"SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
"FROM %s ORDER BY 3, 1", tableName, sourceName);

// spark_catalog does not use an atomic replace, so the table history and old spec is dropped
// the other catalogs do use atomic replace, so the spec id is incremented
boolean isAtomic = !"spark_catalog".equals(catalogName);

Schema expectedSchema = new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "part", Types.StringType.get())
);

int specId = isAtomic ? 1 : 0;
PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
.identity("part")
.withSpecId(specId)
.withSpecId(1)
.build();

Table rtasTable = validationCatalog.loadTable(tableIdent);
@@ -138,7 +133,7 @@
sql("SELECT * FROM %s ORDER BY id", tableName));

Assert.assertEquals("Table should have expected snapshots",
isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
2, Iterables.size(rtasTable.snapshots()));
}

@Test
@@ -156,9 +151,6 @@ public void testCreateRTAS() {
"SELECT 2 * id as id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
"FROM %s ORDER BY 3, 1", tableName, sourceName);

// spark_catalog does not use an atomic replace, so the table history is dropped
boolean isAtomic = !"spark_catalog".equals(catalogName);

Schema expectedSchema = new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
@@ -184,7 +176,7 @@
sql("SELECT * FROM %s ORDER BY id", tableName));

Assert.assertEquals("Table should have expected snapshots",
isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
2, Iterables.size(rtasTable.snapshots()));
}

@Test
@@ -226,20 +218,15 @@ public void testDataFrameV2Replace() throws Exception {
.using("iceberg")
.replace();

// spark_catalog does not use an atomic replace, so the table history and old spec is dropped
// the other catalogs do use atomic replace, so the spec id is incremented
boolean isAtomic = !"spark_catalog".equals(catalogName);

Schema expectedSchema = new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "part", Types.StringType.get())
);

int specId = isAtomic ? 1 : 0;
PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
.identity("part")
.withSpecId(specId)
.withSpecId(1)
.build();

Table rtasTable = validationCatalog.loadTable(tableIdent);
@@ -256,7 +243,7 @@
sql("SELECT * FROM %s ORDER BY id", tableName));

Assert.assertEquals("Table should have expected snapshots",
isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
2, Iterables.size(rtasTable.snapshots()));
}

@Test
@@ -289,9 +276,6 @@ public void testDataFrameV2CreateOrReplace() {
.using("iceberg")
.createOrReplace();

// spark_catalog does not use an atomic replace, so the table history is dropped
boolean isAtomic = !"spark_catalog".equals(catalogName);

Schema expectedSchema = new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()),
@@ -317,6 +301,6 @@
sql("SELECT * FROM %s ORDER BY id", tableName));

Assert.assertEquals("Table should have expected snapshots",
isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
2, Iterables.size(rtasTable.snapshots()));
}
}