Skip to content

Commit e8c26c5

Browse files
authored
[HUDI-3478] Implement CDC Write in Spark (#6697)
1 parent 6b71575 commit e8c26c5

20 files changed

Lines changed: 848 additions & 46 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
4242
import org.apache.hudi.common.model.IOType;
4343
import org.apache.hudi.common.table.log.AppendResult;
44-
import org.apache.hudi.common.table.log.HoodieLogFormat;
4544
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
4645
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
4746
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
@@ -471,23 +470,6 @@ public List<WriteStatus> writeStatuses() {
471470
return statuses;
472471
}
473472

474-
private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
475-
throws IOException {
476-
Option<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
477-
478-
return HoodieLogFormat.newWriterBuilder()
479-
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
480-
.withFileId(fileId)
481-
.overBaseCommit(baseCommitTime)
482-
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
483-
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
484-
.withSizeThreshold(config.getLogFileMaxSize())
485-
.withFs(fs)
486-
.withRolloverLogWriteToken(writeToken)
487-
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
488-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
489-
}
490-
491473
/**
492474
* Whether there is need to update the record location.
493475
*/
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.io;
20+
21+
import org.apache.avro.Schema;
22+
import org.apache.avro.generic.GenericData;
23+
import org.apache.avro.generic.GenericRecord;
24+
import org.apache.avro.generic.IndexedRecord;
25+
26+
import org.apache.hadoop.fs.FileSystem;
27+
import org.apache.hadoop.fs.Path;
28+
29+
import org.apache.hudi.avro.HoodieAvroUtils;
30+
import org.apache.hudi.common.fs.FSUtils;
31+
import org.apache.hudi.common.model.HoodieAvroPayload;
32+
import org.apache.hudi.common.model.HoodieRecord;
33+
import org.apache.hudi.common.model.HoodieWriteStat;
34+
import org.apache.hudi.common.table.HoodieTableConfig;
35+
import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
36+
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
37+
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
38+
import org.apache.hudi.common.table.log.AppendResult;
39+
import org.apache.hudi.common.table.log.HoodieLogFormat;
40+
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
41+
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
42+
import org.apache.hudi.common.util.DefaultSizeEstimator;
43+
import org.apache.hudi.common.util.Option;
44+
import org.apache.hudi.common.util.StringUtils;
45+
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
46+
import org.apache.hudi.config.HoodieWriteConfig;
47+
import org.apache.hudi.exception.HoodieException;
48+
import org.apache.hudi.exception.HoodieIOException;
49+
import org.apache.hudi.exception.HoodieUpsertException;
50+
51+
import java.io.Closeable;
52+
import java.io.IOException;
53+
import java.util.Collections;
54+
import java.util.HashMap;
55+
import java.util.List;
56+
import java.util.Map;
57+
import java.util.stream.Collectors;
58+
59+
/**
60+
* This class encapsulates all the cdc-writing functions.
61+
*/
62+
public class HoodieCDCLogger implements Closeable {
63+
64+
private final String commitTime;
65+
66+
private final String keyField;
67+
68+
private final Schema dataSchema;
69+
70+
private final boolean populateMetaFields;
71+
72+
// writer for cdc data
73+
private final HoodieLogFormat.Writer cdcWriter;
74+
75+
private final boolean cdcEnabled;
76+
77+
private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;
78+
79+
private final Schema cdcSchema;
80+
81+
private final String cdcSchemaString;
82+
83+
// the cdc data
84+
private final Map<String, HoodieAvroPayload> cdcData;
85+
86+
public HoodieCDCLogger(
87+
String commitTime,
88+
HoodieWriteConfig config,
89+
HoodieTableConfig tableConfig,
90+
Schema schema,
91+
HoodieLogFormat.Writer cdcWriter,
92+
long maxInMemorySizeInBytes) {
93+
try {
94+
this.commitTime = commitTime;
95+
this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
96+
this.populateMetaFields = config.populateMetaFields();
97+
this.keyField = populateMetaFields ? HoodieRecord.RECORD_KEY_METADATA_FIELD
98+
: tableConfig.getRecordKeyFieldProp();
99+
this.cdcWriter = cdcWriter;
100+
101+
this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
102+
this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
103+
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
104+
105+
if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
106+
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
107+
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING;
108+
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
109+
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
110+
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING;
111+
} else {
112+
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
113+
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING;
114+
}
115+
116+
this.cdcData = new ExternalSpillableMap<>(
117+
maxInMemorySizeInBytes,
118+
config.getSpillableMapBasePath(),
119+
new DefaultSizeEstimator<>(),
120+
new DefaultSizeEstimator<>(),
121+
config.getCommonConfig().getSpillableDiskMapType(),
122+
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
123+
);
124+
} catch (IOException e) {
125+
throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e);
126+
}
127+
}
128+
129+
public void put(HoodieRecord hoodieRecord,
130+
GenericRecord oldRecord,
131+
Option<IndexedRecord> newRecord) {
132+
if (cdcEnabled) {
133+
String recordKey = hoodieRecord.getRecordKey();
134+
GenericData.Record cdcRecord;
135+
if (newRecord.isPresent()) {
136+
GenericRecord record = (GenericRecord) newRecord.get();
137+
if (oldRecord == null) {
138+
// inserted cdc record
139+
cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey,
140+
null, record);
141+
} else {
142+
// updated cdc record
143+
cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey,
144+
oldRecord, record);
145+
}
146+
} else {
147+
// deleted cdc record
148+
cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey,
149+
oldRecord, null);
150+
}
151+
cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
152+
}
153+
}
154+
155+
private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
156+
String recordKey,
157+
GenericRecord oldRecord,
158+
GenericRecord newRecord) {
159+
GenericData.Record record;
160+
if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
161+
record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
162+
removeCommitMetadata(oldRecord), newRecord);
163+
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
164+
record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
165+
removeCommitMetadata(oldRecord));
166+
} else {
167+
record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
168+
}
169+
return record;
170+
}
171+
172+
private GenericRecord removeCommitMetadata(GenericRecord record) {
173+
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
174+
}
175+
176+
public boolean isEmpty() {
177+
return !this.cdcEnabled || this.cdcData.isEmpty();
178+
}
179+
180+
public Option<AppendResult> writeCDCData() {
181+
if (isEmpty()) {
182+
return Option.empty();
183+
}
184+
185+
try {
186+
List<IndexedRecord> records = cdcData.values().stream()
187+
.map(record -> {
188+
try {
189+
return record.getInsertValue(cdcSchema).get();
190+
} catch (IOException e) {
191+
throw new HoodieIOException("Failed to get cdc record", e);
192+
}
193+
}).collect(Collectors.toList());
194+
195+
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
196+
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
197+
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchemaString);
198+
199+
HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField);
200+
AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block));
201+
202+
// call close to trigger the data flush.
203+
this.close();
204+
205+
return Option.of(result);
206+
} catch (Exception e) {
207+
throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
208+
}
209+
}
210+
211+
@Override
212+
public void close() {
213+
try {
214+
if (cdcWriter != null) {
215+
cdcWriter.close();
216+
}
217+
} catch (IOException e) {
218+
throw new HoodieIOException("Failed to close HoodieCDCLogger", e);
219+
} finally {
220+
cdcData.clear();
221+
}
222+
}
223+
224+
public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
225+
long recordsWritten,
226+
long insertRecordsWritten) {
227+
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
228+
// the following cases where we do not need to write out the cdc file:
229+
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
230+
// case 2: all the data are new-coming,
231+
return Option.empty();
232+
}
233+
return cdcLogger.writeCDCData();
234+
}
235+
236+
public static void setCDCStatIfNeeded(HoodieWriteStat stat,
237+
Option<AppendResult> cdcResult,
238+
String partitionPath,
239+
FileSystem fs) {
240+
try {
241+
if (cdcResult.isPresent()) {
242+
Path cdcLogFile = cdcResult.get().logFile().getPath();
243+
String cdcFileName = cdcLogFile.getName();
244+
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
245+
long cdcFileSizeInBytes = FSUtils.getFileSize(fs, cdcLogFile);
246+
stat.setCdcPath(cdcPath);
247+
stat.setCdcWriteBytes(cdcFileSizeInBytes);
248+
}
249+
} catch (IOException e) {
250+
throw new HoodieUpsertException("Failed to set cdc write stat", e);
251+
}
252+
}
253+
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.hudi.io;
2020

21+
import org.apache.hadoop.fs.Path;
22+
2123
import org.apache.hudi.client.WriteStatus;
2224
import org.apache.hudi.common.engine.TaskContextSupplier;
2325
import org.apache.hudi.common.fs.FSUtils;
@@ -31,6 +33,9 @@
3133
import org.apache.hudi.common.model.HoodieWriteStat;
3234
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
3335
import org.apache.hudi.common.model.IOType;
36+
import org.apache.hudi.common.table.HoodieTableConfig;
37+
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
38+
import org.apache.hudi.common.table.log.AppendResult;
3439
import org.apache.hudi.common.util.DefaultSizeEstimator;
3540
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
3641
import org.apache.hudi.common.util.Option;
@@ -50,7 +55,7 @@
5055
import org.apache.avro.Schema;
5156
import org.apache.avro.generic.GenericRecord;
5257
import org.apache.avro.generic.IndexedRecord;
53-
import org.apache.hadoop.fs.Path;
58+
5459
import org.apache.log4j.LogManager;
5560
import org.apache.log4j.Logger;
5661

@@ -102,6 +107,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
102107
protected Map<String, HoodieRecord<T>> keyToNewRecords;
103108
protected Set<String> writtenRecordKeys;
104109
protected HoodieFileWriter<IndexedRecord> fileWriter;
110+
protected boolean cdcEnabled = false;
111+
protected HoodieCDCLogger cdcLogger;
105112
private boolean preserveMetadata = false;
106113

107114
protected Path newFilePath;
@@ -203,6 +210,18 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
203210
// Create the writer for writing the new version file
204211
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
205212
writeSchemaWithMetaFields, taskContextSupplier);
213+
214+
// init the cdc logger
215+
this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
216+
if (cdcEnabled) {
217+
this.cdcLogger = new HoodieCDCLogger(
218+
instantTime,
219+
config,
220+
hoodieTable.getMetaClient().getTableConfig(),
221+
tableSchema,
222+
createLogWriter(Option.empty(), instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
223+
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
224+
}
206225
} catch (IOException io) {
207226
LOG.error("Error in update task at commit " + instantTime, io);
208227
writeStatus.setGlobalError(io);
@@ -281,7 +300,11 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
281300
return false;
282301
}
283302
}
284-
return writeRecord(hoodieRecord, indexedRecord, isDelete);
303+
boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
304+
if (result && cdcEnabled) {
305+
cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
306+
}
307+
return result;
285308
}
286309

287310
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -292,6 +315,9 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
292315
return;
293316
}
294317
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
318+
if (cdcEnabled) {
319+
cdcLogger.put(hoodieRecord, null, insertRecord);
320+
}
295321
insertRecordsWritten++;
296322
}
297323
}
@@ -402,6 +428,8 @@ protected void writeIncomingRecords() throws IOException {
402428
@Override
403429
public List<WriteStatus> close() {
404430
try {
431+
HoodieWriteStat stat = writeStatus.getStat();
432+
405433
writeIncomingRecords();
406434

407435
if (keyToNewRecords instanceof ExternalSpillableMap) {
@@ -416,9 +444,12 @@ public List<WriteStatus> close() {
416444
fileWriter = null;
417445
}
418446

419-
long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
420-
HoodieWriteStat stat = writeStatus.getStat();
447+
// if there are cdc data written, set the CDC-related information.
448+
Option<AppendResult> cdcResult =
449+
HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
450+
HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);
421451

452+
long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
422453
stat.setTotalWriteBytes(fileSizeInBytes);
423454
stat.setFileSizeInBytes(fileSizeInBytes);
424455
stat.setNumWrites(recordsWritten);

0 commit comments

Comments (0)