Skip to content

Commit 3099df0

Browse files
committed
update: use map to store the cdc stats
1 parent f9c6e17 commit 3099df0

12 files changed

Lines changed: 57 additions & 65 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.hudi.common.util.SizeEstimator;
3636
import org.apache.hudi.common.util.StringUtils;
3737
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
38-
import org.apache.hudi.common.util.collection.Pair;
3938
import org.apache.hudi.config.HoodieWriteConfig;
4039
import org.apache.hudi.exception.HoodieException;
4140
import org.apache.hudi.exception.HoodieIOException;
@@ -95,7 +94,7 @@ public class HoodieCDCLogger implements Closeable {
9594
private long averageCDCRecordSize = 0;
9695

9796
// Number of records that must be written to meet the max block size for a log block
98-
private AtomicInteger numOfCDCRecordInMemory = new AtomicInteger();
97+
private AtomicInteger numOfCDCRecordsInMemory = new AtomicInteger();
9998

10099
private final SizeEstimator<HoodieAvroPayload> sizeEstimator;
101100

@@ -174,11 +173,11 @@ public void put(HoodieRecord hoodieRecord,
174173
averageCDCRecordSize = sizeEstimator.sizeEstimate(payload);
175174
}
176175
cdcData.put(recordKey, payload);
177-
numOfCDCRecordInMemory.incrementAndGet();
176+
numOfCDCRecordsInMemory.incrementAndGet();
178177
}
179178

180179
private void flushIfNeeded(Boolean force) {
181-
if (force || numOfCDCRecordInMemory.get() * averageCDCRecordSize >= maxBlockSize) {
180+
if (force || numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize) {
182181
try {
183182
List<IndexedRecord> records = cdcData.values().stream()
184183
.map(record -> {
@@ -199,28 +198,25 @@ private void flushIfNeeded(Boolean force) {
199198

200199
// reset stat
201200
cdcData.clear();
202-
numOfCDCRecordInMemory = new AtomicInteger();
201+
numOfCDCRecordsInMemory = new AtomicInteger();
203202
} catch (Exception e) {
204203
throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
205204
}
206205
}
207206
}
208207

209-
public Pair<List<String>, List<Long>> getCDCWriteStats() {
210-
List<String> cdcPaths = new ArrayList<>();
211-
List<Long> cdcWriteSizeList = new ArrayList<>();
208+
public Map<String, Long> getCDCWriteStats() {
209+
Map<String, Long> stats = new HashMap<>();
212210
try {
213211
for (Path cdcAbsPath : cdcAbsPaths) {
214212
String cdcFileName = cdcAbsPath.getName();
215213
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
216-
217-
cdcPaths.add(cdcPath);
218-
cdcWriteSizeList.add(FSUtils.getFileSize(fs, cdcAbsPath));
214+
stats.put(cdcPath, FSUtils.getFileSize(fs, cdcAbsPath));
219215
}
220216
} catch (IOException e) {
221217
throw new HoodieUpsertException("Failed to get cdc write stat", e);
222218
}
223-
return Pair.of(cdcPaths, cdcWriteSizeList);
219+
return stats;
224220
}
225221

226222
@Override

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.hudi.common.model.HoodieWriteStat;
2828
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
2929
import org.apache.hudi.common.util.Option;
30-
import org.apache.hudi.common.util.collection.Pair;
3130
import org.apache.hudi.config.HoodieWriteConfig;
3231
import org.apache.hudi.keygen.BaseKeyGenerator;
3332
import org.apache.hudi.table.HoodieTable;
@@ -107,10 +106,8 @@ public List<WriteStatus> close() {
107106
}
108107

109108
cdcLogger.close();
110-
Pair<List<String>, List<Long>> cdcWriteStats = cdcLogger.getCDCWriteStats();
111109
HoodieWriteStat stat = writeStatuses.get(0).getStat();
112-
stat.setCdcPath(cdcWriteStats.getLeft());
113-
stat.setCdcWriteBytes(cdcWriteStats.getRight());
110+
stat.setCdcStats(cdcLogger.getCDCWriteStats());
114111
return writeStatuses;
115112
}
116113
}

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.hudi.common.model.HoodieWriteStat;
2727
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
2828
import org.apache.hudi.common.util.Option;
29-
import org.apache.hudi.common.util.collection.Pair;
3029
import org.apache.hudi.config.HoodieWriteConfig;
3130
import org.apache.hudi.table.HoodieTable;
3231

@@ -82,10 +81,8 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRec
8281
public List<WriteStatus> close() {
8382
List<WriteStatus> writeStatuses = super.close();
8483
cdcLogger.close();
85-
Pair<List<String>, List<Long>> cdcWriteStats = cdcLogger.getCDCWriteStats();
8684
HoodieWriteStat stat = writeStatuses.get(0).getStat();
87-
stat.setCdcPath(cdcWriteStats.getLeft());
88-
stat.setCdcWriteBytes(cdcWriteStats.getRight());
85+
stat.setCdcStats(cdcLogger.getCDCWriteStats());
8986
return writeStatuses;
9087
}
9188
}

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.hudi.common.model.HoodieWriteStat;
2727
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
2828
import org.apache.hudi.common.util.Option;
29-
import org.apache.hudi.common.util.collection.Pair;
3029
import org.apache.hudi.config.HoodieWriteConfig;
3130
import org.apache.hudi.table.HoodieTable;
3231

@@ -85,10 +84,8 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRec
8584
public List<WriteStatus> close() {
8685
List<WriteStatus> writeStatuses = super.close();
8786
cdcLogger.close();
88-
Pair<List<String>, List<Long>> cdcWriteStats = cdcLogger.getCDCWriteStats();
8987
HoodieWriteStat stat = writeStatuses.get(0).getStat();
90-
stat.setCdcPath(cdcWriteStats.getLeft());
91-
stat.setCdcWriteBytes(cdcWriteStats.getRight());
88+
stat.setCdcStats(cdcLogger.getCDCWriteStats());
9289
return writeStatuses;
9390
}
9491
}

hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,9 +451,13 @@ public static Integer getTaskAttemptIdFromLogPath(Path path) {
451451
* Get the last part of the file name in the log file and convert to int.
452452
*/
453453
public static int getFileVersionFromLog(Path logPath) {
454-
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
454+
return getFileVersionFromLog(logPath.getName());
455+
}
456+
457+
public static int getFileVersionFromLog(String logFileName) {
458+
Matcher matcher = LOG_FILE_PATTERN.matcher(logFileName);
455459
if (!matcher.find()) {
456-
throw new InvalidHoodiePathException(logPath, "LogFile");
460+
throw new HoodieIOException("Invalid log file name: " + logFileName);
457461
}
458462
return Integer.parseInt(matcher.group(4));
459463
}

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import com.fasterxml.jackson.annotation.JsonIgnore;
2222
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2323
import org.apache.hadoop.fs.Path;
24-
import org.apache.hudi.common.util.StringUtils;
24+
import org.apache.hudi.common.util.JsonUtils;
2525

2626
import javax.annotation.Nullable;
2727

2828
import java.io.Serializable;
29-
import java.util.List;
29+
import java.util.Map;
3030

3131
/**
3232
* Statistics about a single Hoodie write operation.
@@ -47,9 +47,9 @@ public class HoodieWriteStat implements Serializable {
4747
private String path;
4848

4949
/**
50-
* Relative CDC file path that store the CDC data.
50+
* Relative CDC file path that store the CDC data and its size.
5151
*/
52-
private List<String> cdcPaths;
52+
private Map<String, Long> cdcStats;
5353

5454
/**
5555
* The previous version of the file. (null if this is the first version. i.e insert)
@@ -77,11 +77,6 @@ public class HoodieWriteStat implements Serializable {
7777
*/
7878
private long numInserts;
7979

80-
/**
81-
* Total number of cdc bytes written.
82-
*/
83-
private List<Long> cdcWriteBytes;
84-
8580
/**
8681
* Total number of bytes written.
8782
*/
@@ -207,18 +202,10 @@ public long getTotalWriteBytes() {
207202
return totalWriteBytes;
208203
}
209204

210-
public List<Long> getCdcWriteBytes() {
211-
return cdcWriteBytes;
212-
}
213-
214205
public void setTotalWriteBytes(long totalWriteBytes) {
215206
this.totalWriteBytes = totalWriteBytes;
216207
}
217208

218-
public void setCdcWriteBytes(List<Long> cdcWriteBytes) {
219-
this.cdcWriteBytes = cdcWriteBytes;
220-
}
221-
222209
public long getTotalWriteErrors() {
223210
return totalWriteErrors;
224211
}
@@ -256,12 +243,12 @@ public String getPath() {
256243
}
257244

258245
@Nullable
259-
public List<String> getCdcPaths() {
260-
return cdcPaths;
246+
public Map<String, Long> getCdcStats() {
247+
return cdcStats;
261248
}
262249

263-
public void setCdcPath(List<String> cdcPaths) {
264-
this.cdcPaths = cdcPaths;
250+
public void setCdcStats(Map<String, Long> cdcStats) {
251+
this.cdcStats = cdcStats;
265252
}
266253

267254
public String getPartitionPath() {
@@ -386,13 +373,10 @@ public void setPath(Path basePath, Path path) {
386373

387374
@Override
388375
public String toString() {
389-
String joinedCDCPathStr = cdcPaths == null ? "" : StringUtils.join(cdcPaths.toArray(new String[0]), ",");
390-
String joinedCDCWriteBytesStr = cdcWriteBytes == null ? "" : StringUtils.join(
391-
cdcWriteBytes.stream().map(Object::toString).toArray(String[]::new), ",");
392376
return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path + '\'' + ", prevCommit='" + prevCommit
393377
+ '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + ", numUpdateWrites=" + numUpdateWrites
394378
+ ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + totalWriteErrors + ", tempPath='" + tempPath
395-
+ '\'' + ", cdcPaths='" + joinedCDCPathStr + ", cdcWriteBytes=" + joinedCDCWriteBytesStr
379+
+ '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats)
396380
+ '\'' + ", partitionPath='" + partitionPath + '\'' + ", totalLogRecords=" + totalLogRecords
397381
+ ", totalLogFilesCompacted=" + totalLogFilesCompacted + ", totalLogSizeCompacted=" + totalLogSizeCompacted
398382
+ ", totalUpdatedRecordsCompacted=" + totalUpdatedRecordsCompacted + ", totalLogBlocks=" + totalLogBlocks

hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ private HoodieCDCFileSplit parseWriteStat(
254254
final String instantTs = instant.getTimestamp();
255255

256256
HoodieCDCFileSplit cdcFileSplit;
257-
if (CollectionUtils.isNullOrEmpty(writeStat.getCdcPaths())) {
257+
if (CollectionUtils.isNullOrEmpty(writeStat.getCdcStats())) {
258258
// no cdc log files can be used directly. we reuse the existing data file to retrieve the change data.
259259
String path = writeStat.getPath();
260260
if (FSUtils.isBaseFile(new Path(path))) {
@@ -287,7 +287,7 @@ private HoodieCDCFileSplit parseWriteStat(
287287
} else {
288288
// this is a cdc log
289289
if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
290-
cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcPaths());
290+
cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet());
291291
} else {
292292
try {
293293
HoodieBaseFile beforeBaseFile = getOrCreateFsView().getBaseFileOn(
@@ -301,7 +301,7 @@ private HoodieCDCFileSplit parseWriteStat(
301301
if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.OP_KEY)) {
302302
beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>());
303303
}
304-
cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcPaths(),
304+
cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet(),
305305
Option.ofNullable(beforeFileSlice), Option.ofNullable(currentFileSlice));
306306
} catch (Exception e) {
307307
throw new HoodieException("Fail to parse HoodieWriteStat", e);

hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
package org.apache.hudi.common.table.cdc;
2020

21+
import org.apache.hudi.common.fs.FSUtils;
2122
import org.apache.hudi.common.model.FileSlice;
2223
import org.apache.hudi.common.util.Option;
2324

2425
import org.jetbrains.annotations.NotNull;
2526

2627
import java.io.Serializable;
28+
import java.util.Collection;
2729
import java.util.Collections;
30+
import java.util.Comparator;
2831
import java.util.List;
32+
import java.util.stream.Collectors;
2933

3034
/**
3135
* This contains all the information that retrieve the change data at a single file group and
@@ -73,7 +77,7 @@ public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, Strin
7377
this(instant, cdcInferCase, cdcFile, Option.empty(), Option.empty());
7478
}
7579

76-
public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, List<String> cdcFiles) {
80+
public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, Collection<String> cdcFiles) {
7781
this(instant, cdcInferCase, cdcFiles, Option.empty(), Option.empty());
7882
}
7983

@@ -89,12 +93,13 @@ public HoodieCDCFileSplit(
8993
public HoodieCDCFileSplit(
9094
String instant,
9195
HoodieCDCInferCase cdcInferCase,
92-
List<String> cdcFileS,
96+
Collection<String> cdcFiles,
9397
Option<FileSlice> beforeFileSlice,
9498
Option<FileSlice> afterFileSlice) {
9599
this.instant = instant;
96100
this.cdcInferCase = cdcInferCase;
97-
this.cdcFiles = cdcFileS;
101+
this.cdcFiles = cdcFiles.stream()
102+
.sorted(Comparator.comparingInt(FSUtils::getFileVersionFromLog)).collect(Collectors.toList());
98103
this.beforeFileSlice = beforeFileSlice;
99104
this.afterFileSlice = afterFileSlice;
100105
}

hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ public static boolean isNullOrEmpty(Collection<?> c) {
5858
return Objects.isNull(c) || c.isEmpty();
5959
}
6060

61+
public static boolean isNullOrEmpty(Map<?, ?> m) {
62+
return Objects.isNull(m) || m.isEmpty();
63+
}
64+
6165
public static boolean nonEmpty(Collection<?> c) {
6266
return !isNullOrEmpty(c);
6367
}

hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
import com.fasterxml.jackson.annotation.JsonAutoDetect;
2323
import com.fasterxml.jackson.annotation.PropertyAccessor;
24+
import com.fasterxml.jackson.core.JsonProcessingException;
2425
import com.fasterxml.jackson.databind.DeserializationFeature;
2526
import com.fasterxml.jackson.databind.ObjectMapper;
27+
import org.apache.hudi.exception.HoodieIOException;
2628

2729
public class JsonUtils {
2830

@@ -41,4 +43,13 @@ public class JsonUtils {
4143
public static ObjectMapper getObjectMapper() {
4244
return MAPPER;
4345
}
46+
47+
public static String toString(Object value) {
48+
try {
49+
return MAPPER.writeValueAsString(value);
50+
} catch (JsonProcessingException e) {
51+
throw new HoodieIOException(
52+
"Fail to convert the class: " + value.getClass().getName() + " to Json String", e);
53+
}
54+
}
4455
}

0 commit comments

Comments
 (0)