Skip to content

Commit df69aa7

Browse files
authored
[HUDI-3478] Implement CDC Read in Spark (#6727)
1 parent 79b3e2b commit df69aa7

22 files changed

Lines changed: 3075 additions & 575 deletions

hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hudi.common.util.Option;
2222

2323
import java.io.Serializable;
24+
import java.util.List;
2425
import java.util.Objects;
2526
import java.util.TreeSet;
2627
import java.util.stream.Stream;
@@ -71,6 +72,15 @@ public FileSlice(HoodieFileGroupId fileGroupId, String baseInstantTime) {
7172
this.logFiles = new TreeSet<>(HoodieLogFile.getReverseLogFileComparator());
7273
}
7374

75+
public FileSlice(HoodieFileGroupId fileGroupId, String baseInstantTime,
76+
HoodieBaseFile baseFile, List<HoodieLogFile> logFiles) {
77+
this.fileGroupId = fileGroupId;
78+
this.baseInstantTime = baseInstantTime;
79+
this.baseFile = baseFile;
80+
this.logFiles = new TreeSet<>(HoodieLogFile.getReverseLogFileComparator());
81+
this.logFiles.addAll(logFiles);
82+
}
83+
7484
public void setBaseFile(HoodieBaseFile baseFile) {
7585
this.baseFile = baseFile;
7686
}

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@
2424
import org.apache.hudi.common.util.collection.Pair;
2525

2626
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
27+
import com.fasterxml.jackson.databind.JsonNode;
28+
import com.fasterxml.jackson.databind.node.ArrayNode;
29+
2730
import org.apache.hadoop.conf.Configuration;
2831
import org.apache.hadoop.fs.FileStatus;
2932
import org.apache.hadoop.fs.Path;
33+
import org.apache.hudi.exception.HoodieException;
3034
import org.apache.log4j.LogManager;
3135
import org.apache.log4j.Logger;
3236

@@ -37,6 +41,7 @@
3741
import java.util.Collection;
3842
import java.util.HashMap;
3943
import java.util.HashSet;
44+
import java.util.Iterator;
4045
import java.util.List;
4146
import java.util.Map;
4247
import java.util.stream.Collectors;
@@ -236,6 +241,44 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Except
236241
return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
237242
}
238243

244+
/**
245+
* parse the bytes of deltacommit, and get the base file and the log files belonging to this
246+
* provided file group.
247+
*/
248+
// TODO: refactor this method to avoid doing the json tree walking (HUDI-4822).
249+
public static Option<Pair<String, List<String>>> getFileSliceForFileGroupFromDeltaCommit(
250+
byte[] bytes, HoodieFileGroupId fileGroupId) {
251+
String jsonStr = new String(bytes, StandardCharsets.UTF_8);
252+
if (jsonStr.isEmpty()) {
253+
return Option.empty();
254+
}
255+
256+
try {
257+
JsonNode ptToWriteStatsMap = JsonUtils.getObjectMapper().readTree(jsonStr).get("partitionToWriteStats");
258+
Iterator<Map.Entry<String, JsonNode>> pts = ptToWriteStatsMap.fields();
259+
while (pts.hasNext()) {
260+
Map.Entry<String, JsonNode> ptToWriteStats = pts.next();
261+
if (ptToWriteStats.getValue().isArray()) {
262+
for (JsonNode writeStat : ptToWriteStats.getValue()) {
263+
HoodieFileGroupId fgId = new HoodieFileGroupId(ptToWriteStats.getKey(), writeStat.get("fileId").asText());
264+
if (fgId.equals(fileGroupId)) {
265+
String baseFile = writeStat.get("baseFile").asText();
266+
ArrayNode logFilesNode = (ArrayNode) writeStat.get("logFiles");
267+
List<String> logFiles = new ArrayList<>();
268+
for (JsonNode logFile : logFilesNode) {
269+
logFiles.add(logFile.asText());
270+
}
271+
return Option.of(Pair.of(baseFile, logFiles));
272+
}
273+
}
274+
}
275+
}
276+
return Option.empty();
277+
} catch (Exception e) {
278+
throw new HoodieException("Fail to parse the base file and log files from DeltaCommit", e);
279+
}
280+
}
281+
239282
// Here the functions are named "fetch" instead of "get", to get avoid of the json conversion.
240283
public long fetchTotalPartitionsWritten() {
241284
return partitionToWriteStats.size();

0 commit comments

Comments
 (0)