
Commit 0148b0a

fix: Fixing record size estimation to reflect previous behavior (#14039)
Co-authored-by: Jonathan Vexler <=>
1 parent f56fa8a commit 0148b0a
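
In summary (from the diff below): the estimator no longer pools bytes and records across up to maxCommits instants in an AverageRecordSizeStats accumulator. It now walks completed instants newest-first and, for the first commit whose metadata-adjusted byte total clears the small-file threshold, returns ceil(adjustedBytes / records) from that commit alone; if no commit qualifies, it falls back to the configured copy-on-write record size estimate. A fetchTotalFiles() helper is added to HoodieCommitMetadata so the per-file parquet metadata estimate can be subtracted once per file, and the tests are updated to the restored first-eligible-commit semantics.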

File tree: 3 files changed, +66 -86 lines

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java

Lines changed: 31 additions & 56 deletions
@@ -33,9 +33,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
+import java.util.Iterator;
 import java.util.Set;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
@@ -66,68 +65,44 @@ public AverageRecordSizeEstimator(HoodieWriteConfig writeConfig) {
   @Override
   public long averageBytesPerRecord(HoodieTimeline commitTimeline, CommitMetadataSerDe commitMetadataSerDe) {
     int maxCommits = hoodieWriteConfig.getRecordSizeEstimatorMaxCommits();
-    final AverageRecordSizeStats averageRecordSizeStats = new AverageRecordSizeStats(hoodieWriteConfig);
+    final long commitSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
+    final long metadataSizeEstimate = hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
     try {
       if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate of average record size.
-        Stream<HoodieInstant> filteredInstants = commitTimeline.filterCompletedInstants()
+        Iterator<HoodieInstant> instants = commitTimeline.filterCompletedInstants()
             .getReverseOrderedInstants()
             .filter(s -> RECORD_SIZE_ESTIMATE_ACTIONS.contains(s.getAction()))
-            .limit(maxCommits);
-        filteredInstants
-            .forEach(instant -> {
-              HoodieCommitMetadata commitMetadata;
-              try {
-                commitMetadata = commitTimeline.readCommitMetadata(instant);
-                if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
-                  // let's consider only base files in case of delta commits
-                  commitMetadata.getWriteStats().stream().parallel()
-                      .filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
-                      .forEach(hoodieWriteStat -> averageRecordSizeStats.updateStats(hoodieWriteStat.getTotalWriteBytes(), hoodieWriteStat.getNumWrites()));
-                } else {
-                  averageRecordSizeStats.updateStats(commitMetadata.fetchTotalBytesWritten(), commitMetadata.fetchTotalRecordsWritten());
-                }
-              } catch (IOException ignore) {
-                LOG.info("Failed to parse commit metadata", ignore);
-              }
-            });
+            .limit(maxCommits).iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          try {
+            HoodieCommitMetadata commitMetadata = commitTimeline.readCommitMetadata(instant);
+            final HoodieAtomicLongAccumulator totalBytesWritten = HoodieAtomicLongAccumulator.create();
+            final HoodieAtomicLongAccumulator totalRecordsWritten = HoodieAtomicLongAccumulator.create();
+            if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+              // Only use base files for estimate
+              commitMetadata.getWriteStats().stream()
+                  .filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
+                  .forEach(hoodieWriteStat -> {
+                    totalBytesWritten.add(hoodieWriteStat.getTotalWriteBytes() - metadataSizeEstimate);
+                    totalRecordsWritten.add(hoodieWriteStat.getNumWrites());
+                  });
+            } else {
+              totalBytesWritten.add(commitMetadata.fetchTotalBytesWritten() - (commitMetadata.fetchTotalFiles() * metadataSizeEstimate));
+              totalRecordsWritten.add(commitMetadata.fetchTotalRecordsWritten());
+            }
+            if (totalBytesWritten.value() > commitSizeThreshold && totalRecordsWritten.value() > 0) {
+              return (long) Math.ceil((1.0 * totalBytesWritten.value()) / totalRecordsWritten.value());
+            }
+          } catch (IOException ignore) {
+            LOG.info("Failed to parse commit metadata", ignore);
+          }
+        }
       }
     } catch (Throwable t) {
       LOG.info("Got error while trying to compute average bytes/record but will proceed to use the computed value "
          + "or fallback to default config value ", t);
     }
-    return averageRecordSizeStats.computeAverageRecordSize();
-  }
-
-  private static class AverageRecordSizeStats implements Serializable {
-    private final HoodieAtomicLongAccumulator totalBytesWritten;
-    private final HoodieAtomicLongAccumulator totalRecordsWritten;
-    private final long fileSizeThreshold;
-    private final long avgMetadataSize;
-    private final int defaultRecordSize;
-
-    public AverageRecordSizeStats(HoodieWriteConfig hoodieWriteConfig) {
-      totalBytesWritten = HoodieAtomicLongAccumulator.create();
-      totalRecordsWritten = HoodieAtomicLongAccumulator.create();
-      fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
-      avgMetadataSize = hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
-      defaultRecordSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
-    }
-
-    private void updateStats(long fileSizeInBytes, long recordWritten) {
-      if (fileSizeInBytes > fileSizeThreshold && fileSizeInBytes > avgMetadataSize && recordWritten > 0) {
-        totalBytesWritten.add(fileSizeInBytes - avgMetadataSize);
-        totalRecordsWritten.add(recordWritten);
-      }
-    }
-
-    private long computeAverageRecordSize() {
-      if (totalBytesWritten.value() > 0 && totalRecordsWritten.value() > 0) {
-        return totalBytesWritten.value() / totalRecordsWritten.value();
-      }
-      // Fallback to default implementation in the cases were we either got an exception before we could
-      // compute the average record size or there are no eligible commits yet.
-      return defaultRecordSize;
-    }
+    return hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
   }
 }
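
The control flow is easier to see outside the Hudi types. Below is a minimal sketch of the new policy using simplified stand-in types; CommitStats and the threshold and default values are illustrative assumptions, not Hudi's API:

import java.util.Arrays;
import java.util.List;

// Simplified stand-in for a commit's write statistics; not Hudi's API.
class CommitStats {
  final long totalBytes;
  final long totalRecords;
  final long totalFiles;

  CommitStats(long totalBytes, long totalRecords, long totalFiles) {
    this.totalBytes = totalBytes;
    this.totalRecords = totalRecords;
    this.totalFiles = totalFiles;
  }
}

public class RecordSizeSketch {
  // Walk commits newest-first; return the estimate from the first commit whose
  // metadata-adjusted bytes clear the threshold, else the default estimate.
  static long averageBytesPerRecord(List<CommitStats> newestFirst, long metadataSizeEstimate,
                                    long commitSizeThreshold, long defaultEstimate) {
    for (CommitStats c : newestFirst) {
      long adjustedBytes = c.totalBytes - c.totalFiles * metadataSizeEstimate;
      if (adjustedBytes > commitSizeThreshold && c.totalRecords > 0) {
        return (long) Math.ceil((1.0 * adjustedBytes) / c.totalRecords);
      }
    }
    return defaultEstimate;
  }

  public static void main(String[] args) {
    List<CommitStats> commits = Arrays.asList(
        new CommitStats(2_000_000_000L, 10_000_000L, 1),  // newest commit
        new CommitStats(1_000_000_000L, 10_000_000L, 1)); // older commit, never reached
    // Threshold assumes 0.1 x a ~100 MB small-file limit; default assumes a 1 KB estimate.
    // The newest commit wins: ceil((2e9 - 1e7) / 1e7) = 199.
    System.out.println(averageBytesPerRecord(commits, 10_000_000L, 10_485_760L, 1024L));
  }
}

Contrast this with the removed AverageRecordSizeStats, which accumulated bytes and records across all eligible commits and divided once at the end.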

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java

Lines changed: 31 additions & 30 deletions
@@ -63,7 +63,8 @@ public class TestAverageRecordSizeEstimator {
   private static final String PARTITION1 = "partition1";
   private static final String TEST_WRITE_TOKEN = "1-0-1";
   private static final Integer DEFAULT_MAX_COMMITS = 2;
-  private static final Integer DEFAULT_MAX_PARQUET_METADATA_SIZE = 1000;
+  // needs to be big enough to skew the estimate
+  private static final Integer DEFAULT_AVERAGE_PARQUET_METADATA_SIZE = 10000000;
   private static final Double DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD = 0.1;
 
   @Test
@@ -102,7 +103,7 @@ public void testAverageRecordSizeWithNonEmptyCommitTimeline(List<Pair<HoodieInst
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
         .withRecordSizeEstimator(AverageRecordSizeEstimator.class.getName())
         .withRecordSizeEstimatorMaxCommits(DEFAULT_MAX_COMMITS)
-        .withRecordSizeEstimatorAverageMetadataSize(DEFAULT_MAX_PARQUET_METADATA_SIZE)
+        .withRecordSizeEstimatorAverageMetadataSize(DEFAULT_AVERAGE_PARQUET_METADATA_SIZE)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .compactionRecordSizeEstimateThreshold(DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD)
             .build())
@@ -152,85 +153,85 @@ private static String getLogFileName(String instantTime) {
 
   private static Stream<Arguments> testCases() {
     Long baseInstant = 20231204194919610L;
+    Long standardCount = 10000000L;
     List<Arguments> arguments = new ArrayList<>();
     // Note the avg record estimate is based on a parquet metadata size of 500Bytes per file.
     // 1. straight forward. just 1 instant.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L)))), 99L));
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L)))), 99L));
 
-    // 2. two instants. avg of both the instants
+    // 2. two instants. latest instant should be honored
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
         Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1000000L, 200L)))), 109L));
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L)))), 199L));
 
-    // 3. two instants, latest commit has a small file thats just above threshold, while earliest commit is fully ignored,
-    // since it below the threshold size limit
+    // 3. two instants, while 2nd one is smaller in size so as to not meet the threshold. So, 1st one should be honored
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 9000L, 1000L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 110000, 100L)))), 99L));
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1000L, 200L)))), 99L));
 
-    // 4. 2nd instance is replace commit, it shld be excluded and should be avg of both commits.
+    // 4. 2nd instance is replace commit, it should be excluded
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 200L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L)))), 99L));
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 100L)))), 199L));
 
     // 5. for delta commits, only parquet files should be accounted for.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L)))), 149L));
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L)))), 199L));
 
     // 6. delta commit has a mix of parquet and log files. only parquet files should be accounted for.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 1000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Arrays.asList(generateBaseWriteStat(baseInstant + 100, 10000000L, 200L),
-                generateLogWriteStat(baseInstant + 100, 10000000L, 300L)))), 190L));
+            Arrays.asList(generateBaseWriteStat(baseInstant + 100, standardCount, 200L),
+                generateLogWriteStat(baseInstant + 100, standardCount, 300L)))), 199L));
 
     // 7. 2nd delta commit only has log files. and so we honor 1st delta commit size.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Arrays.asList(generateLogWriteStat(baseInstant + 100, 1000000L, 200L),
-                generateLogWriteStat(baseInstant + 100, 10000000L, 300L)))), 99L));
+            Arrays.asList(generateLogWriteStat(baseInstant + 100, standardCount, 200L),
+                generateLogWriteStat(baseInstant + 100, standardCount, 300L)))), 99L));
 
     // 8. since default max commits is overriden to 2 commits, ignore the earliest commit here since there are total 3 commits
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 1000L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 200L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 50L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 1L, 50L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 200)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 10000000L, 100L)))), 74L));
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 1L, 100L)))), Long.valueOf(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.defaultValue())));
 
     // 9. replace commits should be ignored despite being the latest commits.
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 1000000L, 100L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 100L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Arrays.asList(generateLogWriteStat(baseInstant + 100, 1000000L, 200L),
+            Arrays.asList(generateLogWriteStat(baseInstant + 100, standardCount, 200L),
                generateLogWriteStat(baseInstant + 100, 1000000L, 300L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 1000000L, 2000L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 200, standardCount, 2000L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 300, 1000000L, 3000L)))), 99L));
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 300, standardCount, 3000L)))), 99L));
 
     // 10. Ignore commit stat with 0 records
     arguments.add(Arguments.of(
         Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant, 10000000L, 1000L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, standardCount, 1000L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
-            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, 10000000L, 50L))),
+            Collections.singletonList(generateBaseWriteStat(baseInstant + 100, standardCount, 50L))),
        Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 200)),
            Collections.singletonList(generateBaseWriteStat(baseInstant + 200, 0L, 1000L)))), 49L));

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java

Lines changed: 4 additions & 0 deletions
@@ -299,6 +299,10 @@ public long fetchTotalFilesUpdated() {
     return totalFilesUpdated;
   }
 
+  public long fetchTotalFiles() {
+    return partitionToWriteStats.values().stream().mapToLong(List::size).sum();
+  }
+
   public long fetchTotalUpdateRecordsWritten() {
     long totalUpdateRecordsWritten = 0;
     for (List<HoodieWriteStat> stats : partitionToWriteStats.values()) {
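
fetchTotalFiles() counts one write stat per file across all partitions; the estimator multiplies that count by the configured per-file metadata estimate when adjusting a full commit's byte total. A small self-contained sketch of the same idiom, using a plain map as a stand-in for Hudi's partitionToWriteStats:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetadataAdjustmentSketch {
  public static void main(String[] args) {
    // Stand-in for partitionToWriteStats: partition -> one entry (bytes) per file written.
    Map<String, List<Long>> partitionToWriteStats = new HashMap<>();
    partitionToWriteStats.put("p1", Arrays.asList(600_000_000L, 400_000_000L));
    partitionToWriteStats.put("p2", Arrays.asList(1_000_000_000L));

    // Equivalent of the new fetchTotalFiles(): count stats across partitions.
    long totalFiles = partitionToWriteStats.values().stream().mapToLong(List::size).sum();
    long totalBytes = partitionToWriteStats.values().stream()
        .flatMap(List::stream).mapToLong(Long::longValue).sum();

    // How the estimator uses it: subtract the per-file metadata estimate once per file.
    long metadataSizeEstimate = 10_000_000L; // assumed value, matching the test's 10 MB estimate
    long adjustedBytes = totalBytes - totalFiles * metadataSizeEstimate;
    System.out.println(totalFiles + " files, adjusted bytes = " + adjustedBytes); // 3 files, 1970000000
  }
}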
