diff --git a/docker/demo/config/dfs-source.properties b/docker/demo/config/dfs-source.properties index ac7080e1412bc..a90629ef8e67e 100644 --- a/docker/demo/config/dfs-source.properties +++ b/docker/demo/config/dfs-source.properties @@ -19,6 +19,10 @@ include=base.properties # Key fields, for kafka example hoodie.datasource.write.recordkey.field=key hoodie.datasource.write.partitionpath.field=date +# NOTE: We have to duplicate configuration since this is being used +# w/ both Spark and DeltaStreamer +hoodie.table.recordkey.fields=key +hoodie.table.partition.fields=date # Schema provider props (change to absolute path based on your installation) hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index c7cc50967a485..962875fb924fe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1006,21 +1006,21 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String inst // finish off any pending compactions if any from previous attempt. writeClient.runAnyPendingCompactions(); - String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant() + String latestDeltaCommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant() .get().getTimestamp(); List pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() .findInstantsBefore(instantTime).getInstants().collect(Collectors.toList()); if (!pendingInstants.isEmpty()) { LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s", - pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray()))); + pendingInstants.size(), latestDeltaCommitTime, Arrays.toString(pendingInstants.toArray()))); return; } // Trigger compaction with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. 
- final String compactionInstantTime = latestDeltacommitTime + "001"; + final String compactionInstantTime = latestDeltaCommitTime + "001"; if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) { writeClient.compact(compactionInstantTime); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java index d7561cc126592..ad71b17ce70ff 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java @@ -77,7 +77,6 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER); protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER); - protected transient volatile SparkRowConverter rowConverter; protected transient volatile SparkRowAccessor rowAccessor; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index dcffdf3cdb836..8c43e19baaf55 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -20,6 +20,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; @@ -46,6 +47,12 @@ public SimpleKeyGenerator(TypedProperties props) { SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) { super(props); + // Make sure key-generator is configured properly + ValidationUtils.checkArgument(recordKeyField == null || !recordKeyField.isEmpty(), + "Record key field has to be non-empty!"); + ValidationUtils.checkArgument(partitionPathField == null || !partitionPathField.isEmpty(), + "Partition path field has to be non-empty!"); + this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField); this.partitionPathFields = partitionPathField == null ? 
Collections.emptyList() : Collections.singletonList(partitionPathField); this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala index 626b3c6ef0d4e..2279e5a13f6f8 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala @@ -26,10 +26,10 @@ import org.apache.spark.sql.types.DataType trait SparkParsePartitionUtil extends Serializable { - def parsePartition( - path: Path, - typeInference: Boolean, - basePaths: Set[Path], - userSpecifiedDataTypes: Map[String, DataType], - timeZone: TimeZone): InternalRow + def parsePartition(path: Path, + typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + timeZone: TimeZone, + validatePartitionValues: Boolean = false): InternalRow } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index ab7d0164ea743..eaad4d471e0d3 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -85,7 +85,7 @@ trait SparkAdapter extends Serializable { /** * Create the SparkParsePartitionUtil. */ - def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil + def getSparkParsePartitionUtil: SparkParsePartitionUtil /** * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called. diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java index e0bc22f70d231..05617301936eb 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java @@ -97,7 +97,7 @@ private void setUp(boolean populateMetaFields, boolean partitioned) throws Excep initTestDataGenerator(new String[] {""}); } initFileSystem(); - Properties props = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + Properties props = getPropertiesForKeyGen(populateMetaFields); props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props); config = getConfigBuilder() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 0ce6ca0ee923b..0b80d20b39c36 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.Transformations; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -61,7 +62,6 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.storage.StorageLevel; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -87,9 +87,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness private HoodieTableMetaClient metaClient; private HoodieTestDataGenerator dataGen; - @BeforeEach - void setUp() throws IOException { - Properties properties = new Properties(); + void setUp(Properties props) throws IOException { + Properties properties = CollectionUtils.copy(props); properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); dataGen = new HoodieTestDataGenerator(); @@ -99,6 +98,9 @@ void setUp() throws IOException { @Test public void testMetadataAggregateFromWriteStatus() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build(); + + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { String newCommitTime = "001"; @@ -125,6 +127,9 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); + + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -213,6 +218,8 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig config = cfgBuilder.build(); + setUp(config.getProps()); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -302,6 +309,8 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY) .withAutoCommit(false).build(); + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = 
getHoodieWriteClient(cfg);) { HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient); @@ -381,6 +390,9 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep @Test public void testRollingStatsWithSmallFileHandling() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); + + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { Map fileIdToInsertsMap = new HashMap<>(); Map fileIdToUpsertsMap = new HashMap<>(); @@ -497,6 +509,9 @@ public void testRollingStatsWithSmallFileHandling() throws Exception { @Test public void testHandleUpdateWithMultiplePartitions() throws Exception { HoodieWriteConfig cfg = getConfig(true); + + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -578,6 +593,9 @@ public void testReleaseResource() throws Exception { HoodieWriteConfig.Builder builder = getConfigBuilder(true); builder.withReleaseResourceEnabled(true); builder.withAutoCommit(false); + + setUp(builder.build().getProps()); + /** * Write 1 (test when RELEASE_RESOURCE_ENABLE is true) */ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java index 3b30c5b767367..f959a8f0d9526 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java @@ -54,6 +54,7 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -98,8 +99,13 @@ public void testWriteDuringCompaction() throws IOException { .withLayoutConfig(HoodieLayoutConfig.newBuilder() .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build(); - metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()) + .build(); + + Properties props = getPropertiesForKeyGen(true); + props.putAll(config.getProps()); + + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); client = getHoodieWriteClient(config); // write data and commit @@ -138,8 +144,13 @@ public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean en .withLayoutConfig(HoodieLayoutConfig.newBuilder() .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build(); - metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()) + .build(); + + Properties props = getPropertiesForKeyGen(true); + props.putAll(config.getProps()); + + metaClient = 
getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); client = getHoodieWriteClient(config); final List records = dataGen.generateInserts("001", 100); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java index 5df7b4daecc72..275fd32ca7d8b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java @@ -82,7 +82,7 @@ void setUp() { public void testIncrementalReadsWithCompaction() throws Exception { final String partitionPath = "2020/02/20"; // use only one partition for this test final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] { partitionPath }); - Properties props = new Properties(); + Properties props = getPropertiesForKeyGen(true); props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString()); HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); HoodieWriteConfig cfg = getConfigBuilder(true).build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 2f0e585ec90a0..0a11425ec5b89 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -41,6 +41,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCleanConfig; @@ -155,7 +156,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro addConfigsForPopulateMetaFields(cfgBuilder, true); HoodieWriteConfig cfg = cfgBuilder.build(); - Properties properties = new Properties(); + Properties properties = CollectionUtils.copy(cfg.getProps()); properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); @@ -327,7 +328,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); - Properties properties = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + Properties properties = getPropertiesForKeyGen(populateMetaFields); properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); @@ -606,8 +607,10 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception { .withMarkersType(MarkerType.DIRECT.name()); HoodieWriteConfig cfg = cfgBuilder.build(); - Properties properties = new Properties(); + Properties properties = getPropertiesForKeyGen(true); + properties.putAll(cfg.getProps()); properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 80d185f62bc9c..468be5fb26407 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -341,8 +341,12 @@ protected int incrementTimelineServicePortToUse() { } protected Properties getPropertiesForKeyGen() { + return getPropertiesForKeyGen(false); + } + + protected Properties getPropertiesForKeyGen(boolean populateMetaFields) { Properties properties = new Properties(); - properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); + properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields)); properties.put("hoodie.datasource.write.recordkey.field", "_row_key"); properties.put("hoodie.datasource.write.partitionpath.field", "partition_path"); properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index c58dd178dca3a..ba1afbebb2529 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -160,7 +160,7 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, Strin } public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException { - return getHoodieMetaClient(hadoopConf, basePath, new Properties()); + return getHoodieMetaClient(hadoopConf, basePath, getPropertiesForKeyGen(true)); } @Override @@ -310,8 +310,12 @@ protected FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOExcept } protected Properties getPropertiesForKeyGen() { + return getPropertiesForKeyGen(false); + } + + protected Properties getPropertiesForKeyGen(boolean populateMetaFields) { Properties properties = new Properties(); - properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); + properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields)); properties.put("hoodie.datasource.write.recordkey.field", "_row_key"); 
properties.put("hoodie.datasource.write.partitionpath.field", "partition_path"); properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); @@ -321,9 +325,9 @@ protected Properties getPropertiesForKeyGen() { } protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) { + configBuilder.withProperties(getPropertiesForKeyGen(populateMetaFields)); if (!populateMetaFields) { - configBuilder.withProperties(getPropertiesForKeyGen()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build()); + configBuilder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 0b2c34618ed6c..5910de5f1d39e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -18,6 +18,8 @@ package org.apache.hudi; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -34,14 +36,14 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -53,6 +55,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe; + /** * Common (engine-agnostic) File Index implementation enabling individual query engines to * list Hudi Table contents based on the @@ -63,13 +67,12 @@ *
 *   <li>Query instant/range</li>
  • * */ -public abstract class BaseHoodieTableFileIndex { +public abstract class BaseHoodieTableFileIndex implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class); private final String[] partitionColumns; - private final FileSystemViewStorageConfig fileSystemStorageConfig; protected final HoodieMetadataConfig metadataConfig; private final HoodieTableQueryType queryType; @@ -80,7 +83,7 @@ public abstract class BaseHoodieTableFileIndex { private final boolean shouldValidateInstant; private final HoodieTableType tableType; - protected final String basePath; + protected final Path basePath; private final HoodieTableMetaClient metaClient; private final HoodieEngineContext engineContext; @@ -94,6 +97,8 @@ public abstract class BaseHoodieTableFileIndex { private transient volatile HoodieTableFileSystemView fileSystemView = null; + private transient HoodieTableMetadata tableMetadata = null; + /** * @param engineContext Hudi engine-specific context * @param metaClient Hudi table's meta-client @@ -117,9 +122,6 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext, this.partitionColumns = metaClient.getTableConfig().getPartitionFields() .orElse(new String[0]); - this.fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder() - .fromProperties(configProperties) - .build(); this.metadataConfig = HoodieMetadataConfig.newBuilder() .fromProperties(configProperties) .build(); @@ -131,7 +133,7 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext, this.shouldValidateInstant = shouldValidateInstant; this.tableType = metaClient.getTableType(); - this.basePath = metaClient.getBasePath(); + this.basePath = metaClient.getBasePathV2(); this.metaClient = metaClient; this.engineContext = engineContext; @@ -153,7 +155,7 @@ public Option getLatestCompletedInstant() { * Returns table's base-path */ public String getBasePath() { - return metaClient.getBasePath(); + return basePath.toString(); } /** @@ -172,14 +174,19 @@ public int getFileSlicesCount() { .mapToInt(List::size).sum(); } + @Override + public void close() throws Exception { + resetTableMetadata(null); + } + protected List getAllQueryPartitionPaths() { List queryRelativePartitionPaths = queryPaths.stream() - .map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path)) + .map(path -> FSUtils.getRelativePartitionPath(basePath, path)) .collect(Collectors.toList()); // Load all the partition path from the basePath, and filter by the query partition path. // TODO load files from the queryRelativePartitionPaths directly. 
- List matchedPartitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath) + List matchedPartitionPaths = getAllPartitionPathsUnchecked() .stream() .filter(path -> queryRelativePartitionPaths.stream().anyMatch(path::startsWith)) .collect(Collectors.toList()); @@ -244,12 +251,7 @@ private Map loadPartitionPathFiles() { ); fetchedPartitionToFiles = - FSUtils.getFilesInPartitions( - engineContext, - metadataConfig, - basePath, - fullPartitionPathsMapToFetch.keySet().toArray(new String[0]), - fileSystemStorageConfig.getSpillableDir()) + getAllFilesInPartitionsUnchecked(fullPartitionPathsMapToFetch.keySet()) .entrySet() .stream() .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue())); @@ -267,6 +269,11 @@ private Map loadPartitionPathFiles() { private void doRefresh() { long startTime = System.currentTimeMillis(); + HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), + FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); + + resetTableMetadata(newTableMetadata); + Map partitionFiles = loadPartitionPathFiles(); FileStatus[] allFiles = partitionFiles.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new); @@ -278,7 +285,7 @@ private void doRefresh() { // TODO we can optimize the flow by: // - First fetch list of files from instants of interest // - Load FileStatus's - fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles); + this.fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles); Option queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp)); @@ -324,6 +331,22 @@ private void doRefresh() { LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration)); } + private Map getAllFilesInPartitionsUnchecked(Collection fullPartitionPathsMapToFetch) { + try { + return tableMetadata.getAllFilesInPartitions(new ArrayList<>(fullPartitionPathsMapToFetch)); + } catch (IOException e) { + throw new HoodieIOException("Failed to list partition paths for a table", e); + } + } + + private List getAllPartitionPathsUnchecked() { + try { + return isPartitionedTable() ? 
tableMetadata.getAllPartitionPaths() : Collections.singletonList(""); + } catch (IOException e) { + throw new HoodieIOException("Failed to fetch partition paths for a table", e); + } + } + private void validate(HoodieTimeline activeTimeline, Option queryInstant) { if (shouldValidateInstant) { if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) { @@ -340,7 +363,23 @@ private static long fileSliceSize(FileSlice fileSlice) { return fileSlice.getBaseFile().map(BaseFile::getFileLen).orElse(0L) + logFileSize; } + private void resetTableMetadata(HoodieTableMetadata newTableMetadata) { + if (tableMetadata != null) { + try { + tableMetadata.close(); + } catch (Exception e) { + throw new HoodieException("Failed to close HoodieTableMetadata instance", e); + } + } + tableMetadata = newTableMetadata; + } + + private boolean isPartitionedTable() { + return partitionColumns.length > 0 || HoodieTableMetadata.isMetadataTable(basePath.toString()); + } + public static final class PartitionPath { + final String path; final Object[] values; @@ -353,12 +392,14 @@ public String getPath() { return path; } - Path fullPartitionPath(String basePath) { + Path fullPartitionPath(Path basePath) { if (!path.isEmpty()) { - return new CachingPath(basePath, path); + // NOTE: Since we now that the path is a proper relative path that doesn't require + // normalization we create Hadoop's Path using more performant unsafe variant + return new CachingPath(basePath, createPathUnsafe(path)); } - return new CachingPath(basePath); + return basePath; } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index d940f3bb4577a..fe697197f20e9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.InvalidHoodiePathException; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hadoop.conf.Configuration; @@ -68,6 +69,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.hadoop.CachingPath.getPathWithoutSchemeAndAuthority; + /** * Utility functions related to accessing the file storage. */ @@ -216,8 +219,8 @@ public static List getAllPartitionFoldersThreeLevelsDown(FileSystem fs, * Given a base partition and a partition path, return relative path of partition path to the base path. */ public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath) { - basePath = Path.getPathWithoutSchemeAndAuthority(basePath); - fullPartitionPath = Path.getPathWithoutSchemeAndAuthority(fullPartitionPath); + basePath = getPathWithoutSchemeAndAuthority(basePath); + fullPartitionPath = getPathWithoutSchemeAndAuthority(fullPartitionPath); String fullPartitionPathStr = fullPartitionPath.toString(); @@ -607,12 +610,12 @@ public static Path getPartitionPath(String basePath, String partitionPath) { String properPartitionPath = partitionPath.startsWith("/") ? 
partitionPath.substring(1) : partitionPath; - return getPartitionPath(new Path(basePath), properPartitionPath); + return getPartitionPath(new CachingPath(basePath), properPartitionPath); } public static Path getPartitionPath(Path basePath, String partitionPath) { - // FOr non-partitioned table, return only base-path - return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new Path(basePath, partitionPath); + // For non-partitioned table, return only base-path + return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new CachingPath(basePath, partitionPath); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java index cd35861b7499e..fe9837e6c693b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.hadoop.CachingPath; import java.io.Serializable; import java.util.Objects; @@ -66,6 +67,14 @@ public String getPath() { return fullPath; } + public Path getHadoopPath() { + if (fileStatus != null) { + return fileStatus.getPath(); + } + + return new CachingPath(fullPath); + } + public String getFileName() { return fileName; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 529b0e8c99edb..5478c105a4a54 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -398,7 +398,7 @@ public HoodieArchivedTimeline getArchivedTimeline(String startTs) { public void validateTableProperties(Properties properties) { // Once meta fields are disabled, it cant be re-enabled for a given table. if (!getTableConfig().populateMetaFields() - && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))) { + && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue().toString()))) { throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. 
Can't be re-enabled back"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 72cb3a0ef3b47..d923c592708c8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -18,19 +18,6 @@ package org.apache.hudi.common.table.log.block; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.inline.InLineFSUtils; -import org.apache.hudi.common.fs.inline.InLineFileSystem; -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieHBaseKVComparator; -import org.apache.hudi.io.storage.HoodieHFileReader; - import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; @@ -43,6 +30,16 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieHBaseKVComparator; +import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -167,9 +164,8 @@ protected ClosableIterator deserializeRecords(byte[] content) thr // Get schema from the header Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); - FileSystem fs = FSUtils.getFs(pathForReader.toString(), new Configuration()); // Read the content - HoodieHFileReader reader = new HoodieHFileReader<>(fs, pathForReader, content, Option.of(writerSchema)); + HoodieHFileReader reader = new HoodieHFileReader<>(null, pathForReader, content, Option.of(writerSchema)); Iterator recordIterator = reader.getRecordIterator(readerSchema); return new ClosableIterator() { @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index dc6fc47b58085..5818636caef2c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -95,8 +95,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV private BootstrapIndex bootstrapIndex; - private String getPartitionPathFromFilePath(String fullPath) { - return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), new Path(fullPath).getParent()); + private String getPartitionPathFor(HoodieBaseFile baseFile) { + return 
FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), baseFile.getHadoopPath().getParent()); } /** @@ -166,8 +166,8 @@ protected List buildFileGroups(FileStatus[] statuses, HoodieTim protected List buildFileGroups(Stream baseFileStream, Stream logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) { Map, List> baseFiles = - baseFileStream.collect(Collectors.groupingBy((baseFile) -> { - String partitionPathStr = getPartitionPathFromFilePath(baseFile.getPath()); + baseFileStream.collect(Collectors.groupingBy(baseFile -> { + String partitionPathStr = getPartitionPathFor(baseFile); return Pair.of(partitionPathStr, baseFile.getFileId()); })); @@ -183,7 +183,8 @@ protected List buildFileGroups(Stream baseFileS List fileGroups = new ArrayList<>(); fileIdSet.forEach(pair -> { String fileId = pair.getValue(); - HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline); + String partitionPath = pair.getKey(); + HoodieFileGroup group = new HoodieFileGroup(partitionPath, fileId, timeline); if (baseFiles.containsKey(pair)) { baseFiles.get(pair).forEach(group::addBaseFile); } @@ -373,7 +374,7 @@ private Stream convertFileStatusesToLogFiles(FileStatus[] statuse * @param baseFile base File */ protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) { - final String partitionPath = getPartitionPathFromFilePath(baseFile.getPath()); + final String partitionPath = getPartitionPathFor(baseFile); Option> compactionWithInstantTime = getPendingCompactionOperationWithInstant(new HoodieFileGroupId(partitionPath, baseFile.getFileId())); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index 8036995fab567..194d67cd0e3ec 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -54,6 +54,15 @@ public static boolean nonEmpty(Collection c) { return !isNullOrEmpty(c); } + /** + * Makes a copy of provided {@link Properties} object + */ + public static Properties copy(Properties props) { + Properties copy = new Properties(); + copy.putAll(props); + return copy; + } + /** * Returns last element of the array of {@code T} */ diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java index 01b3eb9d409bb..d6e35dbbdc5a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java @@ -19,10 +19,11 @@ package org.apache.hudi.hadoop; import org.apache.hadoop.fs.Path; +import org.apache.hudi.exception.HoodieException; import javax.annotation.concurrent.ThreadSafe; -import java.io.Serializable; import java.net.URI; +import java.net.URISyntaxException; /** * This is an extension of the {@code Path} class allowing to avoid repetitive @@ -32,11 +33,12 @@ * NOTE: This class is thread-safe */ @ThreadSafe -public class CachingPath extends Path implements Serializable { +public class CachingPath extends Path { // NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all // reads/writes to references are always atomic (including 64-bit JVMs) // https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7 + private volatile Path parent; private volatile String fileName; private volatile String fullPathStr; @@ -74,6 +76,17 @@ public 
String getName() { return fileName; } + @Override + public Path getParent() { + // This value could be overwritten concurrently and that's okay, since + // {@code Path} is immutable + if (parent == null) { + parent = super.getParent(); + } + + return parent; + } + @Override public String toString() { // This value could be overwritten concurrently and that's okay, since @@ -83,4 +96,46 @@ public String toString() { } return fullPathStr; } + + public CachingPath subPath(String relativePath) { + return new CachingPath(this, createPathUnsafe(relativePath)); + } + + public static CachingPath wrap(Path path) { + if (path instanceof CachingPath) { + return (CachingPath) path; + } + + return new CachingPath(path.toUri()); + } + + /** + * Creates path based on the provided *relative* path + * + * NOTE: This is an unsafe version that is relying on the fact that the caller is aware + * what they are doing this is not going to work with paths having scheme (which require + * parsing) and is only meant to work w/ relative paths in a few specific cases. + */ + public static CachingPath createPathUnsafe(String relativePath) { + try { + // NOTE: {@code normalize} is going to be invoked by {@code Path} ctor, so there's no + // point in invoking it here + URI uri = new URI(null, null, relativePath, null, null); + return new CachingPath(uri); + } catch (URISyntaxException e) { + throw new HoodieException("Failed to instantiate relative path", e); + } + } + + /** + * This is {@link Path#getPathWithoutSchemeAndAuthority(Path)} counterpart, instantiating + * {@link CachingPath} + */ + public static Path getPathWithoutSchemeAndAuthority(Path path) { + // This code depends on Path.toString() to remove the leading slash before + // the drive specification on Windows. + return path.isUriPathAbsolute() + ? 
createPathUnsafe(path.toUri().getPath()) + : path; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java index 5ad2307ef804a..796600a7e838e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java @@ -24,6 +24,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.net.URI; import java.util.Objects; /** @@ -42,12 +43,12 @@ public Path get() { } private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(path.toString()); + out.writeObject(path.toUri()); } - private void readObject(ObjectInputStream in) throws IOException { - String pathStr = in.readUTF(); - path = new CachingPath(pathStr); + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + URI uri = (URI) in.readObject(); + path = new CachingPath(uri); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java index 3767ea1832579..878a3c563b6f5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java @@ -67,7 +67,9 @@ public static HFile.Reader createHFileReader( */ public static HFile.Reader createHFileReader( FileSystem fs, Path dummyPath, byte[] content) throws IOException { - Configuration conf = new Configuration(); + // Avoid loading default configs, from the FS, since this configuration is mostly + // used as a stub to initialize HFile reader + Configuration conf = new Configuration(false); HoodieHFileReader.SeekableByteArrayInputStream bis = new HoodieHFileReader.SeekableByteArrayInputStream(content); FSDataInputStream fsdis = new FSDataInputStream(bis); FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fsdis); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 2036500ac6567..37a209b0a8719 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -19,6 +19,7 @@ package org.apache.hudi.metadata; +import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.common.bloom.BloomFilter; @@ -39,10 +40,13 @@ import org.apache.hudi.common.util.hash.ColumnIndexID; import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.SerializablePath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -55,8 +59,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; +import java.util.function.Function; import java.util.stream.Collectors; public abstract class BaseTableMetadata implements HoodieTableMetadata { @@ 
-68,7 +74,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { protected final transient HoodieEngineContext engineContext; protected final SerializableConfiguration hadoopConf; - protected final String dataBasePath; + protected final SerializablePath dataBasePath; protected final HoodieTableMetaClient dataMetaClient; protected final Option metrics; protected final HoodieMetadataConfig metadataConfig; @@ -83,7 +89,7 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon String dataBasePath, String spillableMapDirectory) { this.engineContext = engineContext; this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf()); - this.dataBasePath = dataBasePath; + this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath)); this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build(); this.spillableMapDirectory = spillableMapDirectory; this.metadataConfig = metadataConfig; @@ -113,7 +119,7 @@ public List getAllPartitionPaths() throws IOException { throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e); } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, + return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths(); } @@ -138,13 +144,17 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath) } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning()) + return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning()) .getAllFilesInPartition(partitionPath); } @Override public Map getAllFilesInPartitions(List partitions) throws IOException { + if (partitions.isEmpty()) { + return Collections.emptyMap(); + } + if (isMetadataTableEnabled) { try { List partitionPaths = partitions.stream().map(Path::new).collect(Collectors.toList()); @@ -154,7 +164,7 @@ public Map getAllFilesInPartitions(List partitions } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning()) + return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning()) .getAllFilesInPartitions(partitions); } @@ -278,20 +288,23 @@ public Map, HoodieMetadataColumnStats> getColumnStats(final */ protected List fetchAllPartitionPaths() { HoodieTimer timer = new HoodieTimer().startTimer(); - Option> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST, + Option> recordOpt = getRecordByKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer())); - List partitions = Collections.emptyList(); - if (hoodieRecord.isPresent()) { - handleSpuriousDeletes(hoodieRecord, "\"all partitions\""); - partitions = hoodieRecord.get().getData().getFilenames(); - // Partition-less tables have a single empty partition - if (partitions.contains(NON_PARTITIONED_NAME)) { - partitions.remove(NON_PARTITIONED_NAME); - partitions.add(""); + List partitions = recordOpt.map(record -> { + HoodieMetadataPayload metadataPayload = record.getData(); + checkForSpuriousDeletes(metadataPayload, "\"all partitions\""); + + List 
relativePaths = metadataPayload.getFilenames(); + // Non-partitioned tables have a single empty partition + if (relativePaths.size() == 1 && relativePaths.get(0).equals(NON_PARTITIONED_NAME)) { + return Collections.singletonList(""); + } else { + return relativePaths; } - } + }) + .orElse(Collections.emptyList()); LOG.info("Listed partitions from metadata: #partitions=" + partitions.size()); return partitions; @@ -303,75 +316,81 @@ protected List fetchAllPartitionPaths() { * @param partitionPath The absolute path of the partition */ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { - String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath); - if (partitionName.isEmpty()) { - partitionName = NON_PARTITIONED_NAME; - } + String relativePartitionPath = FSUtils.getRelativePartitionPath(dataBasePath.get(), partitionPath); + String recordKey = relativePartitionPath.isEmpty() ? NON_PARTITIONED_NAME : relativePartitionPath; HoodieTimer timer = new HoodieTimer().startTimer(); - Option> hoodieRecord = getRecordByKey(partitionName, + Option> recordOpt = getRecordByKey(recordKey, MetadataPartitionType.FILES.getPartitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); - FileStatus[] statuses = {}; - if (hoodieRecord.isPresent()) { - handleSpuriousDeletes(hoodieRecord, partitionName); - statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath); - } + FileStatus[] statuses = recordOpt.map(record -> { + HoodieMetadataPayload metadataPayload = record.getData(); + checkForSpuriousDeletes(metadataPayload, recordKey); + try { + return metadataPayload.getFileStatuses(hadoopConf.get(), partitionPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to extract file-statuses from the payload", e); + } + }) + .orElse(new FileStatus[0]); - LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length); + LOG.info("Listed file in partition from metadata: partition=" + relativePartitionPath + ", #files=" + statuses.length); return statuses; } Map fetchAllFilesInPartitionPaths(List partitionPaths) throws IOException { - Map partitionInfo = new HashMap<>(); - boolean foundNonPartitionedPath = false; - for (Path partitionPath: partitionPaths) { - String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath); - if (partitionName.isEmpty()) { - if (partitionInfo.size() > 1) { - throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table"); - } - partitionInfo.put(NON_PARTITIONED_NAME, partitionPath); - foundNonPartitionedPath = true; - } else { - if (foundNonPartitionedPath) { - throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table"); - } - partitionInfo.put(partitionName, partitionPath); - } - } + Map partitionIdToPathMap = + partitionPaths.parallelStream() + .collect( + Collectors.toMap(partitionPath -> { + String partitionId = FSUtils.getRelativePartitionPath(dataBasePath.get(), partitionPath); + return partitionId.isEmpty() ? 
NON_PARTITIONED_NAME : partitionId;
+            }, Function.identity())
+        );
 
     HoodieTimer timer = new HoodieTimer().startTimer();
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> partitionsFileStatus =
-        getRecordsByKeys(new ArrayList<>(partitionInfo.keySet()), MetadataPartitionType.FILES.getPartitionPath());
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> partitionIdRecordPairs =
+        getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath());
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
 
-    Map<String, FileStatus[]> result = new HashMap<>();
-    for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry: partitionsFileStatus) {
-      if (entry.getValue().isPresent()) {
-        handleSpuriousDeletes(entry.getValue(), entry.getKey());
-        result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey())));
-      }
-    }
+    FileSystem fs = partitionPaths.get(0).getFileSystem(hadoopConf.get());
+
+    Map<String, FileStatus[]> partitionPathToFilesMap = partitionIdRecordPairs.parallelStream()
+        .map(pair -> {
+          String partitionId = pair.getKey();
+          Option<HoodieRecord<HoodieMetadataPayload>> recordOpt = pair.getValue();
+
+          Path partitionPath = partitionIdToPathMap.get(partitionId);
+
+          return recordOpt.map(record -> {
+            HoodieMetadataPayload metadataPayload = record.getData();
+            checkForSpuriousDeletes(metadataPayload, partitionId);
+
+            FileStatus[] files = metadataPayload.getFileStatuses(fs, partitionPath);
+            return Pair.of(partitionPath.toString(), files);
+          })
+              .orElse(null);
+        })
+        .filter(Objects::nonNull)
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
     LOG.info("Listed files in partitions from metadata: partition list =" + Arrays.toString(partitionPaths.toArray()));
-    return result;
+
+    return partitionPathToFilesMap;
   }
 
   /**
    * Handle spurious deletes. Depending on config, throw an exception or log a warn msg.
-   * @param hoodieRecord instance of {@link HoodieRecord} of interest.
-   * @param partitionName partition name of interest.
    */
-  private void handleSpuriousDeletes(Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord, String partitionName) {
-    if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+  private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, String partitionName) {
+    if (!metadataPayload.getDeletions().isEmpty()) {
       if (metadataConfig.ignoreSpuriousDeletes()) {
         LOG.warn("Metadata record for " + partitionName + " encountered some files to be deleted which was not added before. "
             + "Ignoring the spurious deletes as the `" + HoodieMetadataConfig.IGNORE_SPURIOUS_DELETES.key() + "` config is set to true");
       } else {
         throw new HoodieMetadataException("Metadata record for " + partitionName + " is inconsistent: "
-            + metadataPayload);
+            + metadataPayload);
       }
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index f8a0389da3d4c..d4865875b166f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -111,7 +111,7 @@ public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetada
   }
 
   private void initIfNeeded() {
-    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath);
+    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString());
     if (!isMetadataTableEnabled) {
       if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) {
         LOG.info("Metadata table is disabled.");
@@ -303,8 +303,6 @@ private List>>> readFrom
       }
     }
 
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
-
     HoodieTimer readTimer = new HoodieTimer();
     readTimer.startTimer();
 
@@ -413,7 +411,7 @@ private Pair openReaders(
     // Open the log record scanner using the log files from the latest file slice
     List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
     Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
-        getLogRecordScanner(logFiles, partitionName);
+        getLogRecordScanner(logFiles, partitionName, Option.empty());
     HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
     final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
 
@@ -465,11 +463,6 @@ private Set getValidInstantTimestamps() {
     return validInstantTimestamps;
   }
 
-  public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
-                                                                             String partitionName) {
-    return getLogRecordScanner(logFiles, partitionName, Option.empty());
-  }
-
   public Pair<HoodieMetadataMergedLogRecordReader, Long> getLogRecordScanner(List<HoodieLogFile> logFiles,
                                                                              String partitionName,
                                                                              Option<Boolean> allowFullScanOverride) {
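Aside from the method rename, the listing path above now looks up all partitions in a single batch and folds the results into a map, silently dropping partitions for which the metadata table holds no record. Reduced to plain JDK types, the collection pattern is roughly the following sketch (class, method and element types here are illustrative stand-ins, not part of the patch):

import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

public class ParallelLookupSketch {

  // Each input entry pairs a partition id with an optional lookup result; the result
  // is absent when no metadata record exists for that partition.
  static Map<String, String[]> collectPresent(List<Entry<String, Optional<String[]>>> lookups) {
    return lookups.parallelStream()
        .map(pair -> pair.getValue()
            // Turn present results into (partition id, files) entries, absent ones into null
            .map(files -> new SimpleEntry<>(pair.getKey(), files))
            .orElse(null))
        // Drop partitions without a record, then materialize the map
        .filter(Objects::nonNull)
        .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
  }
}

The parallelStream/filter/toMap shape is what allows the spurious-delete check and the FileStatus materialization above to run per partition without an explicit loop or intermediate HashMap.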
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index df138cd124971..0575177800078 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -53,6 +53,7 @@
 import org.apache.hudi.common.util.hash.FileIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.io.storage.HoodieHFileReader;
 import org.apache.hudi.util.Lazy;
 
@@ -80,6 +81,7 @@
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;
 import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
@@ -473,10 +475,22 @@ public Option getColumnStatMetadata() {
    */
   public FileStatus[] getFileStatuses(Configuration hadoopConf, Path partitionPath) throws IOException {
     FileSystem fs = partitionPath.getFileSystem(hadoopConf);
+    return getFileStatuses(fs, partitionPath);
+  }
+
+  /**
+   * Returns the files added as part of this record.
+   */
+  public FileStatus[] getFileStatuses(FileSystem fs, Path partitionPath) {
     long blockSize = fs.getDefaultBlockSize(partitionPath);
     return filterFileInfoEntries(false)
-        .map(e -> new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0,
-            null, null, null, new Path(partitionPath, e.getKey())))
+        .map(e -> {
+          // NOTE: Since we know that the Metadata Table's Payload is simply a file-name we're
+          //       creating Hadoop's Path using more performant unsafe variant
+          CachingPath filePath = new CachingPath(partitionPath, createPathUnsafe(e.getKey()));
+          return new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0,
+              null, null, null, filePath);
+        })
        .toArray(FileStatus[]::new);
   }
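The new getFileStatuses(FileSystem, Path) overload builds each child path with CachingPath and its createPathUnsafe helper, as the NOTE explains, instead of re-parsing a full Path for every file name. A rough caller-side sketch of the same construction, using only the calls visible in this patch (the file name, size and block size are made-up inputs):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.hadoop.CachingPath;

import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe;

public class FileStatusSketch {

  // Builds a FileStatus for a file known only by name and size, the way the metadata
  // payload does: the child Path comes from the "unsafe" variant referenced in the
  // NOTE above (assumed to skip the usual URI parsing) and is resolved against the
  // partition path.
  static FileStatus toFileStatus(Path partitionPath, String fileName, long size, long blockSize) {
    CachingPath filePath = new CachingPath(partitionPath, createPathUnsafe(fileName));
    return new FileStatus(size, false, 0, blockSize, 0, 0, null, null, null, filePath);
  }
}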
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index ae871e3be0c03..349c0efb482a5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -29,7 +29,6 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
-
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieMetadataException;
 
@@ -107,13 +106,27 @@ static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetad
   static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath,
                                     String spillableMapPath, boolean reuse) {
     if (metadataConfig.enabled()) {
-      return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
+      return createHoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
     } else {
-      return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
-          datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
+      return createFSBackedTableMetadata(engineContext, metadataConfig, datasetBasePath);
     }
   }
 
+  static FileSystemBackedTableMetadata createFSBackedTableMetadata(HoodieEngineContext engineContext,
+                                                                   HoodieMetadataConfig metadataConfig,
+                                                                   String datasetBasePath) {
+    return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
+        datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
+  }
+
+  static HoodieBackedTableMetadata createHoodieBackedTableMetadata(HoodieEngineContext engineContext,
+                                                                   HoodieMetadataConfig metadataConfig,
+                                                                   String datasetBasePath,
+                                                                   String spillableMapPath,
+                                                                   boolean reuse) {
+    return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
+  }
+
   /**
    * Fetch all the files at the given partition path, per the latest snapshot of the metadata.
    */
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 6a2bffd34d6e6..d3c1de56773b3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -40,6 +40,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -124,28 +125,28 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePa
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-                                           Properties properties)
-      throws IOException {
-    properties = HoodieTableMetaClient.withPropertyBuilder()
-        .setTableName(RAW_TRIPS_TEST_NAME)
-        .setTableType(tableType)
-        .setPayloadClass(HoodieAvroPayload.class)
-        .fromProperties(properties)
-        .build();
-    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
+                                           Properties properties) throws IOException {
+    return init(hadoopConf, basePath, tableType, properties, null);
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
                                            Properties properties, String databaseName) throws IOException {
-    properties = HoodieTableMetaClient.withPropertyBuilder()
-        .setDatabaseName(databaseName)
-        .setTableName(RAW_TRIPS_TEST_NAME)
-        .setTableType(tableType)
-        .setPayloadClass(HoodieAvroPayload.class)
-        .fromProperties(properties)
-        .build();
-    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
+    HoodieTableMetaClient.PropertyBuilder builder =
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setDatabaseName(databaseName)
+            .setTableName(RAW_TRIPS_TEST_NAME)
+            .setTableType(tableType)
+            .setPayloadClass(HoodieAvroPayload.class);
+
+    String keyGen = properties.getProperty("hoodie.datasource.write.keygenerator.class");
+    if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")) {
+      builder.setPartitionFields("some_nonexistent_field");
+    }
+
+    Properties processedProperties = builder.fromProperties(properties).build();
+
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, processedProperties);
   }
 
   public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath,
                                            HoodieFileFormat baseFileFormat) throws IOException {
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index db8002cd2d462..a4471845c3e42 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -86,7 +86,10 @@ public static File prepareCustomizedTable(java.nio.file.Path basePath, HoodieFil
           baseFileFormat);
     }
 
-    java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
+    java.nio.file.Path partitionPath = useNonPartitionedKeyGen
+        ? basePath
+        : basePath.resolve(Paths.get("2016", "05", "01"));
+
     setupPartition(basePath, partitionPath);
 
     if (injectData) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 4e70ebad75ee4..a9a38f5f82bdd 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -20,12 +20,15 @@ package org.apache.hudi
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, shouldValidatePartitionColumns}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.hadoop.CachingPath
+import org.apache.hudi.hadoop.CachingPath.createPathUnsafe
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
@@ -39,6 +42,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
+import scala.language.implicitConversions
 
 /**
  * Implementation of the [[BaseHoodieTableFileIndex]] for Spark
@@ -79,7 +83,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
       AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
     })
 
-  private lazy val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark.sessionState.conf)
+  private lazy val sparkParsePartitionUtil = sparkAdapter.getSparkParsePartitionUtil
 
   /**
   * Get the partition schema from the hoodie.properties.
@@ -254,7 +258,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
         }
       }.mkString("/")
 
-      val pathWithPartitionName = new Path(basePath, partitionWithName)
+      val pathWithPartitionName = new CachingPath(basePath, createPathUnsafe(partitionWithName))
       val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema)
 
       partitionValues.map(_.asInstanceOf[Object]).toArray
@@ -269,22 +273,17 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     sparkParsePartitionUtil.parsePartition(
       partitionPath,
       typeInference = false,
-      Set(new Path(basePath)),
+      Set(basePath),
       partitionDataTypes,
-      DateTimeUtils.getTimeZone(timeZoneId)
+      DateTimeUtils.getTimeZone(timeZoneId),
+      validatePartitionValues = shouldValidatePartitionColumns(spark)
     )
       .toSeq(partitionSchema)
   }
 }
 
-object SparkHoodieTableFileIndex {
-  implicit def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
-    if (opt.isDefined) {
-      org.apache.hudi.common.util.Option.of(opt.get)
-    } else {
-      org.apache.hudi.common.util.Option.empty()
-    }
+object SparkHoodieTableFileIndex {
 
   /**
   * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
@@ -341,4 +340,9 @@ object SparkHoodieTableFileIndex {
     override def invalidate(): Unit = cache.invalidateAll()
   }
 }
+
+  private def shouldValidatePartitionColumns(spark: SparkSession): Boolean = {
+    // NOTE: We can't use helper, method nor the config-entry to stay compatible w/ Spark 2.4
+    spark.sessionState.conf.getConfString("spark.sql.sources.validatePartitionColumns", "true").toBoolean
+  }
 }
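shouldValidatePartitionColumns above deliberately reads the raw config string via sessionState so the same source builds against Spark 2.4. For comparison, a caller not bound by that constraint could read the same flag through the public runtime config; a small sketch (not part of this patch, class and method names are illustrative):

import org.apache.spark.sql.SparkSession;

public class ValidatePartitionColumnsSketch {

  // Reads spark.sql.sources.validatePartitionColumns with the same "true" default
  // used by shouldValidatePartitionColumns in SparkHoodieTableFileIndex.
  static boolean shouldValidate(SparkSession spark) {
    return Boolean.parseBoolean(
        spark.conf().get("spark.sql.sources.validatePartitionColumns", "true"));
  }
}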
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 890f8a9019a30..5a71f0e371360 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -237,7 +237,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df1.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -252,7 +252,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df2.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Append)
       .save(basePath)
     metaClient.reloadActiveTimeline()
@@ -263,7 +263,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df3.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Append)
       .save(basePath)
     metaClient.reloadActiveTimeline()
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 7cd7271c6b501..91ab71ef1c3af 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -67,7 +67,7 @@ class Spark2Adapter extends SparkAdapter {
     )
   }
 
-  override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = new Spark2ParsePartitionUtil
+  override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark2ParsePartitionUtil
 
   override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
     throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2")
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
index c3cbcc407587c..fe0bf50e6974f 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.catalyst.InternalRow
 
-class Spark2ParsePartitionUtil extends SparkParsePartitionUtil {
+object Spark2ParsePartitionUtil extends SparkParsePartitionUtil {
 
-  override def parsePartition(
-      path: Path,
-      typeInference: Boolean,
-      basePaths: Set[Path],
-      userSpecifiedDataTypes: Map[String, DataType],
-      timeZone: TimeZone): InternalRow = {
+  override def parsePartition(path: Path,
+                              typeInference: Boolean,
+                              basePaths: Set[Path],
+                              userSpecifiedDataTypes: Map[String, DataType],
+                              timeZone: TimeZone,
+                              validatePartitionValues: Boolean = false): InternalRow = {
     val (partitionValues, _) = PartitioningUtils.parsePartition(path, typeInference, basePaths,
       userSpecifiedDataTypes, timeZone)
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
index 1157a68254a88..d7a9a1f12241d 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java
@@ -53,12 +53,12 @@ public static DateFormatter getDateFormatter(ZoneId zoneId) {
     try {
       ClassLoader loader = Thread.currentThread().getContextClassLoader();
       if (HoodieSparkUtils.gteqSpark3_2()) {
-        Class clazz = loader.loadClass(DateFormatter.class.getName());
+        Class clazz = loader.loadClass(DateFormatter.class.getName());
         Method applyMethod = clazz.getDeclaredMethod("apply");
         applyMethod.setAccessible(true);
         return (DateFormatter)applyMethod.invoke(null);
       } else {
-        Class clazz = loader.loadClass(DateFormatter.class.getName());
+        Class clazz = loader.loadClass(DateFormatter.class.getName());
         Method applyMethod = clazz.getDeclaredMethod("apply", ZoneId.class);
         applyMethod.setAccessible(true);
         return (DateFormatter)applyMethod.invoke(null, zoneId);
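ReflectUtil above keeps resolving DateFormatter.apply reflectively so one compiled artifact can run against both pre-3.2 and 3.2+ Spark, which use different apply signatures. The underlying pattern, with the Spark specifics stripped out, is sketched below (the class and method names a caller passes in are their own choice, not fixed by this patch):

import java.lang.reflect.Method;

public class ReflectiveFactorySketch {

  // Loads a class by name from the context class loader and invokes a static,
  // possibly non-public, factory method on it. Parameter types are supplied
  // explicitly, mirroring the getDeclaredMethod("apply", ...) calls above.
  static Object invokeStaticFactory(String className, String methodName,
                                    Class<?>[] parameterTypes, Object... args) throws Exception {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    Class<?> clazz = loader.loadClass(className);
    Method factory = clazz.getDeclaredMethod(methodName, parameterTypes);
    factory.setAccessible(true);
    // Static method, so the receiver is null
    return factory.invoke(null, args);
  }
}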
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 5ba976d362076..77df665b98def 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -50,9 +50,7 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
 
   override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters
 
-  override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = {
-    new Spark3ParsePartitionUtil(conf)
-  }
+  override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark3ParsePartitionUtil
 
   override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = {
     parser.parseMultipartIdentifier(sqlText)
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
index f0cbe0530f3e2..ebe92a5a32a91 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
@@ -17,57 +17,63 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Long => JLong}
-import java.math.{BigDecimal => JBigDecimal}
-import java.time.ZoneId
-import java.util.{Locale, TimeZone}
-
 import org.apache.hadoop.fs.Path
-
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
 import org.apache.hudi.spark3.internal.ReflectUtil
-
+import org.apache.hudi.util.JFunction
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils.timestampPartitionPattern
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
+import java.lang.{Boolean => JBoolean, Double => JDouble, Long => JLong}
+import java.math.{BigDecimal => JBigDecimal}
+import java.time.ZoneId
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{Locale, TimeZone}
+import scala.collection.convert.Wrappers.JConcurrentMapWrapper
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 import scala.util.control.NonFatal
 
-class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
+object Spark3ParsePartitionUtil extends SparkParsePartitionUtil {
+
+  private val cache = JConcurrentMapWrapper(
+    new ConcurrentHashMap[ZoneId, (DateFormatter, TimestampFormatter)](1))
 
   /**
   * The definition of PartitionValues has been changed by SPARK-34314 in Spark3.2.
   * To solve the compatibility between 3.1 and 3.2, copy some codes from PartitioningUtils in Spark3.2 here.
   * And this method will generate and return `InternalRow` directly instead of `PartitionValues`.
   */
-  override def parsePartition(
-      path: Path,
-      typeInference: Boolean,
-      basePaths: Set[Path],
-      userSpecifiedDataTypes: Map[String, DataType],
-      timeZone: TimeZone): InternalRow = {
-    val dateFormatter = ReflectUtil.getDateFormatter(timeZone.toZoneId)
-    val timestampFormatter = TimestampFormatter(timestampPartitionPattern,
-      timeZone.toZoneId, isParsing = true)
+  override def parsePartition(path: Path,
+                              typeInference: Boolean,
+                              basePaths: Set[Path],
+                              userSpecifiedDataTypes: Map[String, DataType],
+                              tz: TimeZone,
+                              validatePartitionValues: Boolean = false): InternalRow = {
+    val (dateFormatter, timestampFormatter) = cache.getOrElseUpdate(tz.toZoneId, {
+      val dateFormatter = ReflectUtil.getDateFormatter(tz.toZoneId)
+      val timestampFormatter = TimestampFormatter(timestampPartitionPattern, tz.toZoneId, isParsing = true)
+
+      (dateFormatter, timestampFormatter)
+    })
 
     val (partitionValues, _) = parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
-      conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, timestampFormatter)
+      validatePartitionValues, tz.toZoneId, dateFormatter, timestampFormatter)
 
     partitionValues.map {
       case PartitionValues(columnNames: Seq[String], typedValues: Seq[TypedPartValue]) =>
 
         val rowValues = columnNames.zip(typedValues).map { case (columnName, typedValue) =>
           try {
-            castPartValueToDesiredType(typedValue.dataType, typedValue.value, timeZone.toZoneId)
+            castPartValueToDesiredType(typedValue.dataType, typedValue.value, tz.toZoneId)
           } catch {
             case NonFatal(_) =>
-              if (conf.validatePartitionColumns) {
+              if (validatePartitionValues) {
                 throw new RuntimeException(s"Failed to cast value `${typedValue.value}` to " +
                   s"`${typedValue.dataType}` for partition column `$columnName`")
               } else null
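Spark3ParsePartitionUtil now memoizes one (DateFormatter, TimestampFormatter) pair per time zone in a ConcurrentHashMap instead of rebuilding both formatters on every call. The same memoization shape in plain Java, using a JDK formatter as a stand-in for the Spark ones:

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class FormatterCacheSketch {

  // One formatter per zone, created lazily and shared by all threads afterwards,
  // mirroring the per-ZoneId cache added to Spark3ParsePartitionUtil.
  private static final ConcurrentMap<ZoneId, DateTimeFormatter> CACHE = new ConcurrentHashMap<>(1);

  static DateTimeFormatter formatterFor(ZoneId zoneId) {
    return CACHE.computeIfAbsent(zoneId,
        zone -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(zone));
  }
}

Turning the util into a singleton object is what makes this cache safe to share: it no longer captures a per-session SQLConf, so the validate flag is passed in explicitly instead.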
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 69d6dd7d3b298..86bf170fe84fd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1646,7 +1646,7 @@ private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetVal
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    props.setProperty("hoodie.datasource.write.partitionpath.field", "");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
     props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
     props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
@@ -1670,7 +1670,7 @@ private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoRes
     prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true,
         HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
     prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "");
+        PARQUET_SOURCE_ROOT, false, "driver");
     // delta streamer w/ parquet source
     String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
@@ -1797,8 +1797,8 @@ public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Except
   private void prepareCsvDFSSource(
       boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
     String sourceRoot = dfsBasePath + "/csvFiles";
-    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
-    String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" : "";
+    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c1";
+    String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" : "_c2";
 
     // Properties used for testing delta-streamer with CSV source
     TypedProperties csvProps = new TypedProperties();
@@ -2070,7 +2070,7 @@ public void testInsertOverwriteTable() throws Exception {
   @Test
   public void testDeletePartitions() throws Exception {
     prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
-        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "");
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 57270bdf812d3..0d090e1c45817 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -114,9 +114,9 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
     if (expectedCount == 0) {
       assertFalse(batchCheckPoint.getKey().isPresent());
     } else {
-      assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
+      assertEquals(expectedCount, batchCheckPoint.getKey().get().count());
     }
-    Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
+    Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
   }
 
   private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {
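The two assertion changes above only swap the argument order: JUnit's assertEquals takes the expected value first, so a reversed call reports "expected" and "actual" the wrong way around when it fails. A minimal illustration (the literal value is made up):

import static org.junit.jupiter.api.Assertions.assertEquals;

public class AssertOrderSketch {

  static void check(long actualCount) {
    // Correct: expected value first, actual value second.
    assertEquals(1000L, actualCount);
    // A reversed call such as assertEquals(actualCount, 1000L) still passes when the
    // values match, but on failure its message swaps the expected and actual sides.
  }
}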