
Conversation

@jonvex (Contributor) commented Jul 24, 2023

Change Logs

Merge log and skeleton files in the file reader

Failing tests with new file format:

testDataTypePromotions
testArrayOfStructsChangeColumnType
testArrayOfMapsChangeValueType
testArrayOfMapsStructChangeFieldType
testComplexOperationsOnTable
testPartitionFiltersPushDown - not a bug
testSchemaEvolutionForTableType
Test Call show_logfile_metadata Procedure
Test Call show_logfile_records Procedure
Test NestedSchemaPruning optimization unsuccessful
Test Call run_bootstrap Procedure with no-partitioned
Test nested field as primaryKey and preCombineField
Test Call run_clustering Procedure By Table
Test Call run_clustering Procedure By Path
Test Call run_clustering Procedure With Partition Pruning
Test Two Table's Union Join with time travel
Test Query Merge_On_Read Read_Optimized table - these use partition path as the precombine field
testBulkInsertsAndUpsertsWithBootstrap

Unimplemented features (incomplete list):

  • SkipMerge MOR
  • Glob paths
  • Schema Evolution

Impact

Improve read performance and simplify integration

Risk level (write none, low, medium or high below)

High
Lots of testing has been done, and lots more remains to do

Documentation Update

Release notes?

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@jonvex jonvex changed the title from "Mor perf spark33" to "[HUDI-6568] Hudi Spark Integration Redesign" on Jul 28, 2023
@jonvex (Contributor Author) commented Jul 30, 2023

@hudi-bot run azure

lazy val newHudiFileFormatUtils = if (!parameters.getOrElse(LEGACY_HUDI_PARQUET_FILE_FORMAT.key,
LEGACY_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths == null || globPaths.isEmpty)
&& parameters.getOrElse(REALTIME_MERGE.key(), REALTIME_MERGE.defaultValue())
.equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {
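For readability, the gate above boils down to three conditions. A paraphrase (function and parameter names invented here, not from the PR):

// Sketch: the new file format path is taken only when the legacy format is
// not requested, no glob paths are in play, and the merge type is
// payload-combine (skip-merge is not yet supported; see the reply below).
def useNewFileFormat(legacyFormatRequested: Boolean,
                     globPaths: Seq[String],
                     mergeType: String): Boolean =
  !legacyFormatRequested &&
    (globPaths == null || globPaths.isEmpty) &&
    mergeType.equalsIgnoreCase("payload_combine") // the REALTIME_PAYLOAD_COMBINE_OPT_VAL value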
Contributor:

Is there any issue with REALTIME_SKIP_MERGE_OPT_VAL merge type?

Contributor Author:

Yes, I wasn't able to get it to work correctly before the code freeze.


org.apache.hudi.DefaultSource
org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
org.apache.spark.sql.execution.datasources.parquet.LegacyHoodieParquetFileFormat
Contributor:

When switching to the new file format with the config, should the NewHoodieParquetFileFormat be registered too?

Contributor Author:

Maybe? I'm not sure. What benefit does it give us?

Contributor:

Just curious; I don't have a clear answer. Since createRelation is overridden, it's OK functionality-wise.

Comment on lines 149 to 183
val prunedPartitions = if (shouldBroadcast) {
  listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, partitionFilters))
} else {
  listMatchingPartitionPaths(partitionFilters)
}
val listedPartitions = getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map {
  case (partition, fileSlices) =>
    // Collect base file statuses for all file slices that have a base file
    var baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
      .asScala
      .map(fs => fs.getBaseFile.orElse(null))
      .filter(_ != null))
    if (shouldBroadcast) {
      // Log-file-only slices have no base file; use a log file's status instead
      baseFileStatuses = baseFileStatuses ++ fileSlices.asScala
        .filter(f => f.getLogFiles.findAny().isPresent && !f.getBaseFile.isPresent)
        .map(f => f.getLogFiles.findAny().get().getFileStatus)
    }
    // Filter in candidate files based on the col-stats index lookup
    val candidateFiles = baseFileStatuses.filter(fs =>
      // NOTE: This predicate is true when {@code Option} is empty
      candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName)))

    totalFileSize += baseFileStatuses.size
    candidateFileSize += candidateFiles.size
    if (shouldBroadcast) {
      // Broadcast the file slices that need merging (log files, or a bootstrap base file)
      val c = fileSlices.asScala.filter(f => f.getLogFiles.findAny().isPresent
          || (f.getBaseFile.isPresent && f.getBaseFile.get().getBootstrapBaseFile.isPresent))
        .foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
      if (c.nonEmpty) {
        PartitionDirectory(new PartitionFileSliceMapping(InternalRow.fromSeq(partition.values),
          spark.sparkContext.broadcast(c)), candidateFiles)
      } else {
        PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles)
      }
    } else {
      PartitionDirectory(InternalRow.fromSeq(partition.values), candidateFiles)
    }
Contributor:

Could you move the logic to a single if branch when broadcast is enabled, so it's easier to read?

Contributor Author:

I tried that before but didn't think it looked better. I've done it again, so you can take a look.

@jonvex jonvex requested a review from yihua August 3, 2023 21:16

protected lazy val basePath: Path = metaClient.getBasePathV2

protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
Contributor:

Should some of the util methods also used by HoodieBaseRelation be extracted to an independent Util class for code reuse?

Contributor Author:

Every single method here is from the base relation. You said not to use the relation, so I just copied over what I needed. It was much simpler before.

Contributor:

Got it. What I meant is, the new file format should not extend existing file format classes or use the relation directly inside. Util methods can still be extracted to a common util class so that both NewHoodieParquetFileFormatUtils and HoodieBaseRelation can use it. If that takes time, we can punt on it.

val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
(file: PartitionedFile) => {
file.partitionValues match {
case broadcast: PartitionFileSliceMapping =>
Contributor:

I feel like the branching here can be further simplified based on the split or file group type, without having to check isMOR or isBootstrap explicitly: (1) base file only, (2) base file + log files, (3) log files only, (4) bootstrap skeleton file + original file, (5) bootstrap skeleton file + original file + log files. Then we may apply optimizations like predicate pushdown per split type. We can improve this part in a follow-up, along with aligning logic across query types (e.g., schema handling, partition path handling, etc.).
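For instance (a hypothetical sketch, not code from this PR), the five split types could be modeled as a small ADT so each reader path is chosen by one exhaustive match:

// Names invented for illustration; file paths are simplified to strings.
sealed trait ReadSplit
case class BaseFileOnly(base: String) extends ReadSplit
case class BaseFileWithLogs(base: String, logs: Seq[String]) extends ReadSplit
case class LogFilesOnly(logs: Seq[String]) extends ReadSplit
case class BootstrapSplit(skeleton: String, original: String) extends ReadSplit
case class BootstrapSplitWithLogs(skeleton: String, original: String, logs: Seq[String]) extends ReadSplit

def chooseReader(split: ReadSplit): String = split match {
  case BaseFileOnly(_)                 => "plain parquet reader, full predicate pushdown"
  case BaseFileWithLogs(_, _)          => "base file reader + log merge iterator"
  case LogFilesOnly(_)                 => "log-file-only iterator"
  case BootstrapSplit(_, _)            => "skeleton + original file combined iterator"
  case BootstrapSplitWithLogs(_, _, _) => "bootstrap iterator + log merge"
}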

Contributor Author:

So there are two things going on here.

The first is that you are seeing isBootstrap && bootstrapFileOpt.isPresent and isMOR && logFiles.nonEmpty. The isBootstrap and isMOR checks are not strictly necessary, because whenever the second condition in each pair is true, the first must be true as well.

The second thing, which I don't think should be changed, is:

(isMOR, logFiles.nonEmpty) match {
  case (true, true) => buildMergeOnReadIterator(bootstrapIterator, logFiles, filePath.getParent,
    bootstrapReaderOutput, requiredSchemaWithMandatory, outputSchema, partitionSchema, partitionValues,
    broadcastedHadoopConf.value.value)
  case (true, false) => appendPartitionAndProject(bootstrapIterator, bootstrapReaderOutput,
    partitionSchema, outputSchema, partitionValues)
  case (false, false) => bootstrapIterator
  case (false, true) => throw new IllegalStateException("should not be log files if not mor table")
}

The reasoning for this is:

If it's MOR and has log files, we need to merge the log files, then append the partition path, then project away any fields that are mandatory for merging (record key and precombine field) but aren't in the required schema.

If it's MOR and doesn't have log files, we only need to append the partition path.

If it's not MOR, the partition path has already been appended by the reader itself, so we just return the iterator.

The final edge case to bring up is if (requiredSchemaWithMandatory.isEmpty). That means it's a df.count(), so we just use the base file reader.
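To make the ordering concrete, a self-contained sketch (all names invented; the real iterators operate on InternalRow):

case class Rec(fields: Map[String, Any])

// Stand-ins for the real merge/append/project steps described above.
def mergeLogFiles(rows: Iterator[Rec]): Iterator[Rec] = rows
def appendPartitionPath(rows: Iterator[Rec], path: String): Iterator[Rec] =
  rows.map(r => Rec(r.fields + ("partition_path" -> path)))
def project(rows: Iterator[Rec], required: Set[String]): Iterator[Rec] =
  rows.map(r => Rec(r.fields.filter { case (k, _) => required(k) }))

def buildIterator(base: Iterator[Rec], isMOR: Boolean, hasLogFiles: Boolean,
                  path: String, required: Set[String]): Iterator[Rec] =
  (isMOR, hasLogFiles) match {
    case (true, true)   => project(appendPartitionPath(mergeLogFiles(base), path), required)
    case (true, false)  => appendPartitionPath(base, path)
    case (false, false) => base // the reader already appended the partition path
    case (false, true)  => throw new IllegalStateException("log files on a non-MOR table")
  }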

Contributor:

Makes sense. What I was referring to is that we need to revisit all the complexity and see if there are opportunities to unify the differences between COW and MOR (which may require changes to iterators, file readers, etc.).

Contributor Author:

COW I think would work right now, because we would never wrap the partition columns with PartitionFileSliceMapping in the FileIndex, so it would just hit the iterator that reads with the base file reader. I would probably just add an if where the readers are created to return the base file reader directly.
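A sketch of that early-out (the real match is over PartitionedFile.partitionValues; the stub below stands in for the class added in this PR):

final class PartitionFileSliceMapping // stub for the real broadcast-backed mapping

def pickReader(partitionValues: Any): String = partitionValues match {
  case _: PartitionFileSliceMapping => "merge reader using the broadcast file slices"
  case _                            => "plain base file reader (COW / base-file-only)"
}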

Comment on lines +149 to +150
override def createLegacyHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark24LegacyHoodieParquetFileFormat(appendPartitionValues))
Contributor:

So this is used by BaseFileOnlyRelation for COW and MOR read-optimized queries. I assume the new file format can be applied to COW and MOR read-optimized queries too. We should follow up here in a separate PR.

The goal is to get rid of Spark-version-specific file format classes and make the Hudi Spark integration easier to maintain.

Contributor Author:

Yeah. We're going to have to figure out if there is a way to do schema evolution without porting code.

@jonvex jonvex requested a review from yihua August 4, 2023 21:35


class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport {
Contributor:

Add docs here linking to the new file format implementation, so that any changes to this format implementation are also reflected in the new file format class?

Contributor Author:

I think changes to the relation and especially rdd classes are more likely to require changes in the new file format than changes to LegacyHoodieParquetFileFormat.

Contributor:

Yeah, my point is that developers changing this class should be aware of the new file format, so they don't miss making the corresponding change there and cause an inconsistency.
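Something like the following Scaladoc on the legacy classes would make that expectation explicit (a sketch; wording assumed, not from this PR):

/**
 * NOTE: Retained as the legacy read path. Any behavioral change made here
 * should be mirrored in the new file format implementation
 * (NewHoodieParquetFileFormat) so the two paths don't drift apart.
 */
class LegacyHoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport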

* </ol>
*/
class Spark24HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
class Spark24LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
Contributor:

Same here for the version-specific file format classes: add docs linking to the new file format implementation, so that any changes to this format implementation are also reflected in the new file format class?


Comment on lines 248 to 249
val useMORBootstrapFF = parameters.getOrElse(MOR_BOOTSTRAP_FILE_READER.key,
MOR_BOOTSTRAP_FILE_READER.defaultValue).toBoolean && (globPaths == null || globPaths.isEmpty)
Contributor:

Got it. Then let's leave this as a follow-up. The new file format should support this too for feature completeness.

@yihua (Contributor) left a comment:

Overall LGTM. This is a great leap towards making the Hudi Spark integration more performant and simpler!

@yihua (Contributor) commented Aug 4, 2023

@jonvex let's track the follow-ups in JIRA.

@yihua (Contributor) commented Aug 4, 2023

@jonvex Could you also update the PR description with details of the approach? Before merging this PR, let's create a new PR based on this patch with hoodie.datasource.read.use.legacy.parquet.file.format=false and make sure tests pass on MOR and bootstrap queries.
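For instance, such a validation read could look like this (a sketch, assuming an active SparkSession spark and a Hudi table at basePath; the config key is quoted from the comment above):

// Read with the legacy Parquet file format disabled, exercising the new path.
val df = spark.read.format("hudi")
  .option("hoodie.datasource.read.use.legacy.parquet.file.format", "false")
  .load(basePath)
df.count()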

@jonvex jonvex requested a review from yihua August 5, 2023 00:23
@yihua (Contributor) left a comment:

LGTM

@apache apache deleted a comment from hudi-bot Aug 7, 2023
@yihua yihua merged commit 23f657d into apache:master Aug 7, 2023
@yihua (Contributor) commented Aug 7, 2023

CI is green.
[Screenshot: green CI run, 2023-08-06 21:05]


Labels

engine:spark (Spark integration) · priority:blocker (Production down; release blocker) · release-0.14.0

Projects

Status: ✅ Done


2 participants