diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index bf540a302e716..edfecec515d61 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,17 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.hudi.common.config.SerializableSchema;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
@@ -42,16 +53,6 @@
 import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hudi.common.config.SerializableSchema;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.SchemaCompatibilityException;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -470,6 +471,17 @@ public static Schema generateProjectionSchema(Schema originalSchema, List
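The hunk above extends HoodieAvroUtils around generateProjectionSchema, and the rest of the patch relies on a root-level field helper, HoodieAvroUtils.getRootLevelFieldName, whose body is not fully reproduced here. As a rough illustration only — written in Scala and not the patch's actual Java implementation — the behavior attributed to that helper can be sketched as taking the path segment before the first dot:

object RootFieldSketch {
  // Hypothetical stand-in for HoodieAvroUtils.getRootLevelFieldName: for a nested
  // column path like "fare.currency" the root-level field is "fare"; a plain
  // top-level column ("driver") is returned unchanged.
  def getRootLevelFieldName(fieldName: String): String =
    fieldName.split('.').head

  def main(args: Array[String]): Unit = {
    assert(getRootLevelFieldName("fare.currency") == "fare")
    assert(getRootLevelFieldName("driver") == "driver")
  }
}

Under that assumption, a nested preCombine field such as fare.currency resolves to the root column fare for projection purposes.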
 HoodieAvroUtils.getRootLevelFieldName(col))
 
   protected def timeline: HoodieTimeline =
     // NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -246,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       //
       // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
       //       PROJECTION
-      val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+      val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
 
       val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
         HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
@@ -362,8 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     !SubqueryExpression.hasSubquery(condition)
   }
 
-  protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
-    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+  protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
+    // For a nested field in mandatory columns, we should first get the root-level field, and then
+    // check for any missing column, as the requestedColumns should only contain root-level fields
+    // We should only append root-level field as well
+    val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
     requestedColumns ++ missing
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 6aa7007851d2c..806a5e371df55 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
 
-  override lazy val mandatoryColumns: Seq[String] = {
+  override lazy val mandatoryFields: Seq[String] = {
     // NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
     //       cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
     Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index a88eb63036db5..75bc96624e7b0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieMergeOnReadFileSplit
 
-  override lazy val mandatoryColumns: Seq[String] =
+  override lazy val mandatoryFields: Seq[String] =
     Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
 
   protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
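The appendMandatoryRootFields change above is the heart of the read-path fix: mandatory fields (record key, preCombine field) may now be nested, while Spark only ever requests root-level columns, so the missing-column check has to be done on root-level names. A self-contained sketch of that logic (method and parameter names here are illustrative, not the exact signatures in HoodieBaseRelation):

object AppendMandatoryRootFieldsSketch {
  // Assumed root-field resolution, as in the sketch above.
  private def rootField(col: String): String = col.split('.').head

  // Mirrors the intent of HoodieBaseRelation.appendMandatoryRootFields: only the
  // root-level names of mandatory fields are appended, and only when the caller
  // did not already request them.
  def appendMandatoryRootFields(requestedColumns: Array[String],
                                mandatoryFields: Seq[String]): Array[String] = {
    val mandatoryRootFields = mandatoryFields.map(rootField)
    val missing = mandatoryRootFields.filter(f => !requestedColumns.contains(f))
    requestedColumns ++ missing
  }

  def main(args: Array[String]): Unit = {
    // With a nested preCombine field "fare.currency", only the root "fare" is appended.
    val cols = appendMandatoryRootFields(Array("driver"), Seq("_row_key", "fare.currency"))
    assert(cols.toSeq == Seq("driver", "_row_key", "fare"))
    // Already-requested root fields are not duplicated, preserving the requested order.
    val cols2 = appendMandatoryRootFields(Array("fare", "_row_key"), Seq("_row_key", "fare.currency"))
    assert(cols2.toSeq == Seq("fare", "_row_key"))
  }
}

Keeping the requested columns first and only appending what is missing is what preserves the column order the upstream projection depends on.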
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 18b639f2f9bd2..8cf6b4174c9f2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -32,7 +33,7 @@ import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Tag
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.CsvSource
 
 import scala.collection.JavaConversions._
 
@@ -57,19 +58,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
   val updatedVerificationVal: String = "driver_update"
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testMergeOnReadStorage(isMetadataEnabled: Boolean) {
-    val dataGen = new HoodieTestDataGenerator()
+  @CsvSource(Array(
+    "true,",
+    "true,fare.currency",
+    "false,",
+    "false,fare.currency"
+  ))
+  def testMergeOnReadStorage(isMetadataEnabled: Boolean, preComineField: String) {
+    var options: Map[String, String] = commonOpts +
+      (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled))
+    if (!StringUtils.isNullOrEmpty(preComineField)) {
+      options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preComineField)
+    }
+    val dataGen = new HoodieTestDataGenerator(0xDEEF)
     val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
     // Bulk Insert Operation
     val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
+      .options(options)
       .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -90,8 +100,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
      .mode(SaveMode.Append)
       .save(basePath)
 
@@ -110,8 +119,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
     val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
     inputDF3.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(options)
       .mode(SaveMode.Append)
       .save(basePath)
 
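In the @CsvSource rows above, entries such as "true," leave the second cell empty, which JUnit 5 passes to the test method as null; that is why the new test code guards the preCombine option with StringUtils.isNullOrEmpty before adding it to the write options. A minimal standalone sketch of the same pattern (the hoodie.* keys are spelled out literally here for illustration):

import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

class CsvSourceNullCellSketch {

  @ParameterizedTest
  @CsvSource(Array(
    "true,",              // empty second cell -> null parameter
    "true,fare.currency"
  ))
  def optionalSecondColumn(isMetadataEnabled: Boolean, preCombineField: String): Unit = {
    // Add the preCombine option only when a field was actually supplied,
    // mirroring the guard in testMergeOnReadStorage above.
    var options: Map[String, String] = Map("hoodie.metadata.enable" -> String.valueOf(isMetadataEnabled))
    if (preCombineField != null && preCombineField.nonEmpty) {
      options += ("hoodie.datasource.write.precombine.field" -> preCombineField)
    }
    assert(options.contains("hoodie.datasource.write.precombine.field") == (preCombineField != null))
  }
}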
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index f670450c3eeb8..945d26be3f464 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.avro.Schema
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
 import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
       logWarning(s"Not matching bytes read ($bytesRead)")
     }
 
-    val readColumns = targetColumns ++ relation.mandatoryColumns
+    val readColumns = targetColumns ++ relation.mandatoryFields
     val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
 
     val row: InternalRow = rows.take(1).head
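Downstream, the test now builds its expected read columns from targetColumns ++ relation.mandatoryFields and feeds them through HoodieSparkUtils.getRequiredSchema. The sketch below is in the spirit of generateProjectionSchema rather than its actual implementation: it shows how a root-level projection keeps a whole nested record available, which is what lets a nested preCombine field like fare.currency be read after projection. It assumes Avro 1.9+ for the Schema.Field constructor used here.

import org.apache.avro.{Schema, SchemaBuilder}
import scala.collection.JavaConverters._

object ProjectionSchemaSketch {
  // Keep only the requested root-level fields of a record schema, preserving the
  // order in which they were requested. Illustrative only, not Hudi's implementation.
  def projectRootFields(source: Schema, fieldNames: Seq[String]): Schema = {
    val projectedFields = fieldNames.map { name =>
      val f = source.getField(name)
      require(f != null, s"Field $name not found in schema")
      new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())
    }
    val projected = Schema.createRecord(source.getName, source.getDoc, source.getNamespace, false)
    projected.setFields(projectedFields.asJava)
    projected
  }

  def main(args: Array[String]): Unit = {
    val tripSchema = SchemaBuilder.record("trip").fields()
      .requiredString("_row_key")
      .requiredString("driver")
      .name("fare").`type`(SchemaBuilder.record("fare").fields()
        .requiredDouble("amount")
        .requiredString("currency")
        .endRecord()).noDefault()
      .endRecord()

    // Projecting on the root field "fare" keeps the whole nested record, so a
    // nested preCombine field such as "fare.currency" remains readable.
    val projected = projectRootFields(tripSchema, Seq("_row_key", "fare"))
    assert(projected.getFields.asScala.map(_.name()).toSeq == Seq("_row_key", "fare"))
  }
}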