@@ -20,19 +20,21 @@ package org.apache.hudi.functional
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
 import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
@@ -44,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.function.Consumer
+import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
 
 /**
@@ -978,4 +981,114 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
     assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count, 0)
     assertEquals(incrementalQueryRes.where("partition = '2022-01-02'").count, 20)
   }
+
+  /**
+   * Test read-optimized query on MOR during inflight compaction.
+   *
+   * The following scenario is tested:
+   * Hudi timeline:
+   * > Deltacommit1 (DC1, completed), writing file group 1 (fg1)
+   * > Deltacommit2 (DC2, completed), updating fg1
+   * > Compaction3 (C3, inflight), compacting fg1
+   * > Deltacommit4 (DC4, completed), updating fg1
+   *
+   * On storage, these are the data files for fg1:
+   * file slice v1:
+   *   - fg1_dc1.parquet (from DC1)
+   *   - .fg1_dc1.log (from DC2)
+   * file slice v2:
+   *   - fg1_c3.parquet (from C3, inflight)
+   *   - .fg1_c3.log (from DC4)
+   *
+   * The read-optimized query should read only `fg1_dc1.parquet` in this case.
+   */
+  @Test
+  def testReadOptimizedQueryAfterInflightCompactionAndCompletedDeltaCommit(): Unit = {
+    val (tableName, tablePath) = ("hoodie_mor_ro_read_test_table", s"${basePath}_mor_test_table")
+    val precombineField = "col3"
+    val recordKeyField = "key"
+    val dataField = "age"
+
+    val options = Map[String, String](
+      DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name,
+      DataSourceWriteOptions.OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> precombineField,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeyField,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1")
+
+    // First batch with all inserts
+    // Deltacommit1 (DC1, completed), writing file group 1 (fg1)
+    // fg1_dc1.parquet written to storage
+    // For record key "0", the row is (0, 0, 1000)
+    val firstDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 1000"))
+
+    firstDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Second batch with all updates
+    // Deltacommit2 (DC2, completed), updating fg1
+    // .fg1_dc1.log (from DC2) written to storage
+    // For record key "0", the row is (0, 0, 2000)
+    val secondDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 2000"))
+
+    secondDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Append).save(tablePath)
+
+    val compactionOptions = options ++ Map(
+      HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key -> CompactionTriggerStrategy.NUM_COMMITS.name,
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1",
+      DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> "false",
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName
+    )
+
+    // Schedule and execute compaction, leaving the compaction inflight
+    // Compaction3 (C3, inflight), compacting fg1
+    // fg1_c3.parquet is written to storage
+    val client = DataSourceUtils.createHoodieClient(
+      spark.sparkContext, "", tablePath, tableName,
+      mapAsJavaMap(compactionOptions)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+
+    val compactionInstant = client.scheduleCompaction(org.apache.hudi.common.util.Option.empty()).get()
+
+    // NOTE: this executes the compaction to write the compacted base files, and leaves the
+    // compaction instant still inflight, emulating a compaction action that is in progress
+    client.compact(compactionInstant)
+    client.close()
+
+    // Third batch with all updates
+    // Deltacommit4 (DC4, completed), updating fg1
+    // .fg1_c3.log (from DC4) is written to storage
+    // For record key "0", the row is (0, 0, 3000)
+    val thirdDf = spark.range(0, 10).toDF(recordKeyField)
+      .withColumn(precombineField, expr(recordKeyField))
+      .withColumn(dataField, expr(recordKeyField + " + 3000"))
+
+    thirdDf.write.format("hudi")
+      .options(options)
+      .mode(SaveMode.Append).save(tablePath)
+
+    // Read-optimized query on MOR
+    val roDf = spark.read.format("org.apache.hudi")
+      .option(
+        DataSourceReadOptions.QUERY_TYPE.key,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(tablePath)
+
+    // Only the base file in the first file slice, i.e., fg1_dc1.parquet, should be read
+    assertEquals(10, roDf.count())
+    assertEquals(
+      1000L,
+      roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))
+  }
 }
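
For reference only, and not part of this change: the read-optimized path in the test differs from the default snapshot read solely in the QUERY_TYPE option, so a comparison query against the same table path would look like the sketch below (reusing tablePath, recordKeyField, and dataField from the test; no claim is made here about which file slice the snapshot read resolves to while C3 is inflight).

    // Illustrative sketch, assuming the table written by the test above
    val snapshotDf = spark.read.format("org.apache.hudi")
      .option(
        DataSourceReadOptions.QUERY_TYPE.key,
        DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .load(tablePath)
    snapshotDf.where(col(recordKeyField) === 0).select(dataField).show()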