Merged

Changes from all commits (32 commits)
8a6e5ea
[RFC-51][HUDI-3478] Hudi CDC
YannByron May 23, 2022
50a4b20
update
YannByron Jun 17, 2022
9d4f52c
[RFC-51][HUDI-3478] support the case that disable cdc supplemental lo…
YannByron Jun 22, 2022
1c2f3ac
update
YannByron Jun 22, 2022
f7fcb8e
solve comments
YannByron Jul 22, 2022
88c7794
update spark3.3 avro serde
YannByron Aug 4, 2022
b17976d
fix streaming ut
YannByron Aug 4, 2022
e6641ad
cdc query type -> incremental query table + cdc output format
YannByron Aug 22, 2022
607e9aa
support three supplemental.logging.modes
YannByron Aug 23, 2022
36c8820
rename configs
YannByron Aug 23, 2022
e8a0320
rebase master
YannByron Aug 23, 2022
c135e55
solve comments
YannByron Aug 30, 2022
957fdd4
solve comments
YannByron Sep 9, 2022
085439e
abstract HoodieCDCLogger and leave a TODO to link jira
YannByron Sep 14, 2022
3d823ec
remove SerializableRecord
YannByron Sep 15, 2022
8cbbbec
remove unused code
YannByron Sep 15, 2022
51aa8ef
enum HoodieCDCSupplementalLoggingMode
YannByron Sep 15, 2022
ccd2eee
update
YannByron Sep 15, 2022
5e8db9e
remove meta fields for cdc data
YannByron Sep 15, 2022
730678c
update
YannByron Sep 15, 2022
18fc43f
only codes related to writing
YannByron Sep 15, 2022
831fa81
remove avro serde changes
YannByron Sep 16, 2022
2238e10
remove cdc-query-related codes
YannByron Sep 16, 2022
5e1903a
remove cdc-query-related codes
YannByron Sep 16, 2022
6766c66
remove cdc-query-related codes
YannByron Sep 16, 2022
33fac49
update cdclogger
YannByron Sep 16, 2022
30983c8
minor
YannByron Sep 16, 2022
5d92832
update: use -cdc as the suffix of log file
YannByron Sep 18, 2022
d91c2c0
update
YannByron Sep 19, 2022
f547dde
update to trigger ci
YannByron Sep 19, 2022
34394ff
update: resolve comments
YannByron Sep 20, 2022
539da70
incorrect indentation
YannByron Sep 20, 2022
@@ -41,7 +41,6 @@
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
@@ -471,23 +470,6 @@ public List<WriteStatus> writeStatuses() {
return statuses;
}

private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
throws IOException {
Option<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();

return HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId)
.overBaseCommit(baseCommitTime)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

/**
* Whether there is need to update the record location.
*/
New file: HoodieCDCLogger.java
@@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* This class encapsulates all the cdc-writing functions.
*/
public class HoodieCDCLogger implements Closeable {

private final String commitTime;

private final String keyField;

private final Schema dataSchema;

private final boolean populateMetaFields;

// writer for cdc data
private final HoodieLogFormat.Writer cdcWriter;

private final boolean cdcEnabled;

private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;

private final Schema cdcSchema;

private final String cdcSchemaString;

// the cdc data
private final Map<String, HoodieAvroPayload> cdcData;

public HoodieCDCLogger(
String commitTime,
HoodieWriteConfig config,
HoodieTableConfig tableConfig,
Schema schema,
HoodieLogFormat.Writer cdcWriter,
long maxInMemorySizeInBytes) {
try {
this.commitTime = commitTime;
this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
this.populateMetaFields = config.populateMetaFields();
this.keyField = populateMetaFields ? HoodieRecord.RECORD_KEY_METADATA_FIELD
: tableConfig.getRecordKeyFieldProp();
this.cdcWriter = cdcWriter;

this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
Review comment (Contributor): Is the cdcEnabled flag always true here? Because this is a cdc logger.
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));

if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING;
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING;
} else {
this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING;
}

this.cdcData = new ExternalSpillableMap<>(
Review comment (Contributor): No need to tackle in this PR, but I want to call out that accumulating records in memory will certainly be problematic from a memory-footprint (as well as GC) perspective.

Note that Spark processes records via iteration and doesn't hold them in memory for longer than the RDD execution chain requires (it is limited to the micro-batch size if the underlying format supports batch reads, otherwise it holds just one record at a time). The only accumulation point is the Parquet writer, which is much more efficient though: a) it relies on encoding as well as compression, b) it stores binary/serialized data in memory.

Reply (Contributor Author): Thank you for pointing this out. I will think deeply about it.
maxInMemorySizeInBytes,
config.getSpillableMapBasePath(),
new DefaultSizeEstimator<>(),
new DefaultSizeEstimator<>(),
config.getCommonConfig().getSpillableDiskMapType(),
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
);
} catch (IOException e) {
throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e);
}
}

public void put(HoodieRecord hoodieRecord,
GenericRecord oldRecord,
Option<IndexedRecord> newRecord) {
if (cdcEnabled) {
String recordKey = hoodieRecord.getRecordKey();
GenericData.Record cdcRecord;
if (newRecord.isPresent()) {
GenericRecord record = (GenericRecord) newRecord.get();
if (oldRecord == null) {
// inserted cdc record
cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey,
null, record);
} else {
// updated cdc record
cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey,
oldRecord, record);
}
} else {
// deleted cdc record
cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey,
oldRecord, null);
}
cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
}
}

private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
String recordKey,
GenericRecord oldRecord,
GenericRecord newRecord) {
GenericData.Record record;
if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
removeCommitMetadata(oldRecord), newRecord);
} else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
removeCommitMetadata(oldRecord));
} else {
record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
}
return record;
}

private GenericRecord removeCommitMetadata(GenericRecord record) {
Review comment (Contributor): We can re-use HoodieAvroUtils.rewriteRecordWithNewSchema.

Reply (Contributor Author): Done.
return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
}

public boolean isEmpty() {
return !this.cdcEnabled || this.cdcData.isEmpty();
}

public Option<AppendResult> writeCDCData() {
if (isEmpty()) {
return Option.empty();
}

try {
List<IndexedRecord> records = cdcData.values().stream()
.map(record -> {
try {
return record.getInsertValue(cdcSchema).get();
} catch (IOException e) {
throw new HoodieIOException("Failed to get cdc record", e);
}
}).collect(Collectors.toList());

Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchemaString);

HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField);
AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block));

// call close to trigger the data flush.
this.close();

return Option.of(result);
} catch (Exception e) {
throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
}
}

@Override
public void close() {
try {
if (cdcWriter != null) {
cdcWriter.close();
}
} catch (IOException e) {
throw new HoodieIOException("Failed to close HoodieCDCLogger", e);
} finally {
cdcData.clear();
}
}

public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
Review comment (Contributor): We can move this method back to HoodieWriteHandle, as this behavior should be controlled from the WriteHandle (the previous comments about extraction were related to setCDCStatIfNeeded).
long recordsWritten,
long insertRecordsWritten) {
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// the following cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
// case 2: all the data are new-coming,
return Option.empty();
}
return cdcLogger.writeCDCData();
}

public static void setCDCStatIfNeeded(HoodieWriteStat stat,
Option<AppendResult> cdcResult,
String partitionPath,
FileSystem fs) {
try {
if (cdcResult.isPresent()) {
Path cdcLogFile = cdcResult.get().logFile().getPath();
String cdcFileName = cdcLogFile.getName();
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
long cdcFileSizeInBytes = FSUtils.getFileSize(fs, cdcLogFile);
stat.setCdcPath(cdcPath);
stat.setCdcWriteBytes(cdcFileSizeInBytes);
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to set cdc write stat", e);
}
}
}
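For orientation (not part of the diff): a minimal sketch of how a write handle is expected to drive HoodieCDCLogger, mirroring the HoodieMergeHandle changes further down. The wrapper class and method names below are hypothetical; only the HoodieCDCLogger calls come from this PR.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.HoodieCDCLogger;

/**
 * Illustrative driver (hypothetical class). A real write handle builds the
 * logger in init(), as HoodieMergeHandle does in this PR.
 */
class CdcLoggerUsageSketch {

  // Called once per written record: the logger derives INSERT (oldRecord == null),
  // UPDATE (both images present) or DELETE (newRecord empty) on its own.
  static void recordChange(HoodieCDCLogger cdcLogger, HoodieRecord hoodieRecord,
                           GenericRecord oldRecord, Option<IndexedRecord> newRecord) {
    cdcLogger.put(hoodieRecord, oldRecord, newRecord);
  }

  // Called from the handle's close(): flush the buffered CDC records into the
  // "-cdc" log file (skipped when nothing needs persisting) and record the
  // resulting path and size on the write stat. writeCDCData() closes the
  // logger after flushing, so no separate close() is needed on the success path.
  static void finish(HoodieCDCLogger cdcLogger, HoodieWriteStat stat,
                     long recordsWritten, long insertRecordsWritten,
                     String partitionPath, FileSystem fs) {
    Option<AppendResult> cdcResult =
        HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
    HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);
  }
}
```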
HoodieMergeHandle.java
@@ -18,6 +18,8 @@

package org.apache.hudi.io;

import org.apache.hadoop.fs.Path;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -31,6 +33,9 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
@@ -50,7 +55,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -102,6 +107,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter<IndexedRecord> fileWriter;
protected boolean cdcEnabled = false;
protected HoodieCDCLogger cdcLogger;
private boolean preserveMetadata = false;

protected Path newFilePath;
@@ -203,6 +210,18 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
// Create the writer for writing the new version file
fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
writeSchemaWithMetaFields, taskContextSupplier);

// init the cdc logger
this.cdcEnabled = config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
if (cdcEnabled) {
this.cdcLogger = new HoodieCDCLogger(
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
tableSchema,
createLogWriter(Option.empty(), instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
} catch (IOException io) {
LOG.error("Error in update task at commit " + instantTime, io);
writeStatus.setGlobalError(io);
@@ -281,7 +300,11 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
return false;
}
}
return writeRecord(hoodieRecord, indexedRecord, isDelete);
boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
if (result && cdcEnabled) {
cdcLogger.put(hoodieRecord, oldRecord, indexedRecord);
}
return result;
}

protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -292,6 +315,9 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOExceptio
return;
}
if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
if (cdcEnabled) {
cdcLogger.put(hoodieRecord, null, insertRecord);
}
Review comment (Contributor): Can we create a sub-class here to avoid all this cdcEnabled flag switching?

Reply (Contributor): We actually can't do that without requiring the insertRecord to be deserialized twice.

Reply (Contributor): What do you mean by deserialized twice? Just overriding the writeRecord method and adding the cdc logger logic should work here.

insertRecordsWritten++;
}
}
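To make the sub-classing suggestion in the thread above concrete, here is a self-contained toy sketch (hypothetical types, not Hudi classes) of the proposed pattern: capture the change in a single overridden write hook instead of checking a cdcEnabled flag at every write site.

```java
import java.util.HashMap;
import java.util.Map;

// Toy base handle: stands in for the non-CDC write path.
class ToyWriteHandle {
  protected boolean writeRecord(String key, String newValue) {
    // ... persist newValue to the new base file ...
    return true;
  }
}

// Toy CDC-aware subclass: the change capture lives in one override,
// so the base class needs no cdcEnabled branches.
class ToyCdcWriteHandle extends ToyWriteHandle {
  private final Map<String, String> cdcBuffer = new HashMap<>();

  @Override
  protected boolean writeRecord(String key, String newValue) {
    boolean written = super.writeRecord(key, newValue);
    if (written) {
      // buffer the change; a real implementation would flush this to a CDC log block on close
      cdcBuffer.put(key, newValue);
    }
    return written;
  }
}
```

Whether this works without re-deriving records (the "deserialized twice" concern above) depends on which arguments the overridden hook actually receives; the sketch only illustrates the shape of the refactoring, not a verdict on that question.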
@@ -402,6 +428,8 @@ protected void writeIncomingRecords() throws IOException {
@Override
public List<WriteStatus> close() {
try {
HoodieWriteStat stat = writeStatus.getStat();

writeIncomingRecords();

if (keyToNewRecords instanceof ExternalSpillableMap) {
@@ -416,9 +444,12 @@ public List<WriteStatus> close() {
fileWriter = null;
}

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
HoodieWriteStat stat = writeStatus.getStat();
// if there are cdc data written, set the CDC-related information.
Option<AppendResult> cdcResult =
HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
Review comment (Contributor): Move these to the sub-class.

HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);

long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setNumWrites(recordsWritten);