From 5d1b90a6e91fbfe1229556377831d0c52d9c7613 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Mon, 8 May 2023 16:39:13 -0400 Subject: [PATCH 01/10] COW working but found new MOR issue --- .../table/action/commit/BaseMergeHelper.java | 3 +- .../action/commit/HoodieMergeHelper.java | 18 +- .../table/HoodieFlinkCopyOnWriteTable.java | 2 +- .../commit/BaseFlinkCommitActionExecutor.java | 2 +- .../table/HoodieJavaCopyOnWriteTable.java | 2 +- .../commit/BaseJavaCommitActionExecutor.java | 2 +- .../hudi/client/bootstrap/PartitionUtils.java | 36 +++ .../table/HoodieSparkCopyOnWriteTable.java | 11 +- .../commit/BaseSparkCommitActionExecutor.java | 20 +- .../io/storage/HoodieBootstrapFileReader.java | 36 +-- .../HoodieBootstrapRecordIterator.java | 80 ++++++ .../hudi/functional/TestBootstrapRead.java | 239 ++++++++++++++++++ 12 files changed, 410 insertions(+), 41 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java create mode 100644 hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 17b8620da63f..f3deed073b6c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.table.HoodieTable; @@ -36,7 +37,7 @@ public abstract class BaseMergeHelper { * @param upsertHandle Merge Handle * @throws IOException in case of error */ - public abstract void runMerge(HoodieTable table, HoodieMergeHandle upsertHandle) throws IOException; + public abstract void runMerge(HoodieTable table, HoodieMergeHandle upsertHandle, Option partitionFields, Object[] partitionValues) throws IOException; /** * Consumer that dequeues records from queue and sends to Merge Handle. 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index fa5711616aa1..2df98f6dadc6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -76,7 +76,7 @@ public static HoodieMergeHelper newInstance() { @Override public void runMerge(HoodieTable table, - HoodieMergeHandle mergeHandle) throws IOException { + HoodieMergeHandle mergeHandle, Option partitionFields, Object[] partitionValues) throws IOException { HoodieWriteConfig writeConfig = table.getConfig(); HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); @@ -119,14 +119,14 @@ public void runMerge(HoodieTable table, if (baseFile.getBootstrapBaseFile().isPresent()) { Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath()); Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf()); - bootstrapFileReader = - HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath); - - recordIterator = new MergingIterator<>( - baseFileRecordIterator, - bootstrapFileReader.getRecordIterator(), - (left, right) -> - left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields())); + bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader( + baseFileReader, + HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath), + partitionFields, + partitionValues); + + + recordIterator = bootstrapFileReader.getRecordIterator(mergeHandle.getWriterSchemaWithMetaFields()); recordSchema = mergeHandle.getWriterSchemaWithMetaFields(); } else { recordIterator = baseFileRecordIterator; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index cb046e2e91fe..64aa391a381e 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -371,7 +371,7 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> handleUpdateInternal(HoodieMergeHandle> handleUpdateInternal(HoodieMergeHandle> handleUpdateInternal(HoodieMergeHandle statuses = upsertHandle.writeStatuses(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java new file mode 100644 index 000000000000..c54819551cb9 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java @@ -0,0 +1,36 @@ +package org.apache.hudi.client.bootstrap; + +import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.SparkAdapterSupport$; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.hadoop.CachingPath; + +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil; +import 
org.apache.spark.sql.internal.SQLConf; + +public class PartitionUtils { + + + public static Object[] getPartitionFieldVals(Option partitionFields, + String partitionPath, + String bootstrapBasePath, + Schema writerSchema, + Configuration hadoopConf) { + if (!partitionFields.isPresent()) { + return new Object[0]; + } + SparkParsePartitionUtil sparkParsePartitionUtil = SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil(); + return HoodieSparkUtils.parsePartitionColumnValues( + partitionFields.get(), + partitionPath, + new CachingPath(bootstrapBasePath), + AvroConversionUtils.convertAvroSchemaToStructType(writerSchema), + hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()), + sparkParsePartitionUtil, + hadoopConf.getBoolean("spark.sql.sources.validatePartitionColumns", true)); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 8306fbbe9d8f..42410911b8ca 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -30,6 +30,7 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.bootstrap.PartitionUtils; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; @@ -222,7 +223,15 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle partitionFields = Option.empty(); + Object[] partitionValues = new Object[0]; + if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) { + partitionFields = getMetaClient().getTableConfig().getPartitionFields(); + partitionValues = PartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(), + getMetaClient().getTableConfig().getBootstrapBasePath().orElse(null), + upsertHandle.getWriterSchema(), getHadoopConf()); + } + HoodieMergeHelper.newInstance().runMerge(this, upsertHandle, partitionFields, partitionValues); } // TODO(vc): This needs to be revisited diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 47f031e0ac7d..c835338a8ca1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -18,7 +18,11 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.bootstrap.PartitionUtils; import org.apache.hudi.client.utils.SparkValidatorUtils; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; @@ -43,6 +47,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; +import 
org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; @@ -58,6 +63,8 @@ import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -366,7 +373,18 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle partitionFields = Option.empty(); + Object[] partitionValues = new Object[0]; + if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) { + partitionFields = table.getMetaClient().getTableConfig().getPartitionFields(); + partitionValues = PartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(), + table.getMetaClient().getTableConfig().getBootstrapBasePath().orElse(null), + upsertHandle.getWriterSchema(), table.getHadoopConf()); + } + + + HoodieMergeHelper.newInstance().runMerge(table, upsertHandle, partitionFields , partitionValues); } // TODO(vc): This needs to be revisited diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java index 7184312012ad..a7c2af6a4ffe 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java @@ -64,35 +64,21 @@ public Set filterRowKeys(Set candidateRowKeys) { public ClosableIterator> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { ClosableIterator> skeletonIterator = skeletonFileReader.getRecordIterator(readerSchema, requestedSchema); ClosableIterator> dataFileIterator = dataFileReader.getRecordIterator(HoodieAvroUtils.removeMetadataFields(readerSchema), requestedSchema); - return new ClosableIterator>() { + return new HoodieBootstrapRecordIterator(skeletonIterator, dataFileIterator, readerSchema, partitionFields, partitionValues) { @Override - public void close() { - skeletonIterator.close(); - dataFileIterator.close(); - } - - @Override - public boolean hasNext() { - return skeletonIterator.hasNext() && dataFileIterator.hasNext(); + protected void setThePartitionField(int position, Object fieldValue, T row) { + setPartitionField(position, fieldValue, row); } + }; + } + public ClosableIterator> getRecordIterator(Schema schema) throws IOException { + ClosableIterator> skeletonIterator = skeletonFileReader.getRecordIterator(schema); + ClosableIterator> dataFileIterator = dataFileReader.getRecordIterator(dataFileReader.getSchema()); + return new HoodieBootstrapRecordIterator(skeletonIterator, dataFileIterator, schema, partitionFields, partitionValues) { @Override - public HoodieRecord next() { - HoodieRecord dataRecord = dataFileIterator.next(); - HoodieRecord skeletonRecord = skeletonIterator.next(); - HoodieRecord ret = dataRecord.prependMetaFields(readerSchema, readerSchema, - new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD)) - .setCommitSeqno(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)) - .setRecordKey(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD)) - 
.setPartitionPath(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.PARTITION_PATH_METADATA_FIELD)) - .setFileName(skeletonRecord.getRecordKey(readerSchema, HoodieRecord.FILENAME_METADATA_FIELD)), null); - if (partitionFields.isPresent()) { - for (int i = 0; i < partitionValues.length; i++) { - int position = readerSchema.getField(partitionFields.get()[i]).pos(); - setPartitionField(position, partitionValues[i], ret.getData()); - } - } - return ret; + protected void setThePartitionField(int position, Object fieldValue, T row) { + setPartitionField(position, fieldValue, row); } }; } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java new file mode 100644 index 000000000000..f9d8891f4727 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.MetadataValues; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; + +import org.apache.avro.Schema; + +public abstract class HoodieBootstrapRecordIterator implements ClosableIterator> { + + protected ClosableIterator> skeletonIterator; + protected ClosableIterator> dataFileIterator; + private final Option partitionFields; + private final Object[] partitionValues; + + protected Schema schema; + + + public HoodieBootstrapRecordIterator(ClosableIterator> skeletonIterator, + ClosableIterator> dataFileIterator, + Schema schema, + Option partitionFields, + Object[] partitionValues) { + this.skeletonIterator = skeletonIterator; + this.dataFileIterator = dataFileIterator; + this.schema = schema; + this.partitionFields = partitionFields; + this.partitionValues = partitionValues; + } + + @Override + public void close() { + + } + + @Override + public boolean hasNext() { + return skeletonIterator.hasNext() && dataFileIterator.hasNext(); + } + + @Override + public HoodieRecord next() { + HoodieRecord dataRecord = dataFileIterator.next(); + HoodieRecord skeletonRecord = skeletonIterator.next(); + HoodieRecord ret = dataRecord.prependMetaFields(schema, schema, + new MetadataValues().setCommitTime(skeletonRecord.getRecordKey(schema, HoodieRecord.COMMIT_TIME_METADATA_FIELD)) + .setCommitSeqno(skeletonRecord.getRecordKey(schema, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)) + .setRecordKey(skeletonRecord.getRecordKey(schema, HoodieRecord.RECORD_KEY_METADATA_FIELD)) + .setPartitionPath(skeletonRecord.getRecordKey(schema, HoodieRecord.PARTITION_PATH_METADATA_FIELD)) + 
.setFileName(skeletonRecord.getRecordKey(schema, HoodieRecord.FILENAME_METADATA_FIELD)), null); + if (partitionFields.isPresent()) { + for (int i = 0; i < partitionValues.length; i++) { + int position = schema.getField(partitionFields.get()[i]).pos(); + setThePartitionField(position, partitionValues[i], ret.getData()); + } + } + return ret; + } + + protected abstract void setThePartitionField(int position, Object fieldValue, T row); +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java new file mode 100644 index 000000000000..f8c054cdbbb1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional; + +import org.apache.hudi.DataSourceReadOptions; +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector; +import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector; +import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.config.HoodieBootstrapConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.testutils.HoodieSparkClientTestBase; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests different layouts for bootstrap base path + */ +@Tag("functional") +public class TestBootstrapRead extends HoodieSparkClientTestBase { + + @TempDir + public java.nio.file.Path tmpFolder; + protected String bootstrapBasePath = null; + protected String bootstrapTargetPath = null; + protected String hudiBasePath = null; + + protected static int nInserts = 100; + 
protected static int nUpdates = 20; + protected static String[] dashPartitionPaths = {"2016-03-15", "2015-03-16", "2015-03-17"}; + protected static String[] slashPartitionPaths = {"2016/03/15", "2015/03/16", "2015/03/17"}; + protected String bootstrapType; + protected Boolean dashPartitions; + protected String tableType; + + protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state", "partition_path"}; + + @BeforeEach + public void setUp() throws Exception { + bootstrapBasePath = tmpFolder.toAbsolutePath() + "/bootstrapBasePath"; + hudiBasePath = tmpFolder.toAbsolutePath() + "/hudiBasePath"; + bootstrapTargetPath = tmpFolder.toAbsolutePath() + "/bootstrapTargetPath"; + initSparkContexts(); + } + + @AfterEach + public void tearDown() throws IOException { + cleanupSparkContexts(); + cleanupClients(); + cleanupTestDataGenerator(); + } + + private static Stream testArgs() { + Stream.Builder b = Stream.builder(); + Boolean[] dashPartitions = {true}; + String[] tableType = {"COPY_ON_WRITE"}; + String[] bootstrapType = {"full", "metadata", "mixed"}; + + for (String tt : tableType) { + for (Boolean dash : dashPartitions) { + for (String bt : bootstrapType) { + b.add(Arguments.of(bt, dash, tt)); + } + } + } + return b.build(); + } + + @ParameterizedTest + @MethodSource("testArgs") + public void runTests(String bootstrapType,Boolean dashPartitions, String tableType) { + this.bootstrapType = bootstrapType; + this.dashPartitions = dashPartitions; + this.tableType = tableType; + setupDirs(); + + //do bootstrap + Map options = setBootstrapOptions(); + Dataset bootstrapDf = sparkSession.emptyDataFrame(); + bootstrapDf.write().format("hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(bootstrapTargetPath); + compareTables(); + + //do upserts + options = basicOptions(); + doUpdate(options, "001"); + compareTables(); + + doUpdate(options, "002"); + compareTables(); + } + + private Map basicOptions() { + Map options = new HashMap<>(); + options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType); + options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true"); + options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path"); + options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); + if (tableType.equals("MERGE_ON_READ")) { + options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); + } + options.put(HoodieWriteConfig.TBL_NAME.key(), "test"); + return options; + } + + private Map setBootstrapOptions() { + Map options = basicOptions(); + options.put(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL()); + options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath); + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()); + switch (bootstrapType) { + case "metadata": + options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), MetadataOnlyBootstrapModeSelector.class.getName()); + break; + case "full": + options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), FullRecordBootstrapModeSelector.class.getName()); + break; + case "mixed": + options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), BootstrapRegexModeSelector.class.getName()); + if (dashPartitions) { + options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), 
"partition_path=2015-03-1[5-7]"); + } else { + options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), "partition_path=2015%2F03%2F1[5-7]"); + } + break; + default: + throw new RuntimeException(); + } + return options; + } + + protected void doUpdate(Map options, String instantTime) { + Dataset updates = generateTestUpdates(instantTime); + String nCompactCommits = "3"; + updates.write().format("hudi") + .options(options) + .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), nCompactCommits) + .mode(SaveMode.Append) + .save(hudiBasePath); + if (bootstrapType.equals("mixed")) { + //mixed tables have a commit for each of the metadata and full bootstrap modes + nCompactCommits = "4"; + } + updates.write().format("hudi") + .options(options) + .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), nCompactCommits) + .mode(SaveMode.Append) + .save(bootstrapTargetPath); + } + + protected void compareTables() { + Map readOpts = new HashMap<>(); + if (tableType.equals("MERGE_ON_READ")) { + //Bootstrap MOR currently only has read optimized queries implemented + readOpts.put(DataSourceReadOptions.QUERY_TYPE().key(),DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL()); + } + Dataset hudiDf = sparkSession.read().options(readOpts).format("hudi").load(hudiBasePath); + Dataset bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath); + compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns)); + compareDf(hudiDf.select("_row_key","partition_path"), bootstrapDf.select("_row_key","partition_path")); + } + + protected void compareDf(Dataset df1, Dataset df2) { + assertEquals(0, df1.except(df2).count()); + assertEquals(0, df2.except(df1).count()); + } + + protected void setupDirs() { + dataGen = new HoodieTestDataGenerator(dashPartitions ? 
dashPartitionPaths : slashPartitionPaths); + Dataset inserts = generateTestInserts(); + + inserts.write().format("hudi") + .options(basicOptions()) + .mode(SaveMode.Overwrite) + .save(hudiBasePath); + + inserts.write().partitionBy("partition_path").save(bootstrapBasePath); + } + + public Dataset generateTestInserts() { + List records = dataGen.generateInserts("000", nInserts).stream() + .map(r -> recordToString(r).get()).collect(Collectors.toList()); + JavaRDD rdd = jsc.parallelize(records); + return sparkSession.read().json(rdd); + } + + public Dataset generateTestUpdates(String instantTime) { + try { + List records = dataGen.generateUpdates(instantTime, nUpdates).stream() + .map(r -> recordToString(r).get()).collect(Collectors.toList()); + JavaRDD rdd = jsc.parallelize(records); + return sparkSession.read().json(rdd); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file From 66dd335158e2800359039917a9c1690450fa9c27 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 10 May 2023 09:02:05 -0600 Subject: [PATCH 02/10] fixed failing test and resolved checkstyle --- .../action/commit/HoodieMergeHelper.java | 1 - .../hudi/client/bootstrap/PartitionUtils.java | 20 +++++++++++++++++-- .../commit/BaseSparkCommitActionExecutor.java | 8 +------- .../io/storage/HoodieBootstrapFileReader.java | 1 - .../HoodieBootstrapRecordIterator.java | 1 - .../common/testutils/HoodieTestUtils.java | 9 ++++++++- .../apache/hudi/functional/TestBootstrap.java | 9 +++++---- 7 files changed, 32 insertions(+), 17 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 2df98f6dadc6..c356b0584fdf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -18,7 +18,6 @@ package org.apache.hudi.table.action.commit; -import org.apache.hudi.client.utils.MergingIterator; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecord; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java index c54819551cb9..ce70245a98fe 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java @@ -1,10 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hudi.client.bootstrap; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.hadoop.CachingPath; import org.apache.avro.Schema; @@ -14,7 +31,6 @@ public class PartitionUtils { - public static Object[] getPartitionFieldVals(Option partitionFields, String partitionPath, String bootstrapBasePath, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index c835338a8ca1..ce928cf8471a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -18,9 +18,6 @@ package org.apache.hudi.table.action.commit; -import org.apache.hudi.AvroConversionUtils; -import org.apache.hudi.HoodieSparkUtils; -import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.bootstrap.PartitionUtils; import org.apache.hudi.client.utils.SparkValidatorUtils; @@ -47,7 +44,6 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.SparkLazyInsertIterable; -import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.HoodieMergeHandle; @@ -63,8 +59,6 @@ import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil; -import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -384,7 +378,7 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle implements ClosableIterat protected Schema schema; - public HoodieBootstrapRecordIterator(ClosableIterator> skeletonIterator, ClosableIterator> dataFileIterator, Schema schema, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index d3c1de56773b..0677eb154176 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -71,13 +71,20 @@ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableT return init(getDefaultHadoopConf(), basePath, tableType, properties); } - public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException { + public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable, String keyGenerator) throws IOException { Properties props = new Properties(); props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath); 
props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), bootstrapIndexEnable); + if (keyGenerator != null) { + props.put("hoodie.datasource.write.keygenerator.class", keyGenerator); + } return init(getDefaultHadoopConf(), basePath, tableType, props); } + public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException { + return init(basePath, tableType, bootstrapBasePath, bootstrapIndexEnable, null); + } + public static HoodieTableMetaClient init(String basePath, HoodieFileFormat baseFileFormat) throws IOException { return init(getDefaultHadoopConf(), basePath, HoodieTableType.COPY_ON_WRITE, baseFileFormat); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index cb90f2d27055..b6cda7c63c54 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -194,15 +194,15 @@ private enum EffectiveMode { private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode) throws Exception { + String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName() + : NonpartitionedKeyGenerator.class.getCanonicalName(); if (deltaCommit) { - metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true); + metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, true, keyGeneratorClass); } else { - metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true); + metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass); } int totalRecords = 100; - String keyGeneratorClass = partitioned ? 
SimpleKeyGenerator.class.getCanonicalName() - : NonpartitionedKeyGenerator.class.getCanonicalName(); final String bootstrapModeSelectorClass; final String bootstrapCommitInstantTs; final boolean checkNumRawFiles; @@ -242,6 +242,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec HoodieWriteConfig config = getConfigBuilder(schema.toString()) .withAutoCommit(true) .withSchema(schema.toString()) + .withKeyGenerator(keyGeneratorClass) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(1) .build()) From 495d4609426499f684ff058938083e564ba72473 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 10 May 2023 09:18:09 -0600 Subject: [PATCH 03/10] added tests for nonpartitioned table --- .../hudi/functional/TestBootstrapRead.java | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index f8c054cdbbb1..b523ba633e17 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.testutils.HoodieSparkClientTestBase; @@ -71,6 +72,7 @@ public class TestBootstrapRead extends HoodieSparkClientTestBase { protected String bootstrapType; protected Boolean dashPartitions; protected String tableType; + protected Integer nPartitions; protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state", "partition_path"}; @@ -94,11 +96,14 @@ private static Stream testArgs() { Boolean[] dashPartitions = {true}; String[] tableType = {"COPY_ON_WRITE"}; String[] bootstrapType = {"full", "metadata", "mixed"}; + Integer[] nPartitions = {0, 1}; for (String tt : tableType) { for (Boolean dash : dashPartitions) { for (String bt : bootstrapType) { - b.add(Arguments.of(bt, dash, tt)); + for (Integer n : nPartitions) { + b.add(Arguments.of(bt, dash, tt, n)); + } } } } @@ -107,10 +112,11 @@ private static Stream testArgs() { @ParameterizedTest @MethodSource("testArgs") - public void runTests(String bootstrapType,Boolean dashPartitions, String tableType) { + public void runTests(String bootstrapType,Boolean dashPartitions, String tableType, Integer nPartitions) { this.bootstrapType = bootstrapType; this.dashPartitions = dashPartitions; this.tableType = tableType; + this.nPartitions = nPartitions; setupDirs(); //do bootstrap @@ -136,7 +142,9 @@ private Map basicOptions() { options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType); options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true"); options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); - options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path"); + if (nPartitions > 0) { + options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path"); + } options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); if 
(tableType.equals("MERGE_ON_READ")) { options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); @@ -149,7 +157,12 @@ private Map setBootstrapOptions() { Map options = basicOptions(); options.put(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL()); options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath); - options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()); + if (nPartitions == 0) { + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()); + } else { + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()); + } + switch (bootstrapType) { case "metadata": options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), MetadataOnlyBootstrapModeSelector.class.getName()); @@ -216,7 +229,17 @@ protected void setupDirs() { .mode(SaveMode.Overwrite) .save(hudiBasePath); - inserts.write().partitionBy("partition_path").save(bootstrapBasePath); + switch (nPartitions) { + case 0: + inserts.write().save(bootstrapBasePath); + break; + case 1: + inserts.write().partitionBy("partition_path").save(bootstrapBasePath); + break; + default: + throw new RuntimeException(); + } + } public Dataset generateTestInserts() { From 1ea8180467bee682754a8d828fa974ee0895a036 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 10 May 2023 23:01:27 -0600 Subject: [PATCH 04/10] add multiple partition tests, timeline server causes some errors so it is disabled --- .../HoodieBootstrapRecordIterator.java | 6 +- .../hudi/functional/TestBootstrapRead.java | 92 +++++++++++++------ 2 files changed, 68 insertions(+), 30 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java index a639dbf6f950..b3ccf573ffe0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java @@ -25,6 +25,8 @@ import org.apache.avro.Schema; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + public abstract class HoodieBootstrapRecordIterator implements ClosableIterator> { protected ClosableIterator> skeletonIterator; @@ -53,7 +55,9 @@ public void close() { @Override public boolean hasNext() { - return skeletonIterator.hasNext() && dataFileIterator.hasNext(); + checkState(skeletonIterator.hasNext() == dataFileIterator.hasNext()); + return + skeletonIterator.hasNext(); } @Override diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index b523ba633e17..ab02cd5ba1ed 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.ComplexKeyGenerator; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.testutils.HoodieSparkClientTestBase; @@ -35,6 +36,7 @@ import 
org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.functions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -44,6 +46,7 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,7 +77,8 @@ public class TestBootstrapRead extends HoodieSparkClientTestBase { protected String tableType; protected Integer nPartitions; - protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state", "partition_path"}; + protected String[] partitionCols; + protected static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_file_name", "city_to_state"}; @BeforeEach public void setUp() throws Exception { @@ -94,19 +98,23 @@ public void tearDown() throws IOException { private static Stream testArgs() { Stream.Builder b = Stream.builder(); Boolean[] dashPartitions = {true}; - String[] tableType = {"COPY_ON_WRITE"}; + String[] tableType = {"COPY_ON_WRITE", "MERGE_ON_READ"}; String[] bootstrapType = {"full", "metadata", "mixed"}; - Integer[] nPartitions = {0, 1}; + Integer[] nPartitions = {0, 1, 2}; for (String tt : tableType) { for (Boolean dash : dashPartitions) { for (String bt : bootstrapType) { for (Integer n : nPartitions) { - b.add(Arguments.of(bt, dash, tt, n)); + //can't be mixed bootstrap if it's nonpartitioned + if (!bt.equals("mixed") || n > 0) { + b.add(Arguments.of(bt, dash, tt, n)); + } } } } } + return b.build(); } @@ -142,8 +150,16 @@ private Map basicOptions() { options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType); options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true"); options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); - if (nPartitions > 0) { - options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path"); + options.put("hoodie.embed.timeline.server", "false"); + if (nPartitions == 0) { + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()); + } else { + options.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), String.join(",", partitionCols)); + if (nPartitions == 1) { + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()); + } else { + options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexKeyGenerator.class.getName()); + } } options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp"); if (tableType.equals("MERGE_ON_READ")) { @@ -157,11 +173,6 @@ private Map setBootstrapOptions() { Map options = basicOptions(); options.put(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL()); options.put(HoodieBootstrapConfig.BASE_PATH.key(), bootstrapBasePath); - if (nPartitions == 0) { - options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()); - } else { - options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()); - } switch (bootstrapType) { case "metadata": @@ -172,11 +183,16 @@ private Map setBootstrapOptions() { break; case "mixed": options.put(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key(), BootstrapRegexModeSelector.class.getName()); + String regexPattern; if 
(dashPartitions) { - options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), "partition_path=2015-03-1[5-7]"); + regexPattern = "partition_path=2015-03-1[5-7]"; } else { - options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), "partition_path=2015%2F03%2F1[5-7]"); + regexPattern = "partition_path=2015%2F03%2F1[5-7]"; } + if (nPartitions > 1) { + regexPattern = regexPattern + "\\/.*"; + } + options.put(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_PATTERN.key(), regexPattern); break; default: throw new RuntimeException(); @@ -211,8 +227,12 @@ protected void compareTables() { } Dataset hudiDf = sparkSession.read().options(readOpts).format("hudi").load(hudiBasePath); Dataset bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath); - compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns)); - compareDf(hudiDf.select("_row_key","partition_path"), bootstrapDf.select("_row_key","partition_path")); + if (nPartitions == 0) { + compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns)); + return; + } + compareDf(hudiDf.drop(dropColumns).drop(partitionCols), bootstrapDf.drop(dropColumns).drop(partitionCols)); + compareDf(hudiDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols)); } protected void compareDf(Dataset df1, Dataset df2) { @@ -223,30 +243,28 @@ protected void compareDf(Dataset df1, Dataset df2) { protected void setupDirs() { dataGen = new HoodieTestDataGenerator(dashPartitions ? dashPartitionPaths : slashPartitionPaths); Dataset inserts = generateTestInserts(); + if (nPartitions > 0) { + partitionCols = new String[nPartitions]; + partitionCols[0] = "partition_path"; + for (int i = 1; i < partitionCols.length; i++) { + partitionCols[i] = "partpath" + (i + 1); + } + inserts.write().partitionBy(partitionCols).save(bootstrapBasePath); + } else { + inserts.write().save(bootstrapBasePath); + } inserts.write().format("hudi") .options(basicOptions()) .mode(SaveMode.Overwrite) .save(hudiBasePath); - - switch (nPartitions) { - case 0: - inserts.write().save(bootstrapBasePath); - break; - case 1: - inserts.write().partitionBy("partition_path").save(bootstrapBasePath); - break; - default: - throw new RuntimeException(); - } - } public Dataset generateTestInserts() { List records = dataGen.generateInserts("000", nInserts).stream() .map(r -> recordToString(r).get()).collect(Collectors.toList()); JavaRDD rdd = jsc.parallelize(records); - return sparkSession.read().json(rdd); + return addPartitionColumns(sparkSession.read().json(rdd), nPartitions); } public Dataset generateTestUpdates(String instantTime) { @@ -254,9 +272,25 @@ public Dataset generateTestUpdates(String instantTime) { List records = dataGen.generateUpdates(instantTime, nUpdates).stream() .map(r -> recordToString(r).get()).collect(Collectors.toList()); JavaRDD rdd = jsc.parallelize(records); - return sparkSession.read().json(rdd); + return addPartitionColumns(sparkSession.read().json(rdd), nPartitions); } catch (IOException e) { throw new RuntimeException(e); } } + + public static Dataset addPartitionColumns(Dataset df, Integer nPartitions) { + if (nPartitions < 2) { + return df; + } + for (int i = 2; i <= nPartitions; i++) { + df = applyPartition(df, i); + } + return df; + } + + private static Dataset applyPartition(Dataset df, Integer n) { + return df.withColumn("partpath" + n, + functions.md5(functions.concat_ws( "," + n + ",", df.col("partition_path"), + functions.hash(df.col("_row_key")).mod(n)))); + } } \ No newline at end of file 
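The thread running through the patches above is that a bootstrap base file does not physically store the Hudi partition columns, so when it is merged (handleUpdateInternal in the COW tables, and later clustering), the partition values have to be re-derived from the hive-style partition path and written back into every record. A minimal sketch of that flow, assuming a single partition column named partition_path; the schema, Hadoop configuration, paths, and literal partition value below are placeholders and not taken from the patches:

    // Illustrative sketch only -- mirrors the call added to HoodieSparkCopyOnWriteTable.handleUpdateInternal.
    // writerSchema and hadoopConf are assumed to be in scope; the paths are hypothetical.
    Option<String[]> partitionFields = Option.of(new String[] {"partition_path"});
    Object[] partitionValues = PartitionUtils.getPartitionFieldVals(
        partitionFields,
        "partition_path=2015-03-15",   // hive-style relative partition path of the file group (example value)
        "/tmp/bootstrapBasePath",      // hypothetical bootstrap base path
        writerSchema,                  // writer schema containing the partition_path field
        hadoopConf);
    // The pair is then handed to the widened merge entry point:
    //   HoodieMergeHelper.newInstance().runMerge(table, upsertHandle, partitionFields, partitionValues);
    // and HoodieBootstrapRecordIterator injects each value at its field position via setThePartitionField(...).
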
From bfa1909d85b174cb466d205ee4ec2af09be1850d Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 10 May 2023 23:14:15 -0600 Subject: [PATCH 05/10] add todo --- .../test/java/org/apache/hudi/functional/TestBootstrapRead.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index ab02cd5ba1ed..b55d16bf066f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -150,6 +150,7 @@ private Map basicOptions() { options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType); options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true"); options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + //TODO: enable timeline server with [HUDI-6201] options.put("hoodie.embed.timeline.server", "false"); if (nPartitions == 0) { options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()); From 840cfab05cacd7a4862b2ad9a1983af2953819a1 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 10 May 2023 23:36:14 -0600 Subject: [PATCH 06/10] fix checkstyle --- .../MultipleSparkJobExecutionStrategy.java | 17 ++--------------- .../hudi/functional/TestBootstrapRead.java | 8 ++++---- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index bf05e43fd900..008170aa9546 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -18,14 +18,13 @@ package org.apache.hudi.client.clustering.run.strategy; -import org.apache.hudi.AvroConversionUtils; -import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.bootstrap.PartitionUtils; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.utils.ConcatenatingIterator; import org.apache.hudi.common.config.SerializableConfiguration; @@ -54,7 +53,6 @@ import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner; import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner; import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner; -import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -72,8 +70,6 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil; -import 
org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.sources.BaseRelation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -379,16 +375,7 @@ private HoodieFileReader getBaseOrBootstrapFileReader(SerializableConfiguration if (partitionFields.isPresent()) { int startOfPartitionPath = bootstrapFilePath.indexOf(bootstrapBasePath) + bootstrapBasePath.length() + 1; String partitionFilePath = bootstrapFilePath.substring(startOfPartitionPath, bootstrapFilePath.lastIndexOf("/")); - CachingPath bootstrapCachingPath = new CachingPath(bootstrapBasePath); - SparkParsePartitionUtil sparkParsePartitionUtil = SparkAdapterSupport$.MODULE$.sparkAdapter().getSparkParsePartitionUtil(); - partitionValues = HoodieSparkUtils.parsePartitionColumnValues( - partitionFields.get(), - partitionFilePath, - bootstrapCachingPath, - AvroConversionUtils.convertAvroSchemaToStructType(baseFileReader.getSchema()), - hadoopConf.get().get("timeZone", SQLConf.get().sessionLocalTimeZone()), - sparkParsePartitionUtil, - hadoopConf.get().getBoolean("spark.sql.sources.validatePartitionColumns", true)); + partitionValues = PartitionUtils.getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(), hadoopConf.get()); } baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader( baseFileReader, diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index b55d16bf066f..c35584ae3188 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -46,7 +46,6 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -290,8 +289,9 @@ public static Dataset addPartitionColumns(Dataset df, Integer nPartiti } private static Dataset applyPartition(Dataset df, Integer n) { - return df.withColumn("partpath" + n, - functions.md5(functions.concat_ws( "," + n + ",", df.col("partition_path"), - functions.hash(df.col("_row_key")).mod(n)))); + return df.withColumn("partpath" + n, + functions.md5(functions.concat_ws("," + n + ",", + df.col("partition_path"), + functions.hash(df.col("_row_key")).mod(n)))); } } \ No newline at end of file From 1b2f28447ac507b35f82a0534ebd958a8fd8980d Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Thu, 11 May 2023 12:04:18 -0600 Subject: [PATCH 07/10] added lock to bootstrap file map and fixed failing test --- .../table/view/HoodieTableFileSystemView.java | 69 ++++++++++++++----- .../view/SpillableMapBasedFileSystemView.java | 7 +- .../HoodieBootstrapRecordIterator.java | 3 +- .../common/testutils/HoodieTestUtils.java | 4 +- .../hudi/functional/TestBootstrapRead.java | 3 - 5 files changed, 62 insertions(+), 24 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java index eb23c1ca1c1b..8470bf103b1e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java @@ -41,6 +41,7 @@ import java.util.Map; 
import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -70,6 +71,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem /** * PartitionPath + File-Id to bootstrap base File (Index Only bootstrapped). */ + protected ReentrantReadWriteLock bootstrapMapLock; protected Map fgIdToBootstrapBaseFile; /** @@ -110,6 +112,7 @@ public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimelin @Override public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { this.partitionToFileGroupsMap = createPartitionToFileGroups(); + this.bootstrapMapLock = new ReentrantReadWriteLock(); super.init(metaClient, visibleActiveTimeline); } @@ -326,44 +329,76 @@ Stream> fetchPendingLogCompactionOperations() @Override protected boolean isBootstrapBaseFilePresentForFileId(HoodieFileGroupId fgId) { - return fgIdToBootstrapBaseFile.containsKey(fgId); + bootstrapMapLock.readLock().lock(); + try { + return fgIdToBootstrapBaseFile.containsKey(fgId); + } finally { + bootstrapMapLock.readLock().unlock(); + } + } @Override void resetBootstrapBaseFileMapping(Stream bootstrapBaseFileStream) { // Build fileId to bootstrap Data File - this.fgIdToBootstrapBaseFile = createFileIdToBootstrapBaseFileMap(bootstrapBaseFileStream - .collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId, x -> x))); + bootstrapMapLock.writeLock().lock(); + try { + this.fgIdToBootstrapBaseFile = createFileIdToBootstrapBaseFileMap(bootstrapBaseFileStream + .collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId, x -> x))); + } finally { + bootstrapMapLock.writeLock().unlock(); + } } @Override void addBootstrapBaseFileMapping(Stream bootstrapBaseFileStream) { - bootstrapBaseFileStream.forEach(bootstrapBaseFile -> { - ValidationUtils.checkArgument(!fgIdToBootstrapBaseFile.containsKey(bootstrapBaseFile.getFileGroupId()), - "Duplicate FileGroupId found in bootstrap base file mapping. FgId :" - + bootstrapBaseFile.getFileGroupId()); - fgIdToBootstrapBaseFile.put(bootstrapBaseFile.getFileGroupId(), bootstrapBaseFile); - }); + bootstrapMapLock.writeLock().lock(); + try { + bootstrapBaseFileStream.forEach(bootstrapBaseFile -> { + ValidationUtils.checkArgument(!fgIdToBootstrapBaseFile.containsKey(bootstrapBaseFile.getFileGroupId()), + "Duplicate FileGroupId found in bootstrap base file mapping. FgId :" + + bootstrapBaseFile.getFileGroupId()); + fgIdToBootstrapBaseFile.put(bootstrapBaseFile.getFileGroupId(), bootstrapBaseFile); + }); + } finally { + bootstrapMapLock.writeLock().unlock(); + } } @Override void removeBootstrapBaseFileMapping(Stream bootstrapBaseFileStream) { - bootstrapBaseFileStream.forEach(bootstrapBaseFile -> { - ValidationUtils.checkArgument(fgIdToBootstrapBaseFile.containsKey(bootstrapBaseFile.getFileGroupId()), - "Trying to remove a FileGroupId which is not found in bootstrap base file mapping. FgId :" - + bootstrapBaseFile.getFileGroupId()); - fgIdToBootstrapBaseFile.remove(bootstrapBaseFile.getFileGroupId()); - }); + bootstrapMapLock.writeLock().lock(); + try { + bootstrapBaseFileStream.forEach(bootstrapBaseFile -> { + ValidationUtils.checkArgument(fgIdToBootstrapBaseFile.containsKey(bootstrapBaseFile.getFileGroupId()), + "Trying to remove a FileGroupId which is not found in bootstrap base file mapping. 
FgId :" + + bootstrapBaseFile.getFileGroupId()); + fgIdToBootstrapBaseFile.remove(bootstrapBaseFile.getFileGroupId()); + }); + } finally { + bootstrapMapLock.writeLock().unlock(); + } + } @Override protected Option getBootstrapBaseFile(HoodieFileGroupId fileGroupId) { - return Option.ofNullable(fgIdToBootstrapBaseFile.get(fileGroupId)); + bootstrapMapLock.readLock().lock(); + try { + return Option.ofNullable(fgIdToBootstrapBaseFile.get(fileGroupId)); + } finally { + bootstrapMapLock.readLock().unlock(); + } } @Override Stream fetchBootstrapBaseFiles() { - return fgIdToBootstrapBaseFile.values().stream(); + bootstrapMapLock.readLock().lock(); + try { + return fgIdToBootstrapBaseFile.values().stream(); + } finally { + bootstrapMapLock.readLock().unlock(); + } } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java index 75d29870a5a8..f9063b768d7d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java @@ -195,7 +195,12 @@ Stream> fetchPendingLogCompactionOperations() @Override Stream fetchBootstrapBaseFiles() { - return ((ExternalSpillableMap) fgIdToBootstrapBaseFile).valueStream(); + bootstrapMapLock.readLock().lock(); + try { + return ((ExternalSpillableMap) fgIdToBootstrapBaseFile).valueStream(); + } finally { + bootstrapMapLock.readLock().unlock(); + } } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java index b3ccf573ffe0..69089da500ec 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java @@ -56,8 +56,7 @@ public void close() { @Override public boolean hasNext() { checkState(skeletonIterator.hasNext() == dataFileIterator.hasNext()); - return - skeletonIterator.hasNext(); + return skeletonIterator.hasNext(); } @Override diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 0677eb154176..4751dbf51620 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -77,6 +77,7 @@ public static HoodieTableMetaClient init(String basePath, HoodieTableType tableT props.put(HoodieTableConfig.BOOTSTRAP_INDEX_ENABLE.key(), bootstrapIndexEnable); if (keyGenerator != null) { props.put("hoodie.datasource.write.keygenerator.class", keyGenerator); + props.put("hoodie.datasource.write.partitionpath.field", "datestr"); } return init(getDefaultHadoopConf(), basePath, tableType, props); } @@ -147,7 +148,8 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePa .setPayloadClass(HoodieAvroPayload.class); String keyGen = properties.getProperty("hoodie.datasource.write.keygenerator.class"); - if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")) { + if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator") + && !properties.containsKey("hoodie.datasource.write.partitionpath.field")) { 
builder.setPartitionFields("some_nonexistent_field"); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index c35584ae3188..63db95bc88de 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -113,7 +113,6 @@ private static Stream testArgs() { } } } - return b.build(); } @@ -149,8 +148,6 @@ private Map basicOptions() { options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType); options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true"); options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); - //TODO: enable timeline server with [HUDI-6201] - options.put("hoodie.embed.timeline.server", "false"); if (nPartitions == 0) { options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()); } else { From cc420c8790dc736da431714aa78750be1c502a9d Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Mon, 15 May 2023 11:48:35 -0400 Subject: [PATCH 08/10] got rid of timeline changes because that should be in a different pr --- .../table/view/HoodieTableFileSystemView.java | 69 +++++-------------- .../view/SpillableMapBasedFileSystemView.java | 7 +- .../hudi/functional/TestBootstrapRead.java | 3 + 3 files changed, 21 insertions(+), 58 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java index 8470bf103b1e..eb23c1ca1c1b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java @@ -41,7 +41,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -71,7 +70,6 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem /** * PartitionPath + File-Id to bootstrap base File (Index Only bootstrapped). 
*/ - protected ReentrantReadWriteLock bootstrapMapLock; protected Map fgIdToBootstrapBaseFile; /** @@ -112,7 +110,6 @@ public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimelin @Override public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { this.partitionToFileGroupsMap = createPartitionToFileGroups(); - this.bootstrapMapLock = new ReentrantReadWriteLock(); super.init(metaClient, visibleActiveTimeline); } @@ -329,76 +326,44 @@ Stream> fetchPendingLogCompactionOperations() @Override protected boolean isBootstrapBaseFilePresentForFileId(HoodieFileGroupId fgId) { - bootstrapMapLock.readLock().lock(); - try { - return fgIdToBootstrapBaseFile.containsKey(fgId); - } finally { - bootstrapMapLock.readLock().unlock(); - } - + return fgIdToBootstrapBaseFile.containsKey(fgId); } @Override void resetBootstrapBaseFileMapping(Stream bootstrapBaseFileStream) { // Build fileId to bootstrap Data File - bootstrapMapLock.writeLock().lock(); - try { - this.fgIdToBootstrapBaseFile = createFileIdToBootstrapBaseFileMap(bootstrapBaseFileStream - .collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId, x -> x))); - } finally { - bootstrapMapLock.writeLock().unlock(); - } + this.fgIdToBootstrapBaseFile = createFileIdToBootstrapBaseFileMap(bootstrapBaseFileStream + .collect(Collectors.toMap(BootstrapBaseFileMapping::getFileGroupId, x -> x))); } @Override void addBootstrapBaseFileMapping(Stream bootstrapBaseFileStream) { - bootstrapMapLock.writeLock().lock(); - try { - bootstrapBaseFileStream.forEach(bootstrapBaseFile -> { - ValidationUtils.checkArgument(!fgIdToBootstrapBaseFile.containsKey(bootstrapBaseFile.getFileGroupId()), - "Duplicate FileGroupId found in bootstrap base file mapping. FgId :" - + bootstrapBaseFile.getFileGroupId()); - fgIdToBootstrapBaseFile.put(bootstrapBaseFile.getFileGroupId(), bootstrapBaseFile); - }); - } finally { - bootstrapMapLock.writeLock().unlock(); - } + bootstrapBaseFileStream.forEach(bootstrapBaseFile -> { + ValidationUtils.checkArgument(!fgIdToBootstrapBaseFile.containsKey(bootstrapBaseFile.getFileGroupId()), + "Duplicate FileGroupId found in bootstrap base file mapping. FgId :" + + bootstrapBaseFile.getFileGroupId()); + fgIdToBootstrapBaseFile.put(bootstrapBaseFile.getFileGroupId(), bootstrapBaseFile); + }); } @Override void removeBootstrapBaseFileMapping(Stream bootstrapBaseFileStream) { - bootstrapMapLock.writeLock().lock(); - try { - bootstrapBaseFileStream.forEach(bootstrapBaseFile -> { - ValidationUtils.checkArgument(fgIdToBootstrapBaseFile.containsKey(bootstrapBaseFile.getFileGroupId()), - "Trying to remove a FileGroupId which is not found in bootstrap base file mapping. FgId :" - + bootstrapBaseFile.getFileGroupId()); - fgIdToBootstrapBaseFile.remove(bootstrapBaseFile.getFileGroupId()); - }); - } finally { - bootstrapMapLock.writeLock().unlock(); - } - + bootstrapBaseFileStream.forEach(bootstrapBaseFile -> { + ValidationUtils.checkArgument(fgIdToBootstrapBaseFile.containsKey(bootstrapBaseFile.getFileGroupId()), + "Trying to remove a FileGroupId which is not found in bootstrap base file mapping. 
FgId :" + + bootstrapBaseFile.getFileGroupId()); + fgIdToBootstrapBaseFile.remove(bootstrapBaseFile.getFileGroupId()); + }); } @Override protected Option getBootstrapBaseFile(HoodieFileGroupId fileGroupId) { - bootstrapMapLock.readLock().lock(); - try { - return Option.ofNullable(fgIdToBootstrapBaseFile.get(fileGroupId)); - } finally { - bootstrapMapLock.readLock().unlock(); - } + return Option.ofNullable(fgIdToBootstrapBaseFile.get(fileGroupId)); } @Override Stream fetchBootstrapBaseFiles() { - bootstrapMapLock.readLock().lock(); - try { - return fgIdToBootstrapBaseFile.values().stream(); - } finally { - bootstrapMapLock.readLock().unlock(); - } + return fgIdToBootstrapBaseFile.values().stream(); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java index f9063b768d7d..75d29870a5a8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java @@ -195,12 +195,7 @@ Stream> fetchPendingLogCompactionOperations() @Override Stream fetchBootstrapBaseFiles() { - bootstrapMapLock.readLock().lock(); - try { - return ((ExternalSpillableMap) fgIdToBootstrapBaseFile).valueStream(); - } finally { - bootstrapMapLock.readLock().unlock(); - } + return ((ExternalSpillableMap) fgIdToBootstrapBaseFile).valueStream(); } @Override diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java index 63db95bc88de..2fb4449cb896 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java @@ -96,6 +96,7 @@ public void tearDown() throws IOException { private static Stream testArgs() { Stream.Builder b = Stream.builder(); + //TODO: add dash partitions false with [HUDI-4944] Boolean[] dashPartitions = {true}; String[] tableType = {"COPY_ON_WRITE", "MERGE_ON_READ"}; String[] bootstrapType = {"full", "metadata", "mixed"}; @@ -148,6 +149,8 @@ private Map basicOptions() { options.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType); options.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true"); options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + //TODO: enable timeline server with [HUDI-6201] + options.put("hoodie.embed.timeline.server", "false"); if (nPartitions == 0) { options.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getName()); } else { From c20d2780e542b92fbf95a2a3fb2ab94512943780 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Mon, 15 May 2023 16:34:55 -0400 Subject: [PATCH 09/10] addressed pr comments --- .../run/strategy/MultipleSparkJobExecutionStrategy.java | 4 ++-- .../SparkPartitionUtils.java} | 8 ++++---- .../apache/hudi/table/HoodieSparkCopyOnWriteTable.java | 6 +++--- .../action/commit/BaseSparkCommitActionExecutor.java | 6 +++--- .../org/apache/hudi/common/testutils/HoodieTestUtils.java | 6 +++++- .../org/apache/hudi/functional/TestBootstrapRead.java | 4 ++-- .../java/org/apache/hudi/functional/TestOrcBootstrap.java | 8 ++++++-- 7 files changed, 25 insertions(+), 17 deletions(-) rename 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/{bootstrap/PartitionUtils.java => utils/SparkPartitionUtils.java} (91%) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 008170aa9546..9210cd5eac61 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -24,7 +24,6 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.bootstrap.PartitionUtils; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.utils.ConcatenatingIterator; import org.apache.hudi.common.config.SerializableConfiguration; @@ -86,6 +85,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals; import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF; import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader; import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; @@ -375,7 +375,7 @@ private HoodieFileReader getBaseOrBootstrapFileReader(SerializableConfiguration if (partitionFields.isPresent()) { int startOfPartitionPath = bootstrapFilePath.indexOf(bootstrapBasePath) + bootstrapBasePath.length() + 1; String partitionFilePath = bootstrapFilePath.substring(startOfPartitionPath, bootstrapFilePath.lastIndexOf("/")); - partitionValues = PartitionUtils.getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(), hadoopConf.get()); + partitionValues = getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(), hadoopConf.get()); } baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader( baseFileReader, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java similarity index 91% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java index ce70245a98fe..6dc344ec7347 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/PartitionUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hudi.client.bootstrap; +package org.apache.hudi.client.utils; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.HoodieSparkUtils; @@ -29,11 +29,11 @@ import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil; import org.apache.spark.sql.internal.SQLConf; -public class PartitionUtils { +public class SparkPartitionUtils { public static Object[] getPartitionFieldVals(Option partitionFields, String partitionPath, - String bootstrapBasePath, + String basePath, Schema writerSchema, Configuration hadoopConf) { if (!partitionFields.isPresent()) { @@ -43,7 +43,7 @@ public static Object[] getPartitionFieldVals(Option partitionFields, return HoodieSparkUtils.parsePartitionColumnValues( partitionFields.get(), partitionPath, - new CachingPath(bootstrapBasePath), + new CachingPath(basePath), AvroConversionUtils.convertAvroSchemaToStructType(writerSchema), hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()), sparkParsePartitionUtil, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 42410911b8ca..ba9b8841889f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -30,7 +30,7 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.bootstrap.PartitionUtils; +import org.apache.hudi.client.utils.SparkPartitionUtils; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.data.HoodieData; @@ -227,8 +227,8 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> handleUpdateInternal(HoodieMergeHandle testArgs() { Stream.Builder b = Stream.builder(); //TODO: add dash partitions false with [HUDI-4944] - Boolean[] dashPartitions = {true}; + Boolean[] dashPartitions = {true/*,false*/}; String[] tableType = {"COPY_ON_WRITE", "MERGE_ON_READ"}; String[] bootstrapType = {"full", "metadata", "mixed"}; Integer[] nPartitions = {0, 1, 2}; @@ -294,4 +294,4 @@ private static Dataset applyPartition(Dataset df, Integer n) { df.col("partition_path"), functions.hash(df.col("_row_key")).mod(n)))); } -} \ No newline at end of file +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java index b65e02402e78..80c2744efcd9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java @@ -53,6 +53,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.table.action.bootstrap.BootstrapUtils; import org.apache.hudi.testutils.HoodieSparkClientTestBase; @@ -182,11 +183,13 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec if (!HoodieSparkUtils.gteqSpark3_0()) { return; } + String keyGeneratorClass = 
partitioned ? SimpleKeyGenerator.class.getCanonicalName() + : NonpartitionedKeyGenerator.class.getCanonicalName(); if (deltaCommit) { - metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, HoodieFileFormat.ORC); + metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, HoodieFileFormat.ORC, keyGeneratorClass); } else { - metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, HoodieFileFormat.ORC); + metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, HoodieFileFormat.ORC, keyGeneratorClass); } int totalRecords = 100; @@ -229,6 +232,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec HoodieWriteConfig config = getConfigBuilder(schema.toString(), partitioned) .withAutoCommit(true) .withSchema(schema.toString()) + .withKeyGenerator(keyGeneratorClass) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(1) .build()) From 6ba0740939112a80201fdaacaa2b2a1e383d1c85 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Tue, 16 May 2023 09:26:55 -0400 Subject: [PATCH 10/10] addressed 2nd round of pr comments --- .../org/apache/hudi/io/HoodieMergeHandle.java | 19 +++++++++++++++++++ .../table/action/commit/BaseMergeHelper.java | 3 +-- .../action/commit/HoodieMergeHelper.java | 8 +++----- .../table/HoodieFlinkCopyOnWriteTable.java | 2 +- .../commit/BaseFlinkCommitActionExecutor.java | 2 +- .../table/HoodieJavaCopyOnWriteTable.java | 2 +- .../commit/BaseJavaCommitActionExecutor.java | 2 +- .../table/HoodieSparkCopyOnWriteTable.java | 10 +++++----- .../commit/BaseSparkCommitActionExecutor.java | 12 +++++------- .../io/storage/HoodieBootstrapFileReader.java | 4 ++-- .../HoodieBootstrapRecordIterator.java | 4 ++-- 11 files changed, 41 insertions(+), 27 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 073a0e0aad1d..d83237cb9253 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -116,6 +116,9 @@ public class HoodieMergeHandle extends HoodieWriteHandle protected Option keyGeneratorOpt; private HoodieBaseFile baseFileToMerge; + protected Option partitionFields = Option.empty(); + protected Object[] partitionValues = new Object[0]; + public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier, Option keyGeneratorOpt) { @@ -476,4 +479,20 @@ public IOType getIOType() { public HoodieBaseFile baseFileForMerge() { return baseFileToMerge; } + + public void setPartitionFields(Option partitionFields) { + this.partitionFields = partitionFields; + } + + public Option getPartitionFields() { + return this.partitionFields; + } + + public void setPartitionValues(Object[] partitionValues) { + this.partitionValues = partitionValues; + } + + public Object[] getPartitionValues() { + return this.partitionValues; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 
f3deed073b6c..17b8620da63f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -19,7 +19,6 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.HoodieConsumer; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.table.HoodieTable; @@ -37,7 +36,7 @@ public abstract class BaseMergeHelper { * @param upsertHandle Merge Handle * @throws IOException in case of error */ - public abstract void runMerge(HoodieTable table, HoodieMergeHandle upsertHandle, Option partitionFields, Object[] partitionValues) throws IOException; + public abstract void runMerge(HoodieTable table, HoodieMergeHandle upsertHandle) throws IOException; /** * Consumer that dequeues records from queue and sends to Merge Handle. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index c356b0584fdf..b7668a347969 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -75,7 +75,7 @@ public static HoodieMergeHelper newInstance() { @Override public void runMerge(HoodieTable table, - HoodieMergeHandle mergeHandle, Option partitionFields, Object[] partitionValues) throws IOException { + HoodieMergeHandle mergeHandle) throws IOException { HoodieWriteConfig writeConfig = table.getConfig(); HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); @@ -121,10 +121,8 @@ public void runMerge(HoodieTable table, bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader( baseFileReader, HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath), - partitionFields, - partitionValues); - - + mergeHandle.getPartitionFields(), + mergeHandle.getPartitionValues()); recordIterator = bootstrapFileReader.getRecordIterator(mergeHandle.getWriterSchemaWithMetaFields()); recordSchema = mergeHandle.getWriterSchemaWithMetaFields(); } else { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 64aa391a381e..cb046e2e91fe 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -371,7 +371,7 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> handleUpdateInternal(HoodieMergeHandle> handleUpdateInternal(HoodieMergeHandle> handleUpdateInternal(HoodieMergeHandle statuses = upsertHandle.writeStatuses(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index ba9b8841889f..b8be53eeaf99 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -223,15 +223,15 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle partitionFields = Option.empty(); - Object[] partitionValues = new Object[0]; if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) { - partitionFields = getMetaClient().getTableConfig().getPartitionFields(); - partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(), + Option partitionFields = getMetaClient().getTableConfig().getPartitionFields(); + Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(), getMetaClient().getTableConfig().getBootstrapBasePath().get(), upsertHandle.getWriterSchema(), getHadoopConf()); + upsertHandle.setPartitionFields(partitionFields); + upsertHandle.setPartitionValues(partitionValues); } - HoodieMergeHelper.newInstance().runMerge(this, upsertHandle, partitionFields, partitionValues); + HoodieMergeHelper.newInstance().runMerge(this, upsertHandle); } // TODO(vc): This needs to be revisited diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index c474e2a9a4d4..f8ba2c8e1e4a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -367,18 +367,16 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle partitionFields = Option.empty(); - Object[] partitionValues = new Object[0]; if (upsertHandle.baseFileForMerge().getBootstrapBaseFile().isPresent()) { - partitionFields = table.getMetaClient().getTableConfig().getPartitionFields(); - partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(), + Option partitionFields = table.getMetaClient().getTableConfig().getPartitionFields(); + Object[] partitionValues = SparkPartitionUtils.getPartitionFieldVals(partitionFields, upsertHandle.getPartitionPath(), table.getMetaClient().getTableConfig().getBootstrapBasePath().get(), upsertHandle.getWriterSchema(), table.getHadoopConf()); + upsertHandle.setPartitionFields(partitionFields); + upsertHandle.setPartitionValues(partitionValues); } - - HoodieMergeHelper.newInstance().runMerge(table, upsertHandle, partitionFields, partitionValues); + HoodieMergeHelper.newInstance().runMerge(table, upsertHandle); } // TODO(vc): This needs to be revisited diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java index 008a13a85c0e..4dc1a73e6ddb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java @@ -65,7 +65,7 @@ public ClosableIterator> getRecordIterator(Schema readerSchema, ClosableIterator> dataFileIterator = dataFileReader.getRecordIterator(HoodieAvroUtils.removeMetadataFields(readerSchema), requestedSchema); return new HoodieBootstrapRecordIterator(skeletonIterator, dataFileIterator, readerSchema, partitionFields, partitionValues) { @Override - protected void setThePartitionField(int 
position, Object fieldValue, T row) { + protected void setPartitionPathField(int position, Object fieldValue, T row) { setPartitionField(position, fieldValue, row); } }; @@ -76,7 +76,7 @@ public ClosableIterator> getRecordIterator(Schema schema) throws ClosableIterator> dataFileIterator = dataFileReader.getRecordIterator(dataFileReader.getSchema()); return new HoodieBootstrapRecordIterator(skeletonIterator, dataFileIterator, schema, partitionFields, partitionValues) { @Override - protected void setThePartitionField(int position, Object fieldValue, T row) { + protected void setPartitionPathField(int position, Object fieldValue, T row) { setPartitionField(position, fieldValue, row); } }; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java index 69089da500ec..43f2d1ad1ad5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java @@ -72,11 +72,11 @@ public HoodieRecord next() { if (partitionFields.isPresent()) { for (int i = 0; i < partitionValues.length; i++) { int position = schema.getField(partitionFields.get()[i]).pos(); - setThePartitionField(position, partitionValues[i], ret.getData()); + setPartitionPathField(position, partitionValues[i], ret.getData()); } } return ret; } - protected abstract void setThePartitionField(int position, Object fieldValue, T row); + protected abstract void setPartitionPathField(int position, Object fieldValue, T row); }
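
Editor's note: the following is a minimal, self-contained sketch added for readers skimming the patch series; the class and helper names are hypothetical and are not part of Hudi. It only illustrates the pattern that HoodieBootstrapRecordIterator / HoodieBootstrapFileReader implement in the hunks above: the skeleton file and the bootstrap source file are read in lock step, each pair of records is joined into one row, and the partition-path columns are re-populated from the values parsed out of the partition directory (the role SparkPartitionUtils.getPartitionFieldVals plays on the Spark side).

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

// Hypothetical sketch -- not the real Hudi classes or APIs.
public class BootstrapIteratorSketch<T> implements Iterator<T> {
  private final Iterator<T> skeletonIterator;   // Hudi metadata columns
  private final Iterator<T> dataFileIterator;   // original source-file columns
  private final BiFunction<T, T, T> joiner;     // engine-specific join (placeholder)
  private final int[] partitionPositions;       // field positions of partition columns
  private final Object[] partitionValues;       // values parsed from the partition path

  public BootstrapIteratorSketch(Iterator<T> skeletonIterator,
                                 Iterator<T> dataFileIterator,
                                 BiFunction<T, T, T> joiner,
                                 int[] partitionPositions,
                                 Object[] partitionValues) {
    this.skeletonIterator = skeletonIterator;
    this.dataFileIterator = dataFileIterator;
    this.joiner = joiner;
    this.partitionPositions = partitionPositions;
    this.partitionValues = partitionValues;
  }

  @Override
  public boolean hasNext() {
    // Skeleton and data file describe the same file group, so they must advance together.
    if (skeletonIterator.hasNext() != dataFileIterator.hasNext()) {
      throw new IllegalStateException("skeleton and bootstrap data iterators out of sync");
    }
    return skeletonIterator.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T merged = joiner.apply(skeletonIterator.next(), dataFileIterator.next());
    // Partition columns are typically not materialized in the bootstrap data file, so the
    // values recovered from the partition directory are written back into the merged record.
    for (int i = 0; i < partitionPositions.length; i++) {
      setPartitionPathField(merged, partitionPositions[i], partitionValues[i]);
    }
    return merged;
  }

  // Placeholder for the engine-specific setter (Avro GenericRecord vs Spark InternalRow).
  protected void setPartitionPathField(T record, int position, Object value) {
    // intentionally a stub in this sketch
  }
}

Note on the actual patches: rather than passing the parsed fields and values to runMerge, the final revision threads them through HoodieMergeHandle (setPartitionFields / setPartitionValues) so runMerge keeps its original two-argument signature; the sketch hands them to the iterator directly only for brevity.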