diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index 1de36a26a5f78..d88f0bb2e6f7a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -67,6 +67,13 @@ public class HoodieBootstrapConfig extends HoodieConfig {
       .sinceVersion("0.6.0")
       .withDocumentation("Selects the mode in which each file/partition in the bootstrapped dataset gets bootstrapped");
 
+  public static final ConfigProperty<String> DATA_QUERIES_ONLY = ConfigProperty
+      .key("hoodie.bootstrap.data.queries.only")
+      .defaultValue("false")
+      .markAdvanced()
+      .sinceVersion("0.14.0")
+      .withDocumentation("Improves query performance, but queries cannot use hudi metadata fields");
+
   public static final ConfigProperty<String> FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME = ConfigProperty
       .key("hoodie.bootstrap.full.input.provider")
       .defaultValue("org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e013da5f3a1c8..e4d0b9a8daef5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2851,6 +2851,11 @@ public Builder withAllowMultiWriteOnSameInstant(boolean allow) {
       return this;
     }
 
+    public Builder withHiveStylePartitioningEnabled(boolean enabled) {
+      writeConfig.setValue(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE, String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withExternalSchemaTrasformation(boolean enabled) {
       writeConfig.setValue(AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE, String.valueOf(enabled));
       return this;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index e4be1a136e2c2..6655093fc8549 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.util.PathUtils
@@ -101,7 +102,8 @@ class DefaultSource extends RelationProvider
       )
     } else {
       Map()
-    }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
+    }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams +
+      (DATA_QUERIES_ONLY.key() -> sqlContext.getConf(DATA_QUERIES_ONLY.key(), optParams.getOrElse(DATA_QUERIES_ONLY.key(), DATA_QUERIES_ONLY.defaultValue()))))
 
     // Get the table base path
     val tablePath = if (globPaths.nonEmpty) {
@@ -262,7 +264,7 @@ object DefaultSource {
          new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
 
      case (_, _, true) =>
-        new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
+        resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
 
      case (_, _, _) =>
        throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
@@ -271,6 +273,24 @@ object DefaultSource {
     }
   }
 
+  private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
+                                             globPaths: Seq[Path],
+                                             userSchema: Option[StructType],
+                                             metaClient: HoodieTableMetaClient,
+                                             parameters: Map[String, String]): BaseRelation = {
+    val enableFileIndex = HoodieSparkConfUtils.getConfigValue(parameters, sqlContext.sparkSession.sessionState.conf,
+      ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
+    val isSchemaEvolutionEnabledOnRead = HoodieSparkConfUtils.getConfigValue(parameters,
+      sqlContext.sparkSession.sessionState.conf, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+      DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+    if (!enableFileIndex || isSchemaEvolutionEnabledOnRead
+      || globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) {
+      HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false"))
+    } else {
+      HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters).toHadoopFsRelation
+    }
+  }
+
   private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
                                           globPaths: Seq[Path],
                                           userSchema: Option[StructType],
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index d75883358223c..de67504d73de7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.CachingPath
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -225,7 +226,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     val shouldExtractPartitionValueFromPath =
       optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
         DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
-    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
+    val shouldUseBootstrapFastRead = optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
+
+    shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || shouldUseBootstrapFastRead
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 5e15c0d1d68c1..34db28505010b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionedFile}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
@@ -200,6 +200,16 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
   override def updatePrunedDataSchema(prunedSchema: StructType): HoodieBootstrapRelation =
     this.copy(prunedDataSchema = Some(prunedSchema))
 
+  def toHadoopFsRelation: HadoopFsRelation = {
+    HadoopFsRelation(
+      location = fileIndex,
+      partitionSchema = fileIndex.partitionSchema,
+      dataSchema = fileIndex.dataSchema,
+      bucketSpec = None,
+      fileFormat = fileFormat,
+      optParams)(sparkSession)
+  }
+
   //TODO: This should be unnecessary with spark 3.4 [SPARK-41970]
   private def encodePartitionPath(file: FileStatus): String = {
     val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(bootstrapBasePath)
@@ -212,7 +222,6 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
 
 object HoodieBootstrapRelation {
 
-
   private def validate(requiredDataSchema: HoodieTableSchema, requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType): Unit = {
     val requiredDataColumns: Seq[String] = requiredDataSchema.structTypeSchema.fieldNames.toSeq
     val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ requiredDataFileSchema.fieldNames).toSeq
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 3131c81abee84..3767b65a8ce7b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferenc
 import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.HoodieBaseFile
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
@@ -41,6 +42,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import java.text.SimpleDateFormat
 import javax.annotation.concurrent.NotThreadSafe
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
@@ -145,11 +147,10 @@ case class HoodieFileIndex(spark: SparkSession,
     val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
     val listedPartitions = getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map {
       case (partition, fileSlices) =>
-        val baseFileStatuses: Seq[FileStatus] =
-          fileSlices.asScala
-            .map(fs => fs.getBaseFile.orElse(null))
-            .filter(_ != null)
-            .map(_.getFileStatus)
+        val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
+          .asScala
+          .map(fs => fs.getBaseFile.orElse(null))
+          .filter(_ != null))
 
         // Filter in candidate files based on the col-stats index lookup
         val candidateFiles = baseFileStatuses.filter(fs =>
@@ -179,6 +180,23 @@ case class HoodieFileIndex(spark: SparkSession,
     }
   }
 
+  /**
+   * In the fast bootstrap read code path, it gets the file status for the bootstrap base files instead of
+   * skeleton files.
+   */
+  private def getBaseFileStatus(baseFiles: mutable.Buffer[HoodieBaseFile]): mutable.Buffer[FileStatus] = {
+    if (shouldFastBootstrap) {
+      baseFiles.map(f =>
+        if (f.getBootstrapBaseFile.isPresent) {
+          f.getBootstrapBaseFile.get().getFileStatus
+        } else {
+          f.getFileStatus
+        })
+    } else {
+      baseFiles.map(_.getFileStatus)
+    }
+  }
+
   private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
     val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
     allBaseFileNames -- allIndexedFileNames
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index be9c8cdb6bb5d..c76af7b39ce83 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -23,11 +23,12 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.SparkHoodieTableFileIndex._
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.bootstrap.index.BootstrapIndex
 import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.hadoop.CachingPath
 import org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe
 import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.{InternalRow, expressions}
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType, LongType, ShortType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import javax.annotation.concurrent.NotThreadSafe
@@ -83,10 +84,18 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   /**
    * Get the schema of the table.
    */
-  lazy val schema: StructType = schemaSpec.getOrElse({
-    val schemaUtil = new TableSchemaResolver(metaClient)
-    AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
-  })
+  lazy val schema: StructType = if (shouldFastBootstrap) {
+    StructType(rawSchema.fields.filterNot(f => HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name)))
+  } else {
+    rawSchema
+  }
+
+  private lazy val rawSchema: StructType = schemaSpec.getOrElse({
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
+  })
+
+  protected lazy val shouldFastBootstrap = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)
 
   private lazy val sparkParsePartitionUtil = sparkAdapter.getSparkParsePartitionUtil
 
@@ -110,7 +119,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
         .map(column => nameFieldMap.apply(column))
 
       if (partitionFields.size != partitionColumns.get().size) {
-        val isBootstrapTable = BootstrapIndex.getBootstrapIndex(metaClient).useIndex()
+        val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
         if (isBootstrapTable) {
           // For bootstrapped tables its possible the schema does not contain partition field when source table
           // is hive style partitioned. In this case we would like to treat the table as non-partitioned
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
index 2cbbbfab591ef..6539fc0e54cfd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrapRead.java
@@ -54,6 +54,7 @@ import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -230,12 +231,16 @@ protected void compareTables() {
     }
     Dataset<Row> hudiDf = sparkSession.read().options(readOpts).format("hudi").load(hudiBasePath);
     Dataset<Row> bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath);
+    Dataset<Row> fastBootstrapDf = sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(), "true").load(bootstrapTargetPath);
     if (nPartitions == 0) {
+      compareDf(fastBootstrapDf.drop("city_to_state"), bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
       compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
       return;
     }
     compareDf(hudiDf.drop(dropColumns).drop(partitionCols), bootstrapDf.drop(dropColumns).drop(partitionCols));
+    compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols), bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
     compareDf(hudiDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols));
+    compareDf(fastBootstrapDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols));
   }
 
   protected void compareDf(Dataset<Row> df1, Dataset<Row> df2) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 81c9233eb3456..12974d133a84b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -583,10 +583,12 @@ class TestDataSourceForBootstrap {
     assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1)
 
     // Read bootstrapped table and verify count
-    val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+    val hoodieROViewDF1 = spark.read.format("hudi")
+      .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath + "/*")
     assertEquals(sort(sourceDF).collectAsList(), sort(dropMetaCols(hoodieROViewDF1)).collectAsList())
 
-    val hoodieROViewDFWithBasePath = spark.read.format("hudi").load(basePath)
+    val hoodieROViewDFWithBasePath = spark.read.format("hudi")
+      .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath)
     assertEquals(sort(sourceDF).collectAsList(), sort(dropMetaCols(hoodieROViewDFWithBasePath)).collectAsList())
 
     // Perform upsert
@@ -606,7 +608,8 @@ class TestDataSourceForBootstrap {
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
 
     // Read table after upsert and verify count
-    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
+    val hoodieROViewDF2 = spark.read.format("hudi")
+      .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
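
Reviewer note (not part of the patch): the sketch below shows how a reader might exercise the new flag against an already-bootstrapped table. The table location is hypothetical; the option key is the one introduced above in HoodieBootstrapConfig.DATA_QUERIES_ONLY, and per the DefaultSource change the flag can be supplied either as a per-read option or through the Spark SQL conf. On this fast path the _hoodie_* meta columns are not exposed in the query schema, and if any of the guards in resolveHoodieBootstrapRelation trip (glob paths, schema evolution on read, or the Hudi file index disabled) the read falls back to the existing HoodieBootstrapRelation.

// Usage sketch only; assumes a table was bootstrapped beforehand at `tablePath` (hypothetical path)
// and that the Hudi Spark bundle is on the classpath.
import org.apache.spark.sql.SparkSession

object FastBootstrapReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fast-bootstrap-read-example")
      .master("local[2]")
      .getOrCreate()

    // Hypothetical location of an already-bootstrapped Hudi table.
    val tablePath = "/tmp/hudi/bootstrap_target"

    // Per-query: pass the flag as a read option, mirroring TestBootstrapRead above.
    // "hoodie.bootstrap.data.queries.only" is HoodieBootstrapConfig.DATA_QUERIES_ONLY.key().
    val fastDf = spark.read.format("hudi")
      .option("hoodie.bootstrap.data.queries.only", "true")
      .load(tablePath)

    // Session-wide: DefaultSource also consults the Spark SQL conf when the read option is absent.
    spark.conf.set("hoodie.bootstrap.data.queries.only", "true")
    val fastDfFromConf = spark.read.format("hudi").load(tablePath)

    // The fast path drops the Hudi meta columns (e.g. _hoodie_commit_time) from the schema.
    fastDf.printSchema()
    fastDfFromConf.show(10, truncate = false)

    spark.stop()
  }
}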