1 change: 1 addition & 0 deletions docker/demo/sparksql-batch1.commands
@@ -28,6 +28,7 @@ spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from s
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)

// Bootstrapped Copy-On-Write table
spark.sql("set hoodie.bootstrap.data.queries.only=false")
spark.sql("select symbol, max(ts) from stock_ticks_cow_bs group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG'").show(100, false)

3 changes: 2 additions & 1 deletion docker/demo/sparksql-batch2.commands
@@ -26,7 +26,8 @@ spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from s
spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)

// Copy-On-Write Bootstrapped table
// Copy-On-Write Bootstrapped table
spark.sql("set hoodie.bootstrap.data.queries.only=false")
Contributor
Are there any integration tests for bootstrap where we test with this feature on?

Contributor Author
I updated it, so the test now uses the feature for the queries that don't use the meta fields.

spark.sql("select symbol, max(ts) from stock_ticks_cow_bs group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG'").show(100, false)

@@ -68,6 +68,11 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Selects the mode in which each file/partition in the bootstrapped dataset gets bootstrapped");

public static final ConfigProperty<String> DATA_QUERIES_ONLY = ConfigProperty
.key("hoodie.bootstrap.data.queries.only")
.defaultValue("true")
.withDocumentation("Improves query performance, but queries cannot use hudi metadata fields");

public static final ConfigProperty<String> FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.full.input.provider")
.defaultValue("org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider")
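A minimal usage sketch for this new config (the table path below is hypothetical; the option key and its default of true come from the property above). With the default, bootstrap reads take the faster data-only path and the Hudi meta fields are not available, so queries that need meta fields have to turn it off:

import org.apache.hudi.config.HoodieBootstrapConfig

// Fast path (default): meta fields such as _hoodie_commit_time cannot be selected.
val fastDf = spark.read.format("hudi").load("/tmp/hudi/stock_ticks_cow_bs")

// Disable the fast path when the query needs Hudi meta fields.
val fullDf = spark.read.format("hudi")
  .option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "false")
  .load("/tmp/hudi/stock_ticks_cow_bs")
fullDf.select("_hoodie_commit_time", "symbol", "ts").show(false)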
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
@@ -100,7 +101,7 @@ class DefaultSource extends RelationProvider
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k => k._1.startsWith("hoodie.")) ++ optParams)
Contributor
Why is this needed?

Contributor Author
Currently we can't set read configs in Spark SQL using syntax like "set hoodie.bootstrap.data.queries.only=false"; that only works for write configs. This is something we wanted to add anyway: https://issues.apache.org/jira/browse/HUDI-5361
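To illustrate, a small sketch of what the changed line above does (names are taken from the diff; the precedence note is an assumption based on Scala's Map ++ semantics, where later entries win):

// Fold session-level "set hoodie.* = ..." confs into the read parameters.
val sessionHoodieConfs: Map[String, String] =
  sqlContext.getAllConfs.filter { case (key, _) => key.startsWith("hoodie.") }

// Explicit reader options (optParams) are concatenated last, so they override session confs.
val readParams: Map[String, String] =
  DataSourceOptionsHelper.parametersWithReadDefaults(sessionHoodieConfs ++ optParams)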


// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
@@ -261,7 +262,7 @@ object DefaultSource {
new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)

case (_, _, true) =>
new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)

case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
@@ -270,6 +271,21 @@
}
}

private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
parameters: Map[String, String]): BaseRelation = {
val enableFileIndex = HoodieSparkConfUtils.getConfigValue(parameters, sqlContext.sparkSession.sessionState.conf,
ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
if (!enableFileIndex || globPaths.nonEmpty || parameters.getOrElse(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true") != "true") {
Member
I think we should do away with the config and rely on the condition here to decide whether or not to use the fast read path (which should be done by default). Wdyt?

Contributor Author
If you want to read the metadata columns you need to disable it. I found a few tests that use the metadata columns, and I would assume that some users do as well.

Member
I get it. But does it need to be inferred through a separate config? Can we not infer it from the already available parameters?

Contributor Author
We need to know at the point of creating the relation, so I don't think this can be done.

Contributor
@jonvex: Wouldn't this change cause user queries that include hoodie metadata columns to fail? Can't we just use the userSchema being passed here to determine whether any hoodie metadata columns are being queried, and decide the appropriate next steps?

Contributor
Hmmm, @jonvex: if you look at HoodieBootstrapRelation.composeRDD (the relation is being instantiated in the line below), we segregate the skeleton schema and the base file schema. Can we move the optimization logic inside that? My main concern is that this would break existing behavior: bootstrap queries that include hudi metafields would fail unless the user turns off the feature.

Contributor Author
Spark applies special optimizations to HadoopFsRelation, so unless we contribute PRs to Spark, this is the only way to do it as far as I can tell.

Contributor
Can you elaborate on what optimizations are being done for HadoopFsRelation that cause the ~100% speedup? I don't see this information in the PR description.

Contributor Author
https://issues.apache.org/jira/browse/HUDI-3896 I am not sure if this is the only optimization, but it is one of them. The query plans for non-bootstrapped and bootstrapped tables look pretty much identical, except that the non-bootstrap read shows "FileScan parquet" while the bootstrap read shows "Scan HoodieBootstrapRelation".

I started by comparing the time to run TPC-DS queries on bootstrapped tables vs non-bootstrapped ones. For a full bootstrap, the runtime ratio was 1.997, and for a metadata-only bootstrap it was 1.638.

I was surprised that the full bootstrap was so slow, so in the first commit of this PR I tried to replicate what is done in BaseFileOnlyRelation: we create a HoodieFileScanRDD instead of a HoodieBootstrapRDD. The ratio of TPC-DS runtime compared to reading from a non-bootstrap table was 1.48 for a full bootstrap table and 1.35 for a metadata-only bootstrap.

With the changes in this PR to leverage HadoopFsRelation, the ratio was 1.12 for a metadata-only bootstrap and 1.09 for a full bootstrap.
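A quick way to see the difference described above is to compare the physical plans (a sketch, not part of the PR; the table path is hypothetical):

// Default (hoodie.bootstrap.data.queries.only=true): the read goes through HadoopFsRelation,
// so the plan should show a plain "FileScan parquet" node.
spark.read.format("hudi")
  .load("/tmp/hudi/stock_ticks_cow_bs")
  .where("symbol = 'GOOG'")
  .explain()

// With the flag disabled, the plan falls back to "Scan HoodieBootstrapRelation".
spark.read.format("hudi")
  .option("hoodie.bootstrap.data.queries.only", "false")
  .load("/tmp/hudi/stock_ticks_cow_bs")
  .where("symbol = 'GOOG'")
  .explain()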

Contributor
@jonvex: Can we make HoodieBootstrapRelation/HoodieBaseRelation extend HadoopFsRelation to get the behavior?

HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
} else {
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters +
(HoodieBootstrapRelation.USE_FAST_BOOTSTRAP_READ -> "true")).toHadoopFsRelation
}
}

private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
@@ -225,7 +225,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
val shouldUseBootstrapFastRead = optParams.getOrElse(HoodieBootstrapRelation.USE_FAST_BOOTSTRAP_READ, "false") == "true"

shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || shouldUseBootstrapFastRead
}

/**
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionedFile}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -187,11 +187,23 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,

override def updatePrunedDataSchema(prunedSchema: StructType): HoodieBootstrapRelation =
this.copy(prunedDataSchema = Some(prunedSchema))

def toHadoopFsRelation: HadoopFsRelation = {
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
bucketSpec = None,
fileFormat = fileFormat,
optParams)(sparkSession)
}
}


object HoodieBootstrapRelation {

val USE_FAST_BOOTSTRAP_READ = "hoodie.bootstrap.relation.use.fast.bootstrap.read"

private def validate(requiredDataSchema: HoodieTableSchema, requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType): Unit = {
val requiredDataColumns: Seq[String] = requiredDataSchema.structTypeSchema.fieldNames.toSeq
val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ requiredDataFileSchema.fieldNames).toSeq
@@ -21,12 +21,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, getConfigProperties}
import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
import org.apache.hudi.metadata.HoodieMetadataPayload
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -146,11 +147,10 @@ case class HoodieFileIndex(spark: SparkSession,
val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
val listedPartitions = getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map {
case (partition, fileSlices) =>
val baseFileStatuses: Seq[FileStatus] =
fileSlices.asScala
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
.asScala
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null))

// Filter in candidate files based on the col-stats index lookup
val candidateFiles = baseFileStatuses.filter(fs =>
@@ -180,6 +180,22 @@ }
}
}

/**
* In the fast bootstrap read code path, it gets the file status for the bootstrap base files instead of
* skeleton files.
*/
private def getBaseFileStatus(baseFiles: mutable.Buffer[HoodieBaseFile]): mutable.Buffer[FileStatus] = {
if (shouldFastBootstrap) {
return baseFiles.map(f =>
if (f.getBootstrapBaseFile.isPresent) {
f.getBootstrapBaseFile.get().getFileStatus
} else {
f.getFileStatus
})
}
baseFiles.map(_.getFileStatus)
}

private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
allBaseFileNames -- allIndexedFileNames
@@ -25,7 +25,7 @@ import org.apache.hudi.SparkHoodieTableFileIndex._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.model.{FileSlice, HoodieRecord, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.hadoop.CachingPath
@@ -83,10 +83,18 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* Get the schema of the table.
*/
lazy val schema: StructType = schemaSpec.getOrElse({
val schemaUtil = new TableSchemaResolver(metaClient)
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})
lazy val schema: StructType = if (shouldFastBootstrap) {
StructType(rawSchema.fields.filterNot(f => HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name)))
} else {
rawSchema
}

private lazy val rawSchema: StructType = schemaSpec.getOrElse({
val schemaUtil = new TableSchemaResolver(metaClient)
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})

protected lazy val shouldFastBootstrap = configProperties.getBoolean(HoodieBootstrapRelation.USE_FAST_BOOTSTRAP_READ, false)

private lazy val sparkParsePartitionUtil = sparkAdapter.getSparkParsePartitionUtil

@@ -807,7 +807,9 @@ class TestHoodieSparkSqlWriter {
.option("hoodie.insert.shuffle.parallelism", "4")
.mode(SaveMode.Append).save(tempBasePath)

val currentCommits = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val currentCommits = spark.read.format("hudi")
.option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key, "false")
.load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val incrementalKeyIdNum = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
@@ -826,7 +828,9 @@
df.write.format("hudi").options(options)
.option(DataSourceWriteOptions.OPERATION.key, "insert_overwrite_table")
.option("hoodie.insert.shuffle.parallelism", "4").mode(SaveMode.Append).save(tempBasePath)
val currentCommitsBootstrap = spark.read.format("hudi").load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val currentCommitsBootstrap = spark.read.format("hudi")
.option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key, "false")
.load(tempBasePath).select("_hoodie_commit_time").take(1).map(_.getString(0))
val incrementalKeyIdNumBootstrap = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "0000")
@@ -52,6 +52,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -661,7 +662,8 @@ public void testBulkInsertsAndUpsertsWithBootstrap(HoodieRecordType recordType)
cfg.configs.add("hoodie.bootstrap.parallelism=5");
cfg.targetBasePath = newDatasetBasePath;
new HoodieDeltaStreamer(cfg, jsc).sync();
Dataset<Row> res = sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
Dataset<Row> res = sqlContext.read().format("org.apache.hudi")
.option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "false").load(newDatasetBasePath);
LOG.info("Schema :");
res.printSchema();
