
Conversation

@alexeykudinkin
Contributor

What is the purpose of the pull request

Replacing UDF in Bulk Insert w/ RDD transformation.

Brief change log

TBD

Verify this pull request

This pull request is already covered by existing tests, such as (please describe tests).
This change added tests and can be verified as follows:

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@alexeykudinkin changed the title from "[HUDI-3993][Perf] Replacing UDF in Bulk Insert w/ RDD transformation" to "[WIP][HUDI-3993][Perf] Replacing UDF in Bulk Insert w/ RDD transformation" on Apr 30, 2022
@yihua added the area:performance (Performance optimizations) and writer-core labels on May 3, 2022
"Key-generator class name is required")

val prependedRdd: RDD[InternalRow] =
df.queryExecution.toRdd.mapPartitions { iter =>
Contributor

Does this toRdd incur any perf hit? If yes, can you do a benchmark w/ the UDF-based approach vs this one and report what you see? Alternatively, you can also run a benchmark w/ raw parquet write vs bulk insert row writer (non-partitioned, no sort mode) and ensure we see comparable numbers w/ this patch.

Contributor

I am seeing some perf hit w/ this code change. Will wait to sync up with Alexey on this.

Contributor Author

toRdd is how Datasets are eventually executed in Spark anyway. There's no perf hit from using it.

Contributor Author

This PR should only be applied in conjunction w/ this one #5523

Member

Can you confirm this issue has been resolved in the master branch?

Contributor Author

I think the issue @nsivabalan is referring to is the fact that this PR shouldn't be measured in isolation but only together with #5523 (which has landed as well)

val updatedDF = HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)

if (!populateMetaFields) {
updatedDF
Contributor

Probably it was a gap before, but we may not have to support dropPartitionColumns even with the virtual key code path. Can we fix that please?

}

private def dedupeRows(df: DataFrame, preCombineFieldRef: String, isGlobalIndex: Boolean): DataFrame = {
val recordKeyMetaFieldOrd = df.schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
Contributor
@nsivabalan May 4, 2022

This may also need to be fixed for the virtual key path, or we can call out that it's not supported for now. Even prior to this patch, we did not have support for de-duping in the virtual key flow in the row writer.

Contributor Author

Yes, virtual keys de-duping isn't supported currently

@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(InternalRow internalRow, StructType schema) {
try {
Contributor

With the changes in my other patch, we don't need to deserialize to Row to fetch the value. Can you take a look?

Contributor Author

These are temporary changes, that are addressed in #5523

Member

Any resolution on this? Did you end up backing out the temporary changes?

Contributor Author

Correct. These have been revisited.


@Test
def testGetNestedRowValue(): Unit = {
val schema = StructType(Seq(
Contributor

Minor: if you intend to use the same schema for many tests, we can make this an instance variable and not declare it in every test. It's immutable, so we could even make it static final. See the sketch below.
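
If the schema does get hoisted, a minimal sketch of what that could look like (the class name, field name, and schema contents here are illustrative, not the actual test code):

import org.apache.spark.sql.types._
import org.junit.jupiter.api.Test

class TestNestedRowValue {

  // Immutable, so it is safe to share the same schema across all tests in the class.
  // For the "static final" flavor, this val could instead live in a companion object.
  private val nestedSchema: StructType = StructType(Seq(
    StructField("key", StringType, nullable = false),
    StructField("nested", StructType(Seq(
      StructField("ts", LongType, nullable = false)
    )), nullable = true)
  ))

  @Test
  def testGetNestedRowValue(): Unit = {
    // ... use nestedSchema here instead of redeclaring the StructType per test
  }
}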

// NOTE: Helper keeps track of [[lastKnownPartitionPath]] as [[UTF8String]] to avoid
// conversion from Catalyst internal representation into a [[String]]
partitionPath = row.getString(
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
Contributor

We can directly use 3 here instead of looking it up in the hashmap.
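
For illustration, the two lookups being compared (a sketch assuming an InternalRow-based helper; the meta-column layout places _hoodie_partition_path at ordinal 3):

import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.catalyst.InternalRow

// Current form: resolve the ordinal through the name-to-position map on every call.
def partitionPathViaMap(row: InternalRow): String =
  row.getString(
    HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD))

// Suggested form: the partition-path meta column is always at ordinal 3, so read it directly.
def partitionPathByOrdinal(row: InternalRow): String =
  row.getString(3)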

}

@Override
public String getRecordKey(Row row) {
Contributor

Shouldn't we migrate this fix to SimpleKeyGen, if you feel the existing impl in SimpleKeyGen could be fixed? Why make changes to NonPartitionedKeyGen only?

Contributor Author

This is addressed in #5523

@alexeykudinkin force-pushed the ak/spk-blk-ins-opt branch 2 times, most recently from dba2edf to 9c7e7ea, on May 5, 2022 02:41
@alexeykudinkin changed the title from "[WIP][HUDI-3993][Perf] Replacing UDF in Bulk Insert w/ RDD transformation" to "[HUDI-3993][Perf][Stacked on 5497] Replacing UDF in Bulk Insert w/ RDD transformation" on May 7, 2022
@alexeykudinkin changed the title from "[HUDI-3993][Perf][Stacked on 5497] Replacing UDF in Bulk Insert w/ RDD transformation" to "[HUDI-3993][Stacked on 5497] Replacing UDF in Bulk Insert w/ RDD transformation" on May 7, 2022
@nsivabalan added the priority:critical (Production degraded; pipelines stalled) label on May 10, 2022

String writeToken = getWriteToken(taskPartitionId, taskId, taskEpochId);
String fileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId,
table.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension());
Contributor

table.getBaseFileExtension()

@nsivabalan force-pushed the ak/spk-blk-ins-opt branch from 9c7e7ea to dc91261 on May 11, 2022 03:32
@codope added the priority:blocker (Production down; release blocker) label and removed the priority:critical (Production degraded; pipelines stalled) label on Jul 9, 2022
@alexeykudinkin changed the title from "[HUDI-3993][Stacked on 5497] Replacing UDF in Bulk Insert w/ RDD transformation" to "[HUDI-3993] Replacing UDF in Bulk Insert w/ RDD transformation" on Jul 14, 2022
@alexeykudinkin force-pushed the ak/spk-blk-ins-opt branch 2 times, most recently from 0600a70 to 6dfc22b, on July 15, 2022 20:49
Contributor
@yihua left a comment

This PR brings a few notable improvements and refactorings. @alexeykudinkin have you benchmarked the performance of this set of changes?

};

this.row = row;
this.containsMetaFields = containsMetaFields;
Contributor

if containsMetaFields is false, should the length of metaFields be 0?

Contributor Author

There's some confusion: containsMetaFields relates to whether the inner row contains the meta-fields itself. However, HIR will always override the meta-fields by overlaying them on top of whatever the source row contains (this is necessary b/c UnsafeRow can't be updated).

Contributor Author

Am gonna update the docs to make it more clear

Contributor

Sounds good.

private int rebaseOrdinal(int ordinal) {
// NOTE: In cases when source row does not contain meta fields, we will have to
// rebase ordinal onto its indexes
return containsMetaFields ? ordinal : ordinal - metaFields.length;
Contributor

If the source row does not contain meta fields (containsMetaFields is false), and assuming metaFields is empty, the logic here for adjusting the ordinal is not necessary?

Contributor Author

Please check my comments above -- we always overlay meta-fields, since we need them to be mutable (they're being updated dynamically in writer)
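
A minimal sketch of the overlay idea being described (the class and field names are illustrative, not the actual HoodieInternalRow implementation): the wrapper keeps the meta-field values in its own mutable array and delegates everything else to the wrapped row, rebasing ordinals when the source row has no meta columns.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// Illustrative wrapper: meta fields always live in `metaFields` (mutable),
// regardless of whether the wrapped row carries its own meta columns.
class MetaOverlayRow(
    metaFields: Array[UTF8String],   // commit time, seq no, record key, partition path, file name
    row: InternalRow,
    containsMetaFields: Boolean) {

  // When the source row lacks meta columns, its own fields start at ordinal 0,
  // so an "overlaid" ordinal has to be shifted back by metaFields.length.
  private def rebaseOrdinal(ordinal: Int): Int =
    if (containsMetaFields) ordinal else ordinal - metaFields.length

  def getUTF8String(ordinal: Int): UTF8String =
    if (ordinal < metaFields.length) metaFields(ordinal)     // always served from the overlay
    else row.getUTF8String(rebaseOrdinal(ordinal))            // delegate to the wrapped row

  // Meta fields stay mutable even if `row` is an (immutable) UnsafeRow.
  def updateMetaField(ordinal: Int, value: UTF8String): Unit =
    metaFields(ordinal) = value
}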

* @throws IOException on any exception while writing.
*/
void writeRow(String key, InternalRow row) throws IOException;
void writeRow(UTF8String key, InternalRow row) throws IOException;
Contributor

Is the usage of UTF8String type for performance?

Contributor Author

Correct -- to avoid conversion b/w String and UTF8String
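
A hedged sketch of what the UTF8String-based signature avoids (an illustration, not the actual writer code; the record-key ordinal follows the meta-column layout discussed above):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

val recordKeyOrd = 2  // _hoodie_record_key in the meta-column layout

// String-based: materializes a java.lang.String (UTF-16) from the Catalyst value ...
def keyAsString(row: InternalRow): String = row.getString(recordKeyOrd)

// ... whereas the UTF8String-based signature hands the Catalyst value through untouched,
// which is what writeRow(UTF8String key, InternalRow row) relies on.
def keyAsUtf8(row: InternalRow): UTF8String = row.getUTF8String(recordKeyOrd)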

Comment on lines +39 to +43
public HoodieTimer(boolean shouldStart) {
if (shouldStart) {
startTimer();
}
}
Contributor

This is not obvious (timer = new HoodieTimer(true)) compared to the existing way (timer = new HoodieTimer().startTimer()). Should we revert the change?

Contributor Author

The old semantics are still preserved: it works as it has been, and this just adds a new way for when you don't need to invoke startTimer explicitly.

Contributor

Understood. I'm saying that timer = new HoodieTimer().startTimer() is more obvious for starting the timer than having to look up what the boolean represents.

Contributor Author

Fair enough
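
For reference, the two styles under discussion side by side (a sketch based on the constructor shown in the diff; the import path and the elapsed-time call at the end are assumptions):

import org.apache.hudi.common.util.HoodieTimer

// Existing style: construct, then start explicitly (reads clearly at the call site).
val explicitTimer = new HoodieTimer().startTimer()

// New style added here: the boolean asks the constructor to start the timer immediately.
val autoStartedTimer = new HoodieTimer(true)

// ... timed work ...
val elapsedMs = explicitTimer.endTimer()  // assumed to return elapsed millis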

} else {
minRecordKey = recordKey;
public void add(UTF8String recordKey) {
this.bloomFilter.add(recordKey.getBytes());
Contributor

So the content, i.e. the byte array, of the String and UTF8String instances should be the same here, right? That way the bloom filter lookup is not affected by whether the key was a String.

Contributor Author

The bloom filter always ingests UTF-8 (Java by default encodes in UTF-16).

Contributor

Sounds good; want to make sure there is no gap between Spark's UTF8String and UTF-8 encoding in Java, since this is going to affect the Bloom Index.

Member

BloomFilter add does . So we seem to be fine. It's good to trust-but-verify once, though, that recordKey.getBytes() is equal to string.getBytes(StandardCharsets.UTF_8). @alexeykudinkin you probably checked that during development?

Contributor Author

Correct
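
The trust-but-verify check suggested above can be written as a one-off sketch like this (assumes Spark's UTF8String is on the classpath; the sample keys are arbitrary):

import java.nio.charset.StandardCharsets
import org.apache.spark.unsafe.types.UTF8String

// Verifies that UTF8String.getBytes produces the same bytes as
// String.getBytes(UTF_8), including for non-ASCII record keys.
val samples = Seq("2021/05/01-key-001", "ключ-ä-日本語")
samples.foreach { s =>
  val fromUtf8String = UTF8String.fromString(s).getBytes
  val fromJavaString = s.getBytes(StandardCharsets.UTF_8)
  assert(java.util.Arrays.equals(fromUtf8String, fromJavaString), s"mismatch for $s")
}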

Comment on lines +101 to +108
if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof SimpleKeyGenerator) {
this.simpleKeyGen = true;
this.simplePartitionFieldIndex = (Integer) structType.getFieldIndex(keyGeneratorOpt.get().getPartitionPathFields().get(0)).get();
this.simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType();
} else {
this.simpleKeyGen = false;
this.simplePartitionFieldIndex = -1;
this.simplePartitionFieldDataType = null;
Contributor

The question is out-of-scope for this PR, but why do we need special-case handling for the simple key generator here?

Contributor Author

We actually do not (it was done for perf reasons before). This is addressed in #5523


import scala.collection.JavaConverters.asScalaBufferConverter

object HoodieDatasetBulkInsertHelper extends Logging {
Contributor

Is this refactored based on hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java?

Contributor Author

Correct. It's a simplified version converted into Scala (to handle RDDs)

Member

Trying to understand this better. Why did this need to be in Scala?

Contributor Author

To avoid back-n-forth Java/Scala conversions

}
}

val metaFields = Seq(
Contributor

nit: use HoodieRecord.HOODIE_META_COLUMNS and a transformation to form the meta fields?

Contributor Author

Good call. The reason I didn't do it in the first place was b/c order is critical here, and even though we're using a list, I didn't want this constraint to end up obscured in another class (where order actually might not matter at all).

Contributor

That's fair. But shouldn't all the places use the same order, so that we can maintain the order in one place like HoodieRecord.HOODIE_META_COLUMNS to avoid discrepancies?

Contributor Author

Fair point. The problem is that ordering only matters in a handful of contexts (compared to all usages of this list), and it's harder to justify why ordering matters when you're looking at just the HoodieRecord class.

Contributor

OK, for this one we can keep it. Something to think about for consistency, to avoid bugs.

Member

We should follow up and consolidate into one list in HoodieRecord. +1. Unless the other usages break with a different ordering, I don't see any reason why we won't.
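
If the consolidation happens, a minimal sketch of deriving the field list from the shared constant (assuming HoodieRecord.HOODIE_META_COLUMNS is kept in the ordinal order the writer relies on):

import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.types.{StringType, StructField}

import scala.collection.JavaConverters.asScalaBufferConverter

// Order matters: the resulting fields must line up with meta-column ordinals 0..4.
val metaFields: Seq[StructField] =
  HoodieRecord.HOODIE_META_COLUMNS.asScala.map(name => StructField(name, StringType))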

}
// NOTE: It's critical whenever we keep the reference to the row, to make a copy
// since Spark might be providing us with a mutable copy (updated during the iteration)
(rowKey, row.copy())
Contributor

is row.copy() needed here for reduceByKey?

Contributor Author

We can only get away w/o copying when we do one-pass (streaming-like) processing. If at any point we need to hold a reference to it, we will have to make a copy (it's gonna fail otherwise).

Contributor

OK. Later on, I think we need to revisit this pattern of copy() calls in the DAG to make sure they are needed. Could you create a ticket?

Contributor Author

Note, what I'm saying is only applicable to InternalRows, which don't copy by default and instead point into a shared, mutable underlying buffer (actually holding what's been read).

Contributor

Got it.

Member

What exact scenarios cause Spark to fail without the copy? Could you please expand on that?

Contributor Author

This exact code will fail if we remove the copy, b/c often InternalRow is a mutable copy that Spark changes during iteration. That is safe when we access just the one row under the pointer, but in the subsequent reduceByKey we access two rows at the same time.
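
Condensed to its essence, the shape of the code in question looks roughly like this (names and the merge function are illustrative; the real de-dup picks the row by precombine value):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

def dedupe(rows: RDD[InternalRow], recordKeyOrd: Int): RDD[InternalRow] =
  rows
    .mapPartitions { iter =>
      iter.map { row =>
        val key = row.getString(recordKeyOrd)
        // Copy is required: Spark may reuse the same mutable backing row for the next element.
        (key, row.copy())
      }
    }
    .reduceByKey { (left, right) =>
      // Two rows are live at the same time here; without the copy above they could
      // both point at the same (already advanced) buffer.
      left // placeholder merge; the real code compares the precombine field
    }
    .values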

Alexey Kudinkin added 10 commits July 18, 2022 21:27
@alexeykudinkin
Contributor Author

@hudi-bot run azure

1 similar comment
@alexeykudinkin
Contributor Author

@hudi-bot run azure

…unnecessary `DataFrame` > `RDD` conversions
Contributor
@yihua left a comment

LGTM

}
// NOTE: It's critical whenever we keep the reference to the row, to make a copy
// since Spark might be providing us with a mutable copy (updated during the iteration)
(rowKey, row.copy())
Contributor

Got it.

@apache deleted a comment from hudi-bot on Jul 21, 2022
@hudi-bot
Collaborator

CI report:


Member
@vinothchandar left a comment

@alexeykudinkin could you please respond to / follow up on the minor comments.

This PR by itself seems to incur RDD conversion. Will look into #5523 as pointed out in the review responses.

} else {
minRecordKey = recordKey;
public void add(UTF8String recordKey) {
this.bloomFilter.add(recordKey.getBytes());
Member

BloomFilter add does . So we seem to be fine. It's good to trust-but-verify once, though, that recordKey.getBytes() is equal to string.getBytes(StandardCharsets.UTF_8). @alexeykudinkin you probably checked that during development?

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import scala.Function1;
Member

I don't really like Scala imports in Java (this becomes an issue for us one day when we want to shrink the Scala spread in the code). Any way we can avoid this?

Contributor Author

This is removed in #5523

@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(InternalRow internalRow, StructType schema) {
try {
Member

Any resolution on this? Did you end up backing out the temporary changes?


import scala.collection.JavaConverters.asScalaBufferConverter

object HoodieDatasetBulkInsertHelper extends Logging {
Member

Trying to understand this better. Why did this need to be in Scala?

}
}

val metaFields = Seq(
Member

We should follow up and consolidate into one list in HoodieRecord. +1. Unless the other usages break with a different ordering, I don't see any reason why we won't.

"Key-generator class name is required")

val prependedRdd: RDD[InternalRow] =
df.queryExecution.toRdd.mapPartitions { iter =>
Member

Can you confirm this issue has been resolved in the master branch?

}
// NOTE: It's critical whenever we keep the reference to the row, to make a copy
// since Spark might be providing us with a mutable copy (updated during the iteration)
(rowKey, row.copy())
Member

What exact scenarios cause Spark to fail without the copy? Could you please expand on that?

@vinothchandar
Member

I need to convince myself of the RDD conversion that's in place. So this is marked "major concerns" until then.

@vinothchandar
Member

@alexeykudinkin Want to get my understanding straight, as well as make sure we have an explanation for how these factors play out with the new changes.

  1. The original row writer impl's overhead originated from doing df.queryExecution.toRdd here, done before Avro record conversion. We traced this to code in Spark that makes an additional pass (almost) to materialize the Rows with a schema to be used by the iterator.

  2. I see that in 0.11.1 we were just processing the dataframe as Dataset<Row>, hence the use of UDFs for the other functionality. This is what's been fixed in 0.12 now.

I want to understand how we are avoiding the RDD conversion costs in the current approach. This cost becomes obvious when you write records with a large number of columns (due to the per-record overhead).

@alexeykudinkin
Contributor Author

alexeykudinkin commented Jul 25, 2022

@vinothchandar TL;DR is the difference b/w Row and InternalRow:

  • When you do df.rdd you invoke the deserializer, which will deserialize the internal binary representation (UnsafeRow) into a Row holding Java-native types (it also holds the schema)


  • df.queryExecution.toRdd is an internal API that returns an RDD of InternalRows, avoiding such conversion (that's the primary reason for introducing many of the utilities in HoodieUnsafeUtils, to be able to access private Spark APIs)
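
In code form, the distinction being described (a sketch; both calls are standard Spark APIs):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow

// Public API: runs the deserializer, turning each UnsafeRow into a Row of Java-native values.
def toExternalRows(df: DataFrame): RDD[Row] = df.rdd

// Internal API: hands back Catalyst's binary rows as-is, with no per-record conversion.
def toInternalRows(df: DataFrame): RDD[InternalRow] = df.queryExecution.toRdd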
