diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 0aa9d40e28e6..8ad2bd665416 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -40,6 +38,9 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -291,31 +292,23 @@ private void doRefresh() {
 
     validate(activeTimeline, queryInstant);
 
-    if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
-      cachedAllInputFileSlices = partitionFiles.keySet().stream()
-          .collect(Collectors.toMap(
-              Function.identity(),
-              partitionPath ->
-                  queryInstant.map(instant ->
-                      fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
+    // NOTE: For a MOR table, when a compaction is inflight we need to fetch not only the
+    // latest file slices, but also the base and log files of the second-latest version of
+    // the file slice in the same file group as the latest file slice under compaction.
+    // This logic is implemented by the `AbstractTableFileSystemView::getLatestMergedFileSlicesBeforeOrOn`
+    // API. Note that for a COW table no slice merging happens, as there is no compaction,
+    // so there is no performance impact.
+    cachedAllInputFileSlices = partitionFiles.keySet().stream()
+        .collect(Collectors.toMap(
+            Function.identity(),
+            partitionPath ->
+                queryInstant.map(instant ->
+                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
+                )
+                    .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
                     .collect(Collectors.toList())
-                  )
-                      .orElse(Collections.emptyList())
-              )
-      );
-    } else {
-      cachedAllInputFileSlices = partitionFiles.keySet().stream()
-          .collect(Collectors.toMap(
-              Function.identity(),
-              partitionPath ->
-                  queryInstant.map(instant ->
-                      fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
-                  )
-                      .orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
-                      .collect(Collectors.toList())
-              )
-      );
-    }
+            )
+        );
 
     cachedFileSize = cachedAllInputFileSlices.values().stream()
         .flatMap(Collection::stream)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 87af8c668c9e..1cdcf9ed2355 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -20,19 +20,21 @@ package org.apache.hudi.functional
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
@@ -44,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.function.Consumer
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
 
 /**
@@ -978,4 +981,114 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
     assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0)
     assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20)
   }
+
+  /**
+   * Test read-optimized query on MOR during inflight compaction.
+   *
+   * The following scenario is tested:
+   * Hudi timeline:
+   * > Deltacommit1 (DC1, completed), writing file group 1 (fg1)
+   * > Deltacommit2 (DC2, completed), updating fg1
+   * > Compaction3 (C3, inflight), compacting fg1
+   * > Deltacommit4 (DC4, completed), updating fg1
+   *
+   * On storage, these are the data files for fg1:
+   * file slice v1:
+   *   - fg1_dc1.parquet (from DC1)
+   *   - .fg1_dc1.log (from DC2)
+   * file slice v2:
+   *   - fg1_c3.parquet (from C3, inflight)
+   *   - .fg1_c3.log (from DC4)
+   *
+   * In this case, the read-optimized query should read only `fg1_dc1.parquet`.
+   */
+  @Test
+  def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(): Unit = {
+    val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table")
+    val precombineField = "col3"
+    val recordKeyField = "key"
+    val dataField = "age"
+
+    val options = Map[String, String](
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name,
+      DataSourceWriteOptions.OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> precombineField,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeyField,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1")
+
+    // First batch with all inserts
+    // Deltacommit1 (DC1, completed), writing file group 1 (fg1)
+    // fg1_dc1.parquet written to storage
+    // For record key "0", the row is (0, 0, 1000)
+    val firstDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 1000"))
+
+    firstDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Second batch with all updates
+    // Deltacommit2 (DC2, completed), updating fg1
+    // .fg1_dc1.log (from DC2) written to storage
+    // For record key "0", the row is (0, 0, 2000)
+    val secondDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 2000"))
+
+    secondDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Append).save(tablePath)
+
+    val compactionOptions = options ++ Map(
+      HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key -> CompactionTriggerStrategy.NUM_COMMITS.name,
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1",
+      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> "false",
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName
+    )
+
+    // Schedule and execute compaction, leaving the compaction inflight
+    // Compaction3 (C3, inflight), compacting fg1
+    // fg1_c3.parquet is written to storage
+    val client = DataSourceUtils.createHoodieClient(
+      spark.sparkContext, "", tablePath, tableName,
+      mapAsJavaMap(compactionOptions)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+
+    val compactionInstant = client.scheduleCompaction(org.apache.hudi.common.util.Option.empty()).get()
+
+    // NOTE: this executes the compaction to write the compacted base files, and leaves the
+    // compaction instant still inflight, emulating a compaction action that is in progress
+    client.compact(compactionInstant)
+    client.close()
+
+    // Third batch with all updates
+    // Deltacommit4 (DC4, completed), updating fg1
+    // .fg1_c3.log (from DC4) is written to storage
+    // For record key "0", the row is (0, 0, 3000)
+    val thirdDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 3000"))
+
+    thirdDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Append).save(tablePath)
+
+    // Read-optimized query on MOR
+    val roDf = spark.read.format("org.apache.hudi")
+      .option(
+        DataSourceReadOptions.QUERY_TYPE.key,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(tablePath)
+
+    // Only the base file in the first file slice, i.e., fg1_dc1.parquet, should be read
+    assertEquals(10, roDf.count())
+    assertEquals(
+      1000L,
+      roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))
+  }
 }
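The new test pins down the read-optimized path only. Under the same scenario, a snapshot query would be expected to read the merged file slice produced by getLatestMergedFileSlicesBeforeOrOn, i.e. the pre-compaction base file plus the DC2 and DC4 log files, so record key "0" should come back with the DC4 value (3000). The sketch below is not part of the patch; it reuses the variables and imports of the test above and only illustrates how such a complementary assertion could look:

    // Hypothetical follow-up check (not in the patch): snapshot query on the same table.
    // With compaction C3 still inflight, the merged slice should expose the latest log data.
    val snapshotDf = spark.read.format("org.apache.hudi")
      .option(
        DataSourceReadOptions.QUERY_TYPE.key,
        DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .load(tablePath)

    assertEquals(10, snapshotDf.count())
    // Expected (not asserted by the patch): key "0" reflects the DC4 update, i.e. 3000
    assertEquals(
      3000L,
      snapshotDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))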
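On the hudi-common side, the essence of the change is that the file index now always asks the file-system view for merged file slices instead of branching on table type and query type. A rough way to see what that call returns for the table built by the test is sketched below, under the assumption that spark and tablePath from the test are in scope; none of this is part of the patch, and the variable names are illustrative only.

    import scala.collection.JavaConverters._
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView

    // Rebuild a file-system view over the table's active timeline.
    val metaClient = HoodieTableMetaClient.builder()
      .setConf(spark.sparkContext.hadoopConfiguration)
      .setBasePath(tablePath)
      .build()
    val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline)

    // Latest completed commit/deltacommit on the timeline (DC4 in the scenario above).
    val latestInstant = metaClient.getActiveTimeline.getCommitsTimeline
      .filterCompletedInstants().lastInstant().get().getTimestamp

    // For the non-partitioned table the partition path is "". With compaction C3 still
    // inflight, the merged slice is expected to carry the pre-compaction base file
    // (fg1_dc1.parquet) together with the log files, rather than the unfinished fg1_c3.parquet.
    fsView.getLatestMergedFileSlicesBeforeOrOn("", latestInstant)
      .iterator().asScala
      .foreach(slice => println(s"base=${slice.getBaseFile}, logs=${slice.getLogFiles.count()}"))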