Merged

53 commits
0217715
curr
Jul 21, 2023
9325f13
fix select *
Jul 24, 2023
37d3b93
fix checkstyle
Jul 24, 2023
4e77337
relax mit
Jul 24, 2023
54a4e7e
add partition columns after merging
Jul 25, 2023
ee25b44
working
Jul 28, 2023
3a1eadb
going to generalize for all spark versions
Jul 28, 2023
67f298d
made more extensible
Jul 28, 2023
6f357c6
fix error in port
Jul 28, 2023
d28be3b
switch default to true
Jul 29, 2023
d7612ac
spark 3.2 working
Jul 29, 2023
bb2cd1b
added spark 3.4 support
Jul 29, 2023
9ea1398
support spark 3.1
Jul 29, 2023
7b7d90e
fix spark 3.2 and 3.3 after changes
Jul 29, 2023
a6f97ed
spark 3.0 working
Jul 29, 2023
a52dacd
spark 2.4 working
Jul 30, 2023
0e91a54
add imports to spark 3 adapter
Jul 30, 2023
bb0acc5
Merge remote-tracking branch 'origin/master' into mor_perf_spark33
Jul 30, 2023
3e2626a
fix merge
Jul 30, 2023
c05f009
disable for schema on read
Jul 30, 2023
662f3b3
disable with inmemory index
Jul 30, 2023
72c0bb1
disable with timestamp keygenerator
Jul 30, 2023
793964b
fix pruning timestamp keygen
Jul 30, 2023
646edf5
fix partition filter push down test
Jul 30, 2023
663aa88
check glob paths for null
Jul 30, 2023
3d6f947
add isProjectionCompatible
Jul 30, 2023
4e33648
optimize skip merge
Jul 30, 2023
26bb36c
fix testReadLogOnlyMergeOnReadTable
Jul 30, 2023
b8f1f89
fix test
Jul 30, 2023
2089508
clean up
Jul 31, 2023
eb91c86
combine for spark versions
Aug 1, 2023
d6025b9
address some comments
Aug 1, 2023
87e8f76
flag changed meaning but didn't fix in default source
Aug 2, 2023
54bb07b
clean up a bit
Aug 2, 2023
b695af3
addressed all comments
Aug 2, 2023
b54a365
disable some tests and checkstyle
Aug 2, 2023
f179c08
re enable one test
Aug 2, 2023
293ae46
had to disable for skip merge as well
Aug 2, 2023
6ce7ff6
clean up to pass CI
Aug 3, 2023
af76828
fix checkstyle
Aug 3, 2023
1875a19
fix test failing issue
Aug 3, 2023
ef8eaad
address review feedback
Aug 3, 2023
89a4c7f
Merge remote-tracking branch 'origin/master' into mor_perf_spark33
Aug 4, 2023
def394b
address pr comments
Aug 4, 2023
f13bb9c
addressed review
Aug 5, 2023
e5a805e
Merge branch 'master' into mor_perf_spark33
yihua Aug 6, 2023
65cfcdf
Update docs of LegacyHoodieParquetFileFormat
yihua Aug 6, 2023
c458337
Rename the config
yihua Aug 6, 2023
44a63c8
Fix imports
yihua Aug 6, 2023
fa681fd
Rename config to be accurate
yihua Aug 6, 2023
83f6b8b
Fix build
yihua Aug 6, 2023
996c798
Merge remote-tracking branch 'origin/master' into mor_perf_spark33
Aug 6, 2023
69aa9e6
Merge remote-tracking branch 'origin/mor_perf_spark33' into mor_perf_…
Aug 6, 2023
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.internal.SQLConf

trait HoodieCatalystPlansUtils {
@@ -77,6 +78,15 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)]


/**
* Spark requires file formats to append the partition path fields to the end of the schema.
* For tables whose partition path fields are not declared at the end of the schema, a query
* like "select *" would otherwise return the columns in the wrong order. To fix this
* behavior, we apply a projection onto the FileScan when the file format is NewHoodieParquetFileFormat.
*/
def applyNewHoodieParquetFileFormatProjection(plan: LogicalPlan): LogicalPlan
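
A minimal standalone sketch of what such a projection could look like, assuming Spark 3.x plan classes; the helper name reorderToTableSchema and the string-based format check are illustrative, not the actual per-Spark-version implementation:

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Illustrative only: re-project the scan output into the table's declared column
// order so "select *" does not surface the partition columns at the end.
def reorderToTableSchema(plan: LogicalPlan, declaredColumnOrder: Seq[String]): LogicalPlan = plan match {
  case lr @ LogicalRelation(fs: HadoopFsRelation, output, _, _)
    if fs.fileFormat.getClass.getSimpleName.startsWith("NewHoodieParquetFileFormat") =>
    Project(declaredColumnOrder.flatMap(name => output.find(_.name == name)), lr)
  case other => other
}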

/**
* Decomposes [[InsertIntoStatement]] into its arguments allowing to accommodate for API
* changes in Spark 3.3
@@ -26,18 +26,18 @@ import org.apache.spark.sql._
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Command, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.storage.StorageLevel

import java.util.{Locale, TimeZone}
@@ -165,7 +165,9 @@ trait SparkAdapter extends Serializable {
/**
* Create instance of [[ParquetFileFormat]]
*/
def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]
def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]

def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch
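
A minimal sketch of what a version adapter could do here, assuming the single-argument ColumnarBatch constructor plus setNumRows; illustrative, not the actual per-version adapter code:

import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Illustrative only: build the batch without relying on constructor overloads
// that differ across Spark versions.
def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): ColumnarBatch = {
  val batch = new ColumnarBatch(vectors)
  batch.setNumRows(numRows)
  batch
}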

/**
* Create instance of [[InterpretedPredicate]]
@@ -17,4 +17,4 @@


org.apache.hudi.DefaultSource
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
Contributor:

When switching to the new file format with the config, should the NewHoodieParquetFileFormat be registered too?

Contributor Author:

Maybe? I'm not sure. What benefit does it give us?

Contributor:

Just curious, I don't have a clear answer. Since createRelation is overridden, it's OK functionality-wise.

@@ -87,6 +87,15 @@ object DataSourceReadOptions {
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")

val USE_NEW_HUDI_PARQUET_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.use.new.parquet.file.format")
.defaultValue("false")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Read using the new Hudi parquet file format. The new Hudi parquet file format is " +
"introduced as an experimental feature in 0.14.0. Currently, the new Hudi parquet file format only applies " +
"to bootstrap and MOR queries. Schema evolution is also not supported by the new file format.")

val READ_PATHS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.paths")
.noDefaultValue()
@@ -246,6 +246,16 @@ object DefaultSource {
} else if (isCdcQuery) {
CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
} else {
lazy val newHudiFileFormatUtils = if (parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths == null || globPaths.isEmpty)
&& parameters.getOrElse(REALTIME_MERGE.key(), REALTIME_MERGE.defaultValue())
.equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {
Contributor:

Is there any issue with REALTIME_SKIP_MERGE_OPT_VAL merge type?

Contributor Author:

Yes. I wasn't able to get it to work correctly before the code freeze.

val formatUtils = new NewHoodieParquetFileFormatUtils(sqlContext, metaClient, parameters, userSchema)
if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils)
} else {
Option.empty
}

(tableType, queryType, isBootstrappedTable) match {
case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
(COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
@@ -256,16 +266,28 @@
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
if (newHudiFileFormatUtils.isEmpty) {
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
} else {
newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = false)
}

case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
if (newHudiFileFormatUtils.isEmpty) {
new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
} else {
newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = true)
}

case (_, _, true) =>
resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
if (newHudiFileFormatUtils.isEmpty) {
resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
} else {
newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = false, isBootstrap = true)
}

case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
@@ -56,7 +56,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.parquet.{LegacyHoodieParquetFileFormat, ParquetFileFormat}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
@@ -241,8 +241,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
case HoodieFileFormat.PARQUET =>
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
(parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
val parquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
(parquetFileFormat, LegacyHoodieParquetFileFormat.FILE_FORMAT_ID)
}

/**
@@ -52,7 +52,7 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
options: Map[String, String],
hadoopConf: Configuration,
appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {
val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
val parquetFileFormat: ParquetFileFormat = sparkAdapter.createLegacyHoodieParquetFileFormat(appendPartitionValues).get
val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession,
dataSchema = dataSchema,
@@ -18,7 +18,7 @@
package org.apache.hudi

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, getConfigProperties}
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, convertFilterForTimestampKeyGenerator, getConfigProperties}
import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
@@ -100,6 +100,8 @@ case class HoodieFileIndex(spark: SparkSession,

override def rootPaths: Seq[Path] = getQueryPaths.asScala

var shouldBroadcast: Boolean = false

/**
* Returns the FileStatus for all the base files (excluding log files). This should be used only for
* cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic
@@ -142,26 +144,49 @@
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, partitionFilters).map {
case (partitionOpt, fileSlices) =>
val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
val logFilesStatus = if (includeLogFiles) {
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
if (shouldBroadcast) {
val baseFileStatusesAndLogFileOnly: Seq[FileStatus] = fileSlices.map(slice => {
if (slice.getBaseFile.isPresent) {
slice.getBaseFile.get().getFileStatus
} else if (slice.getLogFiles.findAny().isPresent) {
slice.getLogFiles.findAny().get().getFileStatus
} else {
null
}
}).filter(slice => slice != null)
val c = fileSlices.filter(f => f.getLogFiles.findAny().isPresent
|| (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent)).
foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
if (c.nonEmpty) {
PartitionDirectory(new PartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), spark.sparkContext.broadcast(c)), baseFileStatusesAndLogFileOnly)
} else {
java.util.stream.Stream.empty()
PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), baseFileStatusesAndLogFileOnly)
}
val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
baseFileStatusOpt.foreach(f => files.append(f))
files
})

PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles)
} else {
val allCandidateFiles: Seq[FileStatus] = fileSlices.flatMap(fs => {
val baseFileStatusOpt = getBaseFileStatus(Option.apply(fs.getBaseFile.orElse(null)))
val logFilesStatus = if (includeLogFiles) {
fs.getLogFiles.map[FileStatus](JFunction.toJavaFunction[HoodieLogFile, FileStatus](lf => lf.getFileStatus))
} else {
java.util.stream.Stream.empty()
}
val files = logFilesStatus.collect(Collectors.toList[FileStatus]).asScala
baseFileStatusOpt.foreach(f => files.append(f))
files
})
PartitionDirectory(InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles)
}
}

hasPushedDownPartitionPredicates = true

if (shouldReadAsPartitionedTable()) {
prunedPartitionsAndFilteredFileSlices
} else {
} else if (shouldBroadcast) {
assert(partitionSchema.isEmpty)
prunedPartitionsAndFilteredFileSlices
} else {
Seq(PartitionDirectory(InternalRow.empty, prunedPartitionsAndFilteredFileSlices.flatMap(_.files)))
}
}
@@ -244,7 +269,11 @@ case class HoodieFileIndex(spark: SparkSession,
// Prune the partition path by the partition filters
// NOTE: Non-partitioned tables are assumed to consist from a single partition
// encompassing the whole table
val prunedPartitions = listMatchingPartitionPaths (partitionFilters)
val prunedPartitions = if (shouldBroadcast) {
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, partitionFilters))
} else {
listMatchingPartitionPaths(partitionFilters)
}
getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map(
{ case (partition, fileSlices) => (Option.apply(partition), fileSlices.asScala) })
}
@@ -37,7 +37,7 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.hudi.table.HoodieSparkTable
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
import org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext}
@@ -206,7 +206,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath)
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID
case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
case HoodieFileFormat.ORC => "orc"
}
