Merged

Commits (53 total; changes shown from 29 commits)
0217715
curr
Jul 21, 2023
9325f13
fix select *
Jul 24, 2023
37d3b93
fix checkstyle
Jul 24, 2023
4e77337
relax mit
Jul 24, 2023
54a4e7e
add partition columns after merging
Jul 25, 2023
ee25b44
working
Jul 28, 2023
3a1eadb
going to generalize for all spark versions
Jul 28, 2023
67f298d
made more extensible
Jul 28, 2023
6f357c6
fix error in port
Jul 28, 2023
d28be3b
switch default to true
Jul 29, 2023
d7612ac
spark 3.2 working
Jul 29, 2023
bb2cd1b
added spark 3.4 support
Jul 29, 2023
9ea1398
support spark 3.1
Jul 29, 2023
7b7d90e
fix spark 3.2 and 3.3 after changes
Jul 29, 2023
a6f97ed
spark 3.0 working
Jul 29, 2023
a52dacd
spark 2.4 working
Jul 30, 2023
0e91a54
add imports to spark 3 adapter
Jul 30, 2023
bb0acc5
Merge remote-tracking branch 'origin/master' into mor_perf_spark33
Jul 30, 2023
3e2626a
fix merge
Jul 30, 2023
c05f009
disable for schema on read
Jul 30, 2023
662f3b3
disable with inmemory index
Jul 30, 2023
72c0bb1
disable with timestamp keygenerator
Jul 30, 2023
793964b
fix pruning timestamp keygen
Jul 30, 2023
646edf5
fix partition filter push down test
Jul 30, 2023
663aa88
check glob paths for null
Jul 30, 2023
3d6f947
add isProjectionCompatible
Jul 30, 2023
4e33648
optimize skip merge
Jul 30, 2023
26bb36c
fix testReadLogOnlyMergeOnReadTable
Jul 30, 2023
b8f1f89
fix test
Jul 30, 2023
2089508
clean up
Jul 31, 2023
eb91c86
combine for spark versions
Aug 1, 2023
d6025b9
address some comments
Aug 1, 2023
87e8f76
flag changed meaning but didn't fix in default source
Aug 2, 2023
54bb07b
clean up a bit
Aug 2, 2023
b695af3
addressed all comments
Aug 2, 2023
b54a365
disable some tests and checkstyle
Aug 2, 2023
f179c08
re enable one test
Aug 2, 2023
293ae46
had to disable for skip merge as well
Aug 2, 2023
6ce7ff6
clean up to pass CI
Aug 3, 2023
af76828
fix checkstyle
Aug 3, 2023
1875a19
fix test failing issue
Aug 3, 2023
ef8eaad
address review feedback
Aug 3, 2023
89a4c7f
Merge remote-tracking branch 'origin/master' into mor_perf_spark33
Aug 4, 2023
def394b
address pr comments
Aug 4, 2023
f13bb9c
addressed review
Aug 5, 2023
e5a805e
Merge branch 'master' into mor_perf_spark33
yihua Aug 6, 2023
65cfcdf
Update docs of LegacyHoodieParquetFileFormat
yihua Aug 6, 2023
c458337
Rename the config
yihua Aug 6, 2023
44a63c8
Fix imports
yihua Aug 6, 2023
fa681fd
Rename config to be accurate
yihua Aug 6, 2023
83f6b8b
Fix build
yihua Aug 6, 2023
996c798
Merge remote-tracking branch 'origin/master' into mor_perf_spark33
Aug 6, 2023
69aa9e6
Merge remote-tracking branch 'origin/mor_perf_spark33' into mor_perf_…
Aug 6, 2023
@@ -67,13 +67,6 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Selects the mode in which each file/partition in the bootstrapped dataset gets bootstrapped");

public static final ConfigProperty<String> DATA_QUERIES_ONLY = ConfigProperty
.key("hoodie.bootstrap.data.queries.only")
.defaultValue("false")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Improves query performance, but queries cannot use hudi metadata fields");
Contributor

Could you elaborate on what this config controls? Currently, it's not apparent.

Contributor Author @jonvex, Aug 1, 2023

I added this config when we were trying to find ways to speed up bootstrap reads. What it did was skip the skeleton/base file stitching and not return the meta fields. With this PR it isn't necessary anymore: if you don't request any meta columns, the new reader does that automatically. So I think it would just be confusing to introduce this config and then have it become unnecessary very soon.
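
For illustration, a minimal sketch of the read pattern described above, assuming an illustrative table path and column names; when no _hoodie_* meta fields are selected, the reader can skip the skeleton-file stitching:

// Sketch only: a bootstrapped Hudi table read that requests no meta columns,
// so the fast path (no skeleton/base file stitching) can apply.
// The path and column names below are assumptions for illustration.
val df = spark.read
  .format("hudi")
  .load("/tmp/hoodie/bootstrap_table")
  .select("event_id", "event_ts", "partition_col") // no _hoodie_* meta fields requested
df.show()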

Contributor

Got it. Could you update the config documentation? As discussed, we'll keep this config since it can still be used when the existing file format and relations are used.

Contributor

btw, the config doc update should be in a separate PR.


public static final ConfigProperty<String> FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.full.input.provider")
.defaultValue("org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider")
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.spark.sql.types.StructType

case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)

case class HoodieTableState(tablePath: String,
latestCommitTimestamp: Option[String],
recordKeyField: String,
preCombineFieldOpt: Option[String],
usesVirtualKeys: Boolean,
recordPayloadClassName: String,
metadataConfig: HoodieMetadataConfig,
recordMergerImpls: List[String],
recordMergerStrategy: String)
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.internal.SQLConf

trait HoodieCatalystPlansUtils {
@@ -77,6 +78,9 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)]


def applyMORBootstrapFileFormatProjection(plan: LogicalPlan): LogicalPlan

/**
* Decomposes [[InsertIntoStatement]] into its arguments allowing to accommodate for API
* changes in Spark 3.3
@@ -19,9 +19,12 @@
package org.apache.spark.sql.hudi

import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.{HoodieTableSchema, HoodieTableState}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql._
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -38,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.storage.StorageLevel

import java.util.{Locale, TimeZone}
@@ -166,6 +170,17 @@
* Create instance of [[ParquetFileFormat]]
*/
def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]
def createMORBootstrapFileFormat(appendPartitionValues: Boolean,
tableState: Broadcast[HoodieTableState],
tableSchema: Broadcast[HoodieTableSchema],
tableName: String,
mergeType: String,
mandatoryFields: Seq[String],
isMOR: Boolean,
isBootstrap: Boolean): Option[ParquetFileFormat]
def getFilePath(file: PartitionedFile): Path

def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch

/**
* Create instance of [[InterpretedPredicate]]
@@ -87,6 +87,12 @@ object DataSourceReadOptions {
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")

val MOR_BOOTSTRAP_FILE_READER: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.mor.bootstrap.file.reader")
.defaultValue("true")
.markAdvanced()
.withDocumentation("read using the mor bootstrap parquet file reader")

val READ_PATHS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.paths")
.noDefaultValue()
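For context, a hedged sketch of how this read option could be toggled per query; the key matches hoodie.datasource.read.mor.bootstrap.file.reader above, while the table path is an assumption for illustration:

// Sketch only: fall back to the legacy relations by disabling the new
// MOR/bootstrap parquet file reader for a single read. Path is illustrative.
val df = spark.read
  .format("hudi")
  .option("hoodie.datasource.read.mor.bootstrap.file.reader", "false")
  .load("/tmp/hoodie/mor_table")
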
@@ -28,7 +28,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig.{WRITE_CONCURRENCY_MODE, SPARK_SQL_MERGE_INTO_PREPPED_KEY}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
@@ -102,8 +101,7 @@ class DefaultSource extends RelationProvider
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams +
(DATA_QUERIES_ONLY.key() -> sqlContext.getConf(DATA_QUERIES_ONLY.key(), optParams.getOrElse(DATA_QUERIES_ONLY.key(), DATA_QUERIES_ONLY.defaultValue()))))
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)

// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
@@ -247,6 +245,9 @@ object DefaultSource {
Option(schema)
}

val useMORBootstrapFF = parameters.getOrElse(MOR_BOOTSTRAP_FILE_READER.key,
MOR_BOOTSTRAP_FILE_READER.defaultValue).toBoolean && (globPaths == null || globPaths.isEmpty)
Contributor

Any reason not to use the new file format with globbed paths?

Contributor Author

It can be fixed, but I attempted it for a bit and it looked like it would take 0.5-1 days. Since globbed paths use the in-memory file index, I assume they are only used for small tables that wouldn't benefit much from this improvement anyway?

Contributor

Got it. Then let's leave this as a follow-up. The new file format should support this too for feature completeness.
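
As a hedged illustration of the case being discussed, a globbed-path read like the sketch below (paths are assumptions) leaves globPaths non-empty, so the query keeps the existing relation instead of the new file format:

// Sketch only: loading with a glob pattern populates globPaths, which disables
// the new file format per the useMORBootstrapFF check above. Paths are illustrative.
val df = spark.read
  .format("hudi")
  .load("/tmp/hoodie/mor_table/2023/07/*")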


if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema)))
} else if (isCdcQuery) {
@@ -262,16 +263,30 @@
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
val relation = new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
if (useMORBootstrapFF && !relation.hasSchemaOnRead) {
relation.toHadoopFsRelation
} else {
relation
}

case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)

val relation = new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
if (useMORBootstrapFF && !relation.hasSchemaOnRead) {
relation.toHadoopFsRelation
} else {
relation
}
case (_, _, true) =>
resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
val relation = new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
if (useMORBootstrapFF && !relation.hasSchemaOnRead) {
relation.toHadoopFsRelation
} else {
relation
}

case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
@@ -280,24 +295,6 @@
}
}

private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
parameters: Map[String, String]): BaseRelation = {
val enableFileIndex = HoodieSparkConfUtils.getConfigValue(parameters, sqlContext.sparkSession.sessionState.conf,
ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
val isSchemaEvolutionEnabledOnRead = HoodieSparkConfUtils.getConfigValue(parameters,
sqlContext.sparkSession.sessionState.conf, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
if (!enableFileIndex || isSchemaEvolutionEnabledOnRead
|| globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) {
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false"))
} else {
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters).toHadoopFsRelation
}
}

private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
@@ -38,7 +38,6 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.CachingPath
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -70,17 +69,6 @@ import scala.util.{Failure, Success, Try}

trait HoodieFileSplit {}

case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None)

case class HoodieTableState(tablePath: String,
latestCommitTimestamp: Option[String],
recordKeyField: String,
preCombineFieldOpt: Option[String],
usesVirtualKeys: Boolean,
recordPayloadClassName: String,
metadataConfig: HoodieMetadataConfig,
recordMergerImpls: List[String],
recordMergerStrategy: String)

/**
* Hoodie BaseRelation which extends [[PrunedFilteredScan]]
@@ -226,9 +214,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
val shouldUseBootstrapFastRead = optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean

shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || shouldUseBootstrapFastRead
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
}

/**
@@ -25,7 +25,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionedFile}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

@@ -58,10 +58,13 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
override val optParams: Map[String, String],
private val prunedDataSchema: Option[StructType] = None)
extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient,
optParams, prunedDataSchema) {
optParams, prunedDataSchema) with SparkAdapterSupport {

override type Relation = HoodieBootstrapMORRelation

protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)

protected lazy val mandatoryFieldsForMerging: Seq[String] =
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())

@@ -108,4 +111,18 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,

override def updatePrunedDataSchema(prunedSchema: StructType): HoodieBootstrapMORRelation =
this.copy(prunedDataSchema = Some(prunedSchema))

def toHadoopFsRelation: HadoopFsRelation = {
fileIndex.shouldBroadcast = true
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
bucketSpec = None,
fileFormat = sparkAdapter.createMORBootstrapFileFormat(shouldExtractPartitionValuesFromPartitionPath,
sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, isMOR = true, isBootstrap = true).get,
optParams)(sparkSession)
}
}
@@ -47,7 +47,7 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
override val metaClient: HoodieTableMetaClient,
override val optParams: Map[String, String],
private val prunedDataSchema: Option[StructType] = None)
extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, optParams, prunedDataSchema) {
extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, optParams, prunedDataSchema) with SparkAdapterSupport {

override type Relation = HoodieBootstrapRelation

@@ -59,12 +59,16 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
this.copy(prunedDataSchema = Some(prunedSchema))

def toHadoopFsRelation: HadoopFsRelation = {
fileIndex.shouldBroadcast = true
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
bucketSpec = None,
fileFormat = fileFormat,
fileFormat = sparkAdapter.createMORBootstrapFileFormat(shouldExtractPartitionValuesFromPartitionPath,
sparkSession.sparkContext.broadcast(tableState),
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
metaClient.getTableConfig.getTableName, "", mandatoryFields, isMOR = false, isBootstrap = true).get,
optParams)(sparkSession)
}
}