Skip to content

Commit 9f5d165

Browse files
authored
[HUDI-4980] Calculate avg record size using commit only (#6864)
Calculate the average record size for the Spark upsert partitioner based on commit instants only. Previously it was based on both commit and replacecommit instants; the latter may be created by clustering, which reports inaccurately small average record sizes, and the resulting size underestimation could lead to OOM.
1 parent 48e5bb0 commit 9f5d165

1 file changed

Lines changed: 12 additions & 4 deletions

File tree

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,15 @@
2929
import org.apache.hudi.common.model.HoodieWriteStat;
3030
import org.apache.hudi.common.table.timeline.HoodieInstant;
3131
import org.apache.hudi.common.table.timeline.HoodieTimeline;
32+
import org.apache.hudi.common.util.CollectionUtils;
3233
import org.apache.hudi.common.util.NumericUtils;
3334
import org.apache.hudi.common.util.Option;
3435
import org.apache.hudi.common.util.collection.Pair;
3536
import org.apache.hudi.config.HoodieWriteConfig;
3637
import org.apache.hudi.table.HoodieTable;
3738
import org.apache.hudi.table.WorkloadProfile;
3839
import org.apache.hudi.table.WorkloadStat;
40+
3941
import org.apache.log4j.LogManager;
4042
import org.apache.log4j.Logger;
4143
import org.apache.spark.api.java.JavaRDD;
@@ -54,6 +56,8 @@
5456

5557
import scala.Tuple2;
5658

59+
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
60+
5761
/**
5862
* Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition).
5963
*/
@@ -158,13 +162,17 @@ private List<SmallFile> filterSmallFilesInClustering(final Set<String> pendingCl
158162
private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
159163
// for new inserts, compute buckets depending on how many records we have for each partition
160164
Set<String> partitionPaths = profile.getPartitionPaths();
161-
long averageRecordSize =
162-
averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
163-
config);
165+
/*
166+
* NOTE: we only use commit instants to calculate average record size because replacecommit can be
167+
* created by clustering, which has smaller average record size, which affects assigning inserts and
168+
* may result in OOM by making spark underestimate the actual input record sizes.
169+
*/
170+
long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
171+
.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config);
164172
LOG.info("AvgRecordSize => " + averageRecordSize);
165173

166174
Map<String, List<SmallFile>> partitionSmallFilesMap =
167-
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
175+
getSmallFilesForPartitions(new ArrayList<>(partitionPaths), context);
168176

169177
Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId = getPartitionPathToPendingClusteringFileGroupsId();
170178

0 commit comments

Comments
 (0)