@@ -18,8 +18,6 @@

package org.apache.hudi;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -40,6 +38,9 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadata;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -291,31 +292,23 @@ private void doRefresh() {

validate(activeTimeline, queryInstant);

if (tableType.equals(HoodieTableType.MERGE_ON_READ) && queryType.equals(HoodieTableQueryType.SNAPSHOT)) {
cachedAllInputFileSlices = partitionFiles.keySet().stream()
.collect(Collectors.toMap(
Function.identity(),
partitionPath ->
queryInstant.map(instant ->
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
// NOTE: For MOR table, when the compaction is inflight, we need to not only fetch the
// latest slices, but also include the base and log files of the second-last version of
// the file slice in the same file group as the latest file slice that is under compaction.
// This logic is realized by `AbstractTableFileSystemView::getLatestMergedFileSlicesBeforeOrOn`
// API. Note that for COW table, the merging logic of two slices does not happen as there
// is no compaction, thus there is no performance impact.
cachedAllInputFileSlices = partitionFiles.keySet().stream()
.collect(Collectors.toMap(
Function.identity(),
partitionPath ->
queryInstant.map(instant ->
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionPath.path, queryInstant.get())
Contributor
Can you add docs around how this single API would suffice for all 3 callers (COW, MOR RO, MOR RT)?

Contributor
I added the docs above. Is that sufficient?

)
.orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
)
.orElse(Collections.emptyList())
)
);
} else {
cachedAllInputFileSlices = partitionFiles.keySet().stream()
.collect(Collectors.toMap(
Function.identity(),
partitionPath ->
queryInstant.map(instant ->
fileSystemView.getLatestFileSlicesBeforeOrOn(partitionPath.path, instant, true)
)
.orElse(fileSystemView.getLatestFileSlices(partitionPath.path))
.collect(Collectors.toList())
)
);
}
)
);

cachedFileSize = cachedAllInputFileSlices.values().stream()
.flatMap(Collection::stream)
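
To make the NOTE above concrete, here is a minimal, self-contained sketch of the merging rule it describes, using hypothetical stand-in classes rather than Hudi's real file-group and file-slice APIs: when the latest file slice of a file group was produced by a still-pending compaction, the base and log files of the previous committed slice are folded into it. For a COW table no slice ever comes from a pending compaction, so the first case never fires and the rule degenerates to simply returning the latest slice.

// Illustrative model only: hypothetical classes, not Hudi's real FileSlice/HoodieFileGroup APIs.
case class FileSlice(instant: String, baseFile: Option[String], logFiles: List[String])

case class FileGroup(slices: List[FileSlice]) { // newest slice first
  // If the newest slice's base file comes from a pending (inflight) compaction, fold the
  // previous committed slice's base and log files into it so no records are missed.
  def latestMergedSlice(pendingCompactions: Set[String]): FileSlice = slices match {
    case latest :: previous :: _ if pendingCompactions.contains(latest.instant) =>
      FileSlice(latest.instant, previous.baseFile, previous.logFiles ++ latest.logFiles)
    case latest :: _ => latest
    case Nil => throw new IllegalStateException("empty file group")
  }
}

// File group fg1 from the test scenario below: C3 is inflight, DC4 appended .fg1_c3.log.
val fg1 = FileGroup(List(
  FileSlice("c3", Some("fg1_c3.parquet"), List(".fg1_c3.log")),
  FileSlice("dc1", Some("fg1_dc1.parquet"), List(".fg1_dc1.log"))))

// The merged slice keeps fg1_dc1.parquet plus both log files, which is what a snapshot
// query scans; a read-optimized query only scans the committed base file, fg1_dc1.parquet.
val mergedSlice = fg1.latestMergedSlice(pendingCompactions = Set("c3"))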
@@ -20,19 +20,21 @@ package org.apache.hudi.functional
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
import org.apache.log4j.LogManager
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
@@ -44,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

import java.util.function.Consumer
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters._

/**
@@ -978,4 +981,114 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0)
assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20)
}

/**
* Test read-optimized query on MOR during inflight compaction.
*
* The following scenario is tested:
* Hudi timeline:
* > Deltacommit1 (DC1, completed), writing file group 1 (fg1)
* > Deltacommit2 (DC2, completed), updating fg1
* > Compaction3 (C3, inflight), compacting fg1
* > Deltacommit4 (DC4, completed), updating fg1
*
* On storage, these are the data files for fg1:
* file slice v1:
* - fg1_dc1.parquet (from DC1)
* - .fg1_dc1.log (from DC2)
* file slice v2:
* - fg1_c3.parquet (from C3, inflight)
* - .fg1_c3.log (from DC4)
*
* The read-optimized query should read only `fg1_dc1.parquet` in this case.
*/
@Test
def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(): Unit = {
val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table")
val precombineField = "col3"
val recordKeyField = "key"
val dataField = "age"

val options = Map[String, String](
DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name,
DataSourceWriteOptions.OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> precombineField,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeyField,
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
HoodieWriteConfig.TBL_NAME.key -> tableName,
"hoodie.insert.shuffle.parallelism" -> "1",
"hoodie.upsert.shuffle.parallelism" -> "1")

// First batch with all inserts
// Deltacommit1 (DC1, completed), writing file group 1 (fg1)
// fg1_dc1.parquet written to storage
// For record key "0", the row is (0, 0, 1000)
val firstDf = spark.range(0, 10).toDF(recordKeyField)
.withColumn(precombineField, expr(recordKeyField))
.withColumn(dataField, expr(recordKeyField + " + 1000"))

firstDf.write.format("hudi")
.options(options)
.mode(SaveMode.Overwrite)
.save(tablePath)

// Second batch with all updates
// Deltacommit2 (DC2, completed), updating fg1
// .fg1_dc1.log (from DC2) written to storage
// For record key "0", the row is (0, 0, 2000)
val secondDf = spark.range(0, 10).toDF(recordKeyField)
.withColumn(precombineField, expr(recordKeyField))
.withColumn(dataField, expr(recordKeyField + " + 2000"))

secondDf.write.format("hudi")
.options(options)
.mode(SaveMode.Append).save(tablePath)

val compactionOptions = options ++ Map(
HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key -> CompactionTriggerStrategy.NUM_COMMITS.name,
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1",
DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> "false",
HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName
)

// Schedule and execute compaction, leaving the compaction inflight
// Compaction3 (C3, inflight), compacting fg1
// fg1_c3.parquet is written to storage
val client = DataSourceUtils.createHoodieClient(
spark.sparkContext, "", tablePath, tableName,
mapAsJavaMap(compactionOptions)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

val compactionInstant = client.scheduleCompaction(org.apache.hudi.common.util.Option.empty()).get()

// NOTE: this executes the compaction to write the compacted base files, and leaves the
// compaction instant still inflight, emulating a compaction action that is in progress
client.compact(compactionInstant)
client.close()

// Third batch with all updates
// Deltacommit4 (DC4, completed), updating fg1
// .fg1_c3.log (from DC4) is written to storage
// For record key "0", the row is (0, 0, 3000)
val thirdDf = spark.range(0, 10).toDF(recordKeyField)
.withColumn(precombineField, expr(recordKeyField))
.withColumn(dataField, expr(recordKeyField + " + 3000"))

thirdDf.write.format("hudi")
.options(options)
.mode(SaveMode.Append).save(tablePath)

// Read-optimized query on MOR
val roDf = spark.read.format("org.apache.hudi")
.option(
DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(tablePath)

// Only the base file in the first file slice, i.e., fg1_dc1.parquet, should be read
assertEquals(10, roDf.count())
assertEquals(
1000L,
roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))
}
}
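
As a companion sketch (not part of the PR), the same table could also be read with a snapshot query to contrast the two query types. This reuses tablePath, roDf, recordKeyField and dataField from the test above; the option constants are the standard DataSourceReadOptions keys, and the 3000 expectation is inferred from the scenario (the merged slice of fg1_dc1.parquet plus both log files should surface DC4's update), not asserted by the PR's test.

// Snapshot (RT) query on the same table
val snapshotDf = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load(tablePath)

// Read-optimized query (asserted in the test): only fg1_dc1.parquet is scanned, so key 0 -> 1000
assertEquals(1000L, roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))

// Snapshot query (expected, per the merged-slice logic): fg1_dc1.parquet, .fg1_dc1.log and
// .fg1_c3.log are merged, so key 0 should reflect DC4 -> 3000
assertEquals(3000L, snapshotDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))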