diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 9c3c7a9467fc..6afc3528a302 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -110,9 +110,17 @@ public Transaction newReplaceTableTransaction(
       throw new NoSuchTableException("No such table: " + identifier);
     }

-    String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
     Map<String, String> tableProperties = properties != null ? properties : Maps.newHashMap();
-    TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, baseLocation, tableProperties);
+
+    TableMetadata metadata;
+    if (ops.current() != null) {
+      String baseLocation = location != null ? location : ops.current().location();
+      metadata = ops.current().buildReplacement(schema, spec, baseLocation, tableProperties);
+    } else {
+      String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
+      metadata = TableMetadata.newTableMetadata(schema, spec, baseLocation, tableProperties);
+    }
+
     if (orCreate) {
       return Transactions.createOrReplaceTableTransaction(identifier.toString(), ops, metadata);
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 8501aa9a0fbc..b7fb7cb5a3d3 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -566,7 +566,7 @@ public TableMetadata removeSnapshotLogEntries(Set<Long> snapshotIds) {

   // The caller is responsible to pass a updatedPartitionSpec with correct partition field IDs
   public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec updatedPartitionSpec,
-                                        Map<String, String> updatedProperties) {
+                                        String newLocation, Map<String, String> updatedProperties) {
     ValidationException.check(formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec),
         "Spec does not use sequential IDs that are required in v1: %s", updatedPartitionSpec);
@@ -602,7 +602,7 @@ public TableMetadata buildReplacement(Schema updatedSchema, PartitionSpec update
     newProperties.putAll(this.properties);
     newProperties.putAll(updatedProperties);

-    return new TableMetadata(null, formatVersion, uuid, location,
+    return new TableMetadata(null, formatVersion, uuid, newLocation,
         lastSequenceNumber, System.currentTimeMillis(), nextLastColumnId.get(), freshSchema,
         specId, builder.build(), ImmutableMap.copyOf(newProperties),
         -1, snapshots, ImmutableList.of(), addPreviousFile(file, lastUpdatedMillis, newProperties));
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 8961820d7675..b0057cd9b709 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -78,7 +78,7 @@ public static Transaction beginReplace(File temp, String name, Schema schema, Pa
     TableMetadata metadata;
     if (current != null) {
-      metadata = current.buildReplacement(schema, spec, properties);
+      metadata = current.buildReplacement(schema, spec, current.location(), properties);
       return Transactions.replaceTableTransaction(name, ops, metadata);
     } else {
       metadata = newTableMetadata(schema, spec, temp.toString(), properties);
diff --git a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 3fa0499060f2..e7ef9bffdb2d 100644
--- a/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++ b/hive/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -153,10 +153,12 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
   }

   @Override
-  public void renameTable(TableIdentifier from, TableIdentifier to) {
+  public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
     if (!isValidIdentifier(from)) {
       throw new NoSuchTableException("Invalid identifier: %s", from);
     }
+
+    TableIdentifier to = removeCatalogName(originalTo);
     Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);

     String toDatabase = to.namespace().level(0);
@@ -347,6 +349,20 @@ protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
     return tableIdentifier.namespace().levels().length == 1;
   }

+  private TableIdentifier removeCatalogName(TableIdentifier to) {
+    if (isValidIdentifier(to)) {
+      return to;
+    }
+
+    // check if the identifier includes the catalog name and remove it
+    if (to.namespace().levels().length == 2 && name().equalsIgnoreCase(to.namespace().level(0))) {
+      return TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name());
+    }
+
+    // return the original unmodified
+    return to;
+  }
+
   private boolean isValidateNamespace(Namespace namespace) {
     return namespace.levels().length == 1;
   }
diff --git a/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java b/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
index b714605f130f..93ba58d6e470 100644
--- a/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
+++ b/hive/src/test/java/org/apache/iceberg/hive/HiveCreateReplaceTableTest.java
@@ -155,7 +155,7 @@ public void testReplaceTableTxn() {
     txn.commitTransaction();

     Table table = catalog.loadTable(TABLE_IDENTIFIER);
-    Assert.assertEquals("Partition spec should match", PartitionSpec.unpartitioned(), table.spec());
+    Assert.assertEquals("Partition spec should be unpartitioned", 0, table.spec().fields().size());
   }

   @Test
@@ -233,7 +233,7 @@ public void testCreateOrReplaceTableTxnTableExists() {
     txn.commitTransaction();

     Table table = catalog.loadTable(TABLE_IDENTIFIER);
-    Assert.assertEquals("Partition spec should match", PartitionSpec.unpartitioned(), table.spec());
+    Assert.assertEquals("Partition spec should be unpartitioned", 0, table.spec().fields().size());
   }

   @Test
diff --git a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
index 3cf279f3a662..e66f3a48240a 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -26,15 +26,20 @@
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;

 public class SparkTestBase {

+  protected static final Object ANY = new Object();
+
   private static TestHiveMetastore metastore = null;
   private static HiveConf hiveConf = null;
   protected static SparkSession spark = null;
@@ -48,6 +53,7 @@ public static void startMetastoreAndSpark() {
     SparkTestBase.spark = SparkSession.builder()
         .master("local[2]")
+        .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
         .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
         .enableHiveSupport()
         .getOrCreate();
@@ -65,7 +71,7 @@ public static void stopMetastoreAndSpark() {
     SparkTestBase.spark = null;
   }

-  protected List<String[]> sql(String query, Object... args) {
+  protected List<Object[]> sql(String query, Object... args) {
     List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
     if (rows.size() < 1) {
       return ImmutableList.of();
@@ -73,11 +79,38 @@ protected List<String[]> sql(String query, Object... args) {

     return rows.stream()
         .map(row -> IntStream.range(0, row.size())
-            .mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos).toString())
-            .toArray(String[]::new)
+            .mapToObj(pos -> row.isNullAt(pos) ? null : row.get(pos))
+            .toArray(Object[]::new)
         ).collect(Collectors.toList());
   }

+  protected Object scalarSql(String query, Object... args) {
+    List<Object[]> rows = sql(query, args);
+    Assert.assertEquals("Scalar SQL should return one row", 1, rows.size());
+    Object[] row = Iterables.getOnlyElement(rows);
+    Assert.assertEquals("Scalar SQL should return one value", 1, row.length);
+    return row[0];
+  }
+
+  protected Object[] row(Object... values) {
+    return values;
+  }
+
+  protected void assertEquals(String context, List<Object[]> expectedRows, List<Object[]> actualRows) {
+    Assert.assertEquals(context + ": number of results should match", expectedRows.size(), actualRows.size());
+    for (int row = 0; row < expectedRows.size(); row += 1) {
+      Object[] expected = expectedRows.get(row);
+      Object[] actual = actualRows.get(row);
+      Assert.assertEquals("Number of columns should match", expected.length, actual.length);
+      for (int col = 0; col < actualRows.get(row).length; col += 1) {
+        if (expected[col] != ANY) {
+          Assert.assertEquals(context + ": row " + row + " col " + col + " contents should match",
+              expected[col], actual[col]);
+        }
+      }
+    }
+  }
+
   protected static String dbPath(String dbName) {
     return metastore.getDatabasePath(dbName);
   }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java b/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
index 76cd5cadcfd8..c8b7a31b3ba0 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
@@ -28,7 +28,7 @@ public class SimpleRecord {
   public SimpleRecord() {
   }

-  SimpleRecord(Integer id, String data) {
+  public SimpleRecord(Integer id, String data) {
     this.id = id;
     this.data = data;
   }
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java b/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
index 4013cfc41e33..5b8b2b31e548 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/SparkCatalogTestBase.java
@@ -23,7 +23,9 @@
 import java.io.IOException;
 import java.util.Map;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
@@ -76,6 +78,8 @@ public static Object[][] parameters() {
   protected final String catalogName;
   protected final Catalog validationCatalog;
   protected final SupportsNamespaces validationNamespaceCatalog;
+  protected final TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table");
+  protected final String tableName;

   public SparkCatalogTestBase(String catalogName, String implementation, Map<String, String> config) {
     this.catalogName = catalogName;
@@ -90,5 +94,13 @@ public SparkCatalogTestBase(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void createTable() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s2", tableName);
+  }
+
+  @Test
+  public void testAddColumn() {
+    sql("ALTER TABLE %s ADD COLUMN point struct<x: double NOT NULL, y: double NOT NULL> AFTER id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "point", Types.StructType.of(
+            NestedField.required(4, "x", Types.DoubleType.get()),
+            NestedField.required(5, "y", Types.DoubleType.get())
+        )),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+
+    sql("ALTER TABLE %s ADD COLUMN point.z double COMMENT 'May be null' FIRST", tableName);
+
+    Types.StructType expectedSchema2 = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "point", Types.StructType.of(
+            NestedField.optional(6, "z", Types.DoubleType.get(), "May be null"),
+            NestedField.required(4, "x", Types.DoubleType.get()),
+            NestedField.required(5, "y", Types.DoubleType.get())
+        )),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema2, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testDropColumn() {
+    sql("ALTER TABLE %s DROP COLUMN data", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testRenameColumn() {
+    sql("ALTER TABLE %s RENAME COLUMN id TO row_id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "row_id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnComment() {
+    sql("ALTER TABLE %s ALTER COLUMN id COMMENT 'Record id'", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get(), "Record id"),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnType() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count TYPE bigint", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()),
+        NestedField.optional(3, "count", Types.LongType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnDropNotNull() {
+    sql("ALTER TABLE %s ALTER COLUMN id DROP NOT NULL", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.optional(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnSetNotNull() {
+    // no-op changes are allowed
+    sql("ALTER TABLE %s ALTER COLUMN id SET NOT NULL", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+
+    AssertHelpers.assertThrows("Should reject adding NOT NULL constraint to an optional column",
+        AnalysisException.class, "Cannot change nullable column to non-nullable: data",
+        () -> sql("ALTER TABLE %s ALTER COLUMN data SET NOT NULL", tableName));
+  }
+
+  @Test
+  public void testAlterColumnPositionAfter() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count AFTER id", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(3, "count", Types.IntegerType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testAlterColumnPositionFirst() {
+    sql("ALTER TABLE %s ADD COLUMN count int", tableName);
+    sql("ALTER TABLE %s ALTER COLUMN count FIRST", tableName);
+
+    Types.StructType expectedSchema = Types.StructType.of(
+        NestedField.optional(3, "count", Types.IntegerType.get()),
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+
+    Assert.assertEquals("Schema should match expected",
+        expectedSchema, validationCatalog.loadTable(tableIdent).schema().asStruct());
+  }
+
+  @Test
+  public void testTableRename() {
+    Assume.assumeFalse("Hadoop catalog does not support rename", validationCatalog instanceof HadoopCatalog);
+
+    Assert.assertTrue("Initial name should exist", validationCatalog.tableExists(tableIdent));
+    Assert.assertFalse("New name should not exist", validationCatalog.tableExists(renamedIdent));
+
+    sql("ALTER TABLE %s RENAME TO %s2", tableName, tableName);
+
+    Assert.assertFalse("Initial name should not exist", validationCatalog.tableExists(tableIdent));
+    Assert.assertTrue("New name should exist", validationCatalog.tableExists(renamedIdent));
+  }
+
+  @Test
+  public void testSetTableProperties() {
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('prop'='value')", tableName);
+
+    Assert.assertEquals("Should have the new table property",
+        "value", validationCatalog.loadTable(tableIdent).properties().get("prop"));
+
+    sql("ALTER TABLE %s UNSET TBLPROPERTIES ('prop')", tableName);
+
+    Assert.assertNull("Should not have the removed table property",
+        validationCatalog.loadTable(tableIdent).properties().get("prop"));
+  }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
new file mode 100644
index 000000000000..9d8711c24403
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.io.File;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestCreateTable extends SparkCatalogTestBase {
+  public TestCreateTable(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void dropTestTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testCreateTable() {
+    Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, table.spec().fields().size());
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+  }
+
+  @Test
+  public void testCreateTableUsingParquet() {
+    Assume.assumeTrue(
+        "Not working with session catalog because Spark will not use v2 for a Parquet table",
+        !"spark_catalog".equals(catalogName));
+
+    Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING parquet", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, table.spec().fields().size());
+    Assert.assertEquals("Should have default format parquet",
+        "parquet",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+
+    AssertHelpers.assertThrows("Should reject unsupported format names",
+        IllegalArgumentException.class, "Unsupported format in USING: crocodile",
+        () -> sql("CREATE TABLE %s.default.fail (id BIGINT NOT NULL, data STRING) USING crocodile", catalogName));
+  }
+
+  @Test
+  public void testCreateTablePartitionedBy() {
+    Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL, created_at TIMESTAMP, category STRING, data STRING) " +
+        "USING iceberg " +
+        "PARTITIONED BY (category, bucket(8, id), days(created_at))", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "created_at", Types.TimestampType.withZone()),
+        NestedField.optional(3, "category", Types.StringType.get()),
+        NestedField.optional(4, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, table.schema().asStruct());
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(new Schema(expectedSchema.fields()))
+        .identity("category")
+        .bucket("id", 8)
+        .day("created_at")
+        .build();
+    Assert.assertEquals("Should be partitioned correctly", expectedSpec, table.spec());
+
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+  }
+
+  @Test
+  public void testCreateTableColumnComments() {
+    Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL COMMENT 'Unique identifier', data STRING COMMENT 'Data value') " +
+        "USING iceberg",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get(), "Unique identifier"),
+        NestedField.optional(2, "data", Types.StringType.get(), "Data value"));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, table.spec().fields().size());
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+  }
+
+  @Test
+  public void testCreateTableComment() {
+    Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL, data STRING) " +
+        "USING iceberg " +
+        "COMMENT 'Table doc'",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, table.spec().fields().size());
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+    Assert.assertEquals("Should have the table comment set in properties",
+        "Table doc", table.properties().get(TableCatalog.PROP_COMMENT));
+  }
+
+  @Test
+  public void testCreateTableLocation() throws Exception {
+    Assume.assumeTrue(
+        "Cannot set custom locations for Hadoop catalog tables",
+        !(validationCatalog instanceof HadoopCatalog));
+
+    Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
+
+    File tableLocation = temp.newFolder();
+    Assert.assertTrue(tableLocation.delete());
+
+    String location = "file:" + tableLocation.toString();
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL, data STRING) " +
+        "USING iceberg " +
+        "LOCATION '%s'",
+        tableName, location);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, table.spec().fields().size());
+    Assert.assertNull("Should not have the default format set",
+        table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
+    Assert.assertEquals("Should have a custom table location",
+        location, table.location());
+  }
+
+  @Test
+  public void testCreateTableProperties() {
+    Assert.assertFalse("Table should not already exist", validationCatalog.tableExists(tableIdent));
+
+    sql("CREATE TABLE %s " +
+        "(id BIGINT NOT NULL, data STRING) " +
+        "USING iceberg " +
+        "TBLPROPERTIES (p1=2, p2='x')",
+        tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertNotNull("Should load the new table", table);
+
+    StructType expectedSchema = StructType.of(
+        NestedField.required(1, "id", Types.LongType.get()),
+        NestedField.optional(2, "data", Types.StringType.get()));
+    Assert.assertEquals("Should have the expected schema", expectedSchema, table.schema().asStruct());
+    Assert.assertEquals("Should not be partitioned", 0, table.spec().fields().size());
+    Assert.assertEquals("Should have property p1", "2", table.properties().get("p1"));
+    Assert.assertEquals("Should have property p2", "x", table.properties().get("p2"));
+  }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
new file mode 100644
index 000000000000..2bde262bad9e
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.lit;
+import static org.apache.spark.sql.functions.when;
+
+public class TestCreateTableAsSelect extends SparkCatalogTestBase {
+
+  private final String sourceName;
+
+  public TestCreateTableAsSelect(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+    this.sourceName = tableName("source");
+
+    sql("CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) " +
+        "USING iceberg PARTITIONED BY (truncate(id, 3))", sourceName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testUnpartitionedCTAS() {
+    sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", tableName, sourceName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get())
+    );
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), ctasTable.schema().asStruct());
+    Assert.assertEquals("Should be an unpartitioned table",
+        0, ctasTable.spec().fields().size());
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testPartitionedCTAS() {
+    sql("CREATE TABLE %s USING iceberg PARTITIONED BY (id) AS SELECT * FROM %s ORDER BY id", tableName, sourceName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get())
+    );
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("id")
+        .build();
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), ctasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by id",
+        expectedSpec, ctasTable.spec());
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testRTAS() {
+    sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", tableName, sourceName);
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    sql("REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+        "SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+        "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+    // spark_catalog does not use an atomic replace, so the table history and old spec are dropped;
+    // the other catalogs do use atomic replace, so the spec id is incremented
+    boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "part", Types.StringType.get())
+    );
+
+    int specId = isAtomic ? 1 : 0;
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("part")
+        .withSpecId(specId)
+        .build();
+
+    Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+    // the replacement table has a different schema and partition spec than the original
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), rtasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by part",
+        expectedSpec, rtasTable.spec());
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Assert.assertEquals("Table should have expected snapshots",
+        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+  }
+
+  @Test
+  public void testCreateRTAS() {
+    sql("CREATE OR REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+        "SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+        "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    sql("CREATE OR REPLACE TABLE %s USING iceberg PARTITIONED BY (part) AS " +
+        "SELECT 2 * id as id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+        "FROM %s ORDER BY 3, 1", tableName, sourceName);
+
+    // spark_catalog does not use an atomic replace, so the table history is dropped
+    boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "part", Types.StringType.get())
+    );
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("part")
+        .withSpecId(0) // the spec is identical and should be reused
+        .build();
+
+    Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+    // the replacement table has a different schema and partition spec than the original
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), rtasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by part",
+        expectedSpec, rtasTable.spec());
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT 2 * id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Assert.assertEquals("Table should have expected snapshots",
+        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+  }
+
+  @Test
+  public void testDataFrameV2Create() throws Exception {
+    spark.table(sourceName).writeTo(tableName).using("iceberg").create();
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get())
+    );
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), ctasTable.schema().asStruct());
+    Assert.assertEquals("Should be an unpartitioned table",
+        0, ctasTable.spec().fields().size());
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2Replace() throws Exception {
+    spark.table(sourceName).writeTo(tableName).using("iceberg").create();
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    spark.table(sourceName)
+        .select(
+            col("id"),
+            col("data"),
+            when(col("id").mod(lit(2)).equalTo(lit(0)), lit("even")).otherwise("odd").as("part"))
+        .orderBy("part", "id")
+        .writeTo(tableName)
+        .partitionedBy(col("part"))
+        .using("iceberg")
+        .replace();
+
+    // spark_catalog does not use an atomic replace, so the table history and old spec are dropped;
+    // the other catalogs do use atomic replace, so the spec id is incremented
+    boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "part", Types.StringType.get())
+    );
+
+    int specId = isAtomic ? 1 : 0;
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("part")
+        .withSpecId(specId)
+        .build();
+
+    Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+    // the replacement table has a different schema and partition spec than the original
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), rtasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by part",
+        expectedSpec, rtasTable.spec());
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Assert.assertEquals("Table should have expected snapshots",
+        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+  }
+
+  @Test
+  public void testDataFrameV2CreateOrReplace() {
+    spark.table(sourceName)
+        .select(
+            col("id"),
+            col("data"),
+            when(col("id").mod(lit(2)).equalTo(lit(0)), lit("even")).otherwise("odd").as("part"))
+        .orderBy("part", "id")
+        .writeTo(tableName)
+        .partitionedBy(col("part"))
+        .using("iceberg")
+        .createOrReplace();
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT id, data, CASE WHEN (id %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    spark.table(sourceName)
+        .select(col("id").multiply(lit(2)).as("id"), col("data"))
+        .select(
+            col("id"),
+            col("data"),
+            when(col("id").mod(lit(2)).equalTo(lit(0)), lit("even")).otherwise("odd").as("part"))
+        .orderBy("part", "id")
+        .writeTo(tableName)
+        .partitionedBy(col("part"))
+        .using("iceberg")
+        .createOrReplace();
+
+    // spark_catalog does not use an atomic replace, so the table history is dropped
+    boolean isAtomic = !"spark_catalog".equals(catalogName);
+
+    Schema expectedSchema = new Schema(
+        Types.NestedField.optional(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()),
+        Types.NestedField.optional(3, "part", Types.StringType.get())
+    );
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema)
+        .identity("part")
+        .withSpecId(0) // the spec is identical and should be reused
+        .build();
+
+    Table rtasTable = validationCatalog.loadTable(tableIdent);
+
+    // the replacement table has a different schema and partition spec than the original
+    Assert.assertEquals("Should have expected nullable schema",
+        expectedSchema.asStruct(), rtasTable.schema().asStruct());
+    Assert.assertEquals("Should be partitioned by part",
+        expectedSpec, rtasTable.spec());
+
+    assertEquals("Should have rows matching the source table",
+        sql("SELECT 2 * id, data, CASE WHEN ((2 * id) %% 2) = 0 THEN 'even' ELSE 'odd' END AS part " +
+            "FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    Assert.assertEquals("Table should have expected snapshots",
+        isAtomic ? 2 : 1, Iterables.size(rtasTable.snapshots()));
+  }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
new file mode 100644
index 000000000000..3387d76b3e8a
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDeleteFrom extends SparkCatalogTestBase {
+  public TestDeleteFrom(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDeleteFromUnpartitionedTable() {
+    // set the shuffle partitions to 1 to force the write to use a single task and produce 1 file
+    String originalParallelism = spark.conf().get("spark.sql.shuffle.partitions");
+    spark.conf().set("spark.sql.shuffle.partitions", "1");
+    try {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+      sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+
+      assertEquals("Should have expected rows",
+          ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+          sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      AssertHelpers.assertThrows("Should not delete when not all rows of a file match the filter",
+          IllegalArgumentException.class, "Failed to cleanly delete data files",
+          () -> sql("DELETE FROM %s WHERE id < 2", tableName));
+
+      sql("DELETE FROM %s WHERE id < 4", tableName);
+
+      Assert.assertEquals("Should have no rows after successful delete",
+          0L, scalarSql("SELECT count(1) FROM %s", tableName));
+
+    } finally {
+      spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
+    }
+  }
+
+  @Test
+  public void testDeleteFromPartitionedTable() {
+    // set the shuffle partitions to 1 to force the write to use a single task and produce 1 file per partition
+    String originalParallelism = spark.conf().get("spark.sql.shuffle.partitions");
+    spark.conf().set("spark.sql.shuffle.partitions", "1");
+    try {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg " +
+          "PARTITIONED BY (truncate(id, 2))", tableName);
+      sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+
+      assertEquals("Should have 3 rows in 2 partitions",
+          ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+          sql("SELECT * FROM %s ORDER BY id", tableName));
+
+      AssertHelpers.assertThrows("Should not delete when not all rows of a file match the filter",
+          IllegalArgumentException.class, "Failed to cleanly delete data files",
+          () -> sql("DELETE FROM %s WHERE id > 2", tableName));
+
+      sql("DELETE FROM %s WHERE id < 2", tableName);
+
+      assertEquals("Should have two rows in the second partition",
+          ImmutableList.of(row(2L, "b"), row(3L, "c")),
+          sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    } finally {
+      spark.conf().set("spark.sql.shuffle.partitions", originalParallelism);
+    }
+  }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
index 1b1184501d47..d1eac312669a 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
@@ -69,7 +69,7 @@ public void testDefaultNamespace() {

     sql("USE %s", catalogName);

-    String[] current = Iterables.getOnlyElement(sql("SHOW CURRENT NAMESPACE"));
+    Object[] current = Iterables.getOnlyElement(sql("SHOW CURRENT NAMESPACE"));
Assert.assertEquals("Should use the current catalog", current[0], catalogName); Assert.assertEquals("Should use the configured default namespace", current[1], "default"); } @@ -114,12 +114,12 @@ public void testListTables() { Assert.assertTrue("Namespace should exist", validationNamespaceCatalog.namespaceExists(NS)); - List rows = sql("SHOW TABLES IN %s", fullNamespace); + List rows = sql("SHOW TABLES IN %s", fullNamespace); Assert.assertEquals("Should not list any tables", 0, rows.size()); sql("CREATE TABLE %s.table (id bigint) USING iceberg", fullNamespace); - String[] row = Iterables.getOnlyElement(sql("SHOW TABLES IN %s", fullNamespace)); + Object[] row = Iterables.getOnlyElement(sql("SHOW TABLES IN %s", fullNamespace)); Assert.assertEquals("Namespace should match", "db", row[0]); Assert.assertEquals("Table name should match", "table", row[1]); } @@ -132,21 +132,21 @@ public void testListNamespace() { Assert.assertTrue("Namespace should exist", validationNamespaceCatalog.namespaceExists(NS)); - List namespaces = sql("SHOW NAMESPACES IN %s", catalogName); + List namespaces = sql("SHOW NAMESPACES IN %s", catalogName); if (isHadoopCatalog) { Assert.assertEquals("Should have 1 namespace", 1, namespaces.size()); - Set namespaceNames = namespaces.stream().map(arr -> arr[0]).collect(Collectors.toSet()); + Set namespaceNames = namespaces.stream().map(arr -> arr[0].toString()).collect(Collectors.toSet()); Assert.assertEquals("Should have only db namespace", ImmutableSet.of("db"), namespaceNames); } else { Assert.assertEquals("Should have 2 namespaces", 2, namespaces.size()); - Set namespaceNames = namespaces.stream().map(arr -> arr[0]).collect(Collectors.toSet()); + Set namespaceNames = namespaces.stream().map(arr -> arr[0].toString()).collect(Collectors.toSet()); Assert.assertEquals("Should have default and db namespaces", ImmutableSet.of("default", "db"), namespaceNames); } - List nestedNamespaces = sql("SHOW NAMESPACES IN %s", fullNamespace); + List nestedNamespaces = sql("SHOW NAMESPACES IN %s", fullNamespace); - Set nestedNames = nestedNamespaces.stream().map(arr -> arr[0]).collect(Collectors.toSet()); + Set nestedNames = nestedNamespaces.stream().map(arr -> arr[0].toString()).collect(Collectors.toSet()); Assert.assertEquals("Should not have nested namespaces", ImmutableSet.of(), nestedNames); } diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java new file mode 100644 index 000000000000..8dcc6a096f1f --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWrites.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.functions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +public class TestPartitionedWrites extends SparkCatalogTestBase { + public TestPartitionedWrites(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } + + @Before + public void createTables() { + sql("CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY (truncate(id, 3))", tableName); + sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName); + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testInsertAppend() { + Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("INSERT INTO %s VALUES (4, 'd'), (5, 'e')", tableName); + + Assert.assertEquals("Should have 5 rows after insert", 5L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List expected = ImmutableList.of( + row(1L, "a"), + row(2L, "b"), + row(3L, "c"), + row(4L, "d"), + row(5L, "e") + ); + + assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Ignore // broken because of SPARK-32168, which should be fixed in 3.0.1 + public void testInsertOverwrite() { + Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + // 4 and 5 replace 3 in the partition (id - (id % 3)) = 3 + sql("INSERT OVERWRITE %s VALUES (4, 'd'), (5, 'e')", tableName); + + Assert.assertEquals("Should have 4 rows after overwrite", 4L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List expected = ImmutableList.of( + row(1L, "a"), + row(2L, "b"), + row(4L, "d"), + row(5L, "e") + ); + + assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void testDataFrameV2Append() throws NoSuchTableException { + Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List data = ImmutableList.of( + new SimpleRecord(4, "d"), + new SimpleRecord(5, "e") + ); + Dataset ds = spark.createDataFrame(data, SimpleRecord.class); + + ds.writeTo(tableName).append(); + + Assert.assertEquals("Should have 5 rows after insert", 5L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List expected = ImmutableList.of( + row(1L, "a"), + row(2L, "b"), + row(3L, "c"), + row(4L, "d"), + row(5L, "e") + ); + + assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void testDataFrameV2DynamicOverwrite() throws NoSuchTableException { + Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List data = ImmutableList.of( + new SimpleRecord(4, "d"), + new SimpleRecord(5, "e") + ); + Dataset ds = spark.createDataFrame(data, SimpleRecord.class); + + ds.writeTo(tableName).overwritePartitions(); + + Assert.assertEquals("Should have 4 rows after overwrite", 4L, scalarSql("SELECT count(*) 
FROM %s", tableName)); + + List expected = ImmutableList.of( + row(1L, "a"), + row(2L, "b"), + row(4L, "d"), + row(5L, "e") + ); + + assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void testDataFrameV2Overwrite() throws NoSuchTableException { + Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List data = ImmutableList.of( + new SimpleRecord(4, "d"), + new SimpleRecord(5, "e") + ); + Dataset ds = spark.createDataFrame(data, SimpleRecord.class); + + ds.writeTo(tableName).overwrite(functions.col("id").$less(3)); + + Assert.assertEquals("Should have 3 rows after overwrite", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List expected = ImmutableList.of( + row(3L, "c"), + row(4L, "d"), + row(5L, "e") + ); + + assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); + } +} diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java new file mode 100644 index 000000000000..51da0735f394 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.events.Listeners; +import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +public class TestSelect extends SparkCatalogTestBase { + private int scanEventCount = 0; + private ScanEvent lastScanEvent = null; + + public TestSelect(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + + // register a scan event listener to validate pushdown + Listeners.register(event -> { + scanEventCount += 1; + lastScanEvent = event; + }, ScanEvent.class); + } + + @Before + public void createTables() { + sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName); + sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName); + + this.scanEventCount = 0; + this.lastScanEvent = null; + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testSelect() { + List expected = ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")); + + assertEquals("Should return all expected rows", expected, sql("SELECT * FROM %s", tableName)); + } + + @Test + public void testProjection() { + List expected = ImmutableList.of(row(1L), row(2L), row(3L)); + + assertEquals("Should return all expected rows", expected, sql("SELECT id FROM %s", tableName)); + + Assert.assertEquals("Should create only one scan", 1, scanEventCount); + Assert.assertEquals("Should not push down a filter", Expressions.alwaysTrue(), lastScanEvent.filter()); + Assert.assertEquals("Should project only the id column", + validationCatalog.loadTable(tableIdent).schema().select("id").asStruct(), + lastScanEvent.projection().asStruct()); + } + + @Test + public void testExpressionPushdown() { + List expected = ImmutableList.of(row("b")); + + assertEquals("Should return all expected rows", expected, sql("SELECT data FROM %s WHERE id = 2", tableName)); + + Assert.assertEquals("Should create only one scan", 1, scanEventCount); + Assert.assertEquals("Should not push down a filter", + "(id IS NOT NULL AND id = 2)", + Spark3Util.describe(lastScanEvent.filter())); + Assert.assertEquals("Should project only the id column", + validationCatalog.loadTable(tableIdent).schema().asStruct(), + lastScanEvent.projection().asStruct()); + } + + @Test + public void testMetadataTables() { + Assume.assumeFalse( + "Spark session catalog does not support metadata tables", + "spark_catalog".equals(catalogName)); + + assertEquals("Snapshot metadata table", + ImmutableList.of(row(ANY, ANY, null, "append", ANY, ANY)), + sql("SELECT * FROM %s.snapshots", tableName)); + } +} diff --git a/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java new file mode 100644 index 000000000000..988df10e0e18 --- /dev/null +++ b/spark3/src/test/java/org/apache/iceberg/spark/sql/TestUnpartitionedWrites.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.sql; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogTestBase; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.functions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestUnpartitionedWrites extends SparkCatalogTestBase { + public TestUnpartitionedWrites(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } + + @Before + public void createTables() { + sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName); + sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName); + } + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testInsertAppend() { + Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("INSERT INTO %s VALUES (4, 'd'), (5, 'e')", tableName); + + Assert.assertEquals("Should have 5 rows after insert", 5L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List expected = ImmutableList.of( + row(1L, "a"), + row(2L, "b"), + row(3L, "c"), + row(4L, "d"), + row(5L, "e") + ); + + assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void testInsertOverwrite() { + Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + sql("INSERT OVERWRITE %s VALUES (4, 'd'), (5, 'e')", tableName); + + Assert.assertEquals("Should have 2 rows after overwrite", 2L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List expected = ImmutableList.of( + row(4L, "d"), + row(5L, "e") + ); + + assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); + } + + @Test + public void testDataFrameV2Append() throws NoSuchTableException { + Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List data = ImmutableList.of( + new SimpleRecord(4, "d"), + new SimpleRecord(5, "e") + ); + Dataset ds = spark.createDataFrame(data, SimpleRecord.class); + + ds.writeTo(tableName).append(); + + Assert.assertEquals("Should have 5 rows after insert", 5L, scalarSql("SELECT count(*) FROM %s", tableName)); + + List expected = ImmutableList.of( + row(1L, "a"), + row(2L, "b"), + row(3L, "c"), + row(4L, "d"), + row(5L, "e") + ); + + assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName)); 
+  }
+
+  @Test
+  public void testDataFrameV2DynamicOverwrite() throws NoSuchTableException {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+    ds.writeTo(tableName).overwritePartitions();
+
+    Assert.assertEquals("Should have 2 rows after overwrite", 2L, scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testDataFrameV2Overwrite() throws NoSuchTableException {
+    Assert.assertEquals("Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<SimpleRecord> data = ImmutableList.of(
+        new SimpleRecord(4, "d"),
+        new SimpleRecord(5, "e")
+    );
+    Dataset<Row> ds = spark.createDataFrame(data, SimpleRecord.class);
+
+    ds.writeTo(tableName).overwrite(functions.col("id").$less$eq(3));
+
+    Assert.assertEquals("Should have 2 rows after overwrite", 2L, scalarSql("SELECT count(*) FROM %s", tableName));
+
+    List<Object[]> expected = ImmutableList.of(
+        row(4L, "d"),
+        row(5L, "e")
+    );
+
+    assertEquals("Row data should match expected", expected, sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+}