Merged
File: HoodieCDCLogger.java
@@ -22,7 +22,6 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -33,6 +32,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -49,10 +49,12 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
@@ -64,6 +66,10 @@ public class HoodieCDCLogger implements Closeable {

private final String keyField;

private final String partitionPath;

private final FileSystem fs;

private final Schema dataSchema;

// writer for cdc data
@@ -73,35 +79,56 @@ public class HoodieCDCLogger implements Closeable {

private final Schema cdcSchema;

-private final String cdcSchemaString;

// the cdc data
private final Map<String, HoodieAvroPayload> cdcData;

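// header (instant time + cdc schema) attached to every cdc data block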
private final Map<HoodieLogBlock.HeaderMetadataType, String> cdcDataBlockHeader;

// the cdc record transformer
private final CDCTransformer transformer;

// Max size of a cdc log data block; used as the flush threshold
private final int maxBlockSize;

// Average cdc record size, re-estimated from the first record buffered after each flush
private long averageCDCRecordSize = 0;

// Number of cdc records currently buffered in memory; combined with averageCDCRecordSize to decide when to flush
private AtomicInteger numOfCDCRecordsInMemory = new AtomicInteger();

Review comment (Contributor): numOfCDCRecordInMemory -> numOfCDCRecordsInMemory

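// estimates the in-memory size of a buffered cdc payload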
private final SizeEstimator<HoodieAvroPayload> sizeEstimator;

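// absolute paths of the cdc log files this logger has appended blocks to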
private final List<Path> cdcAbsPaths;

public HoodieCDCLogger(
String commitTime,
HoodieWriteConfig config,
HoodieTableConfig tableConfig,
String partitionPath,
FileSystem fs,
Schema schema,
HoodieLogFormat.Writer cdcWriter,
long maxInMemorySizeInBytes) {
try {
this.commitTime = commitTime;
-this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
this.keyField = config.populateMetaFields()
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: tableConfig.getRecordKeyFieldProp();
this.partitionPath = partitionPath;
this.fs = fs;
this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
this.cdcWriter = cdcWriter;
this.cdcSupplementalLoggingMode = tableConfig.cdcSupplementalLoggingMode();
this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
cdcSupplementalLoggingMode,
dataSchema
);
-this.cdcSchemaString = this.cdcSchema.toString();

this.cdcDataBlockHeader = new HashMap<>();
this.cdcDataBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
this.cdcDataBlockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString());

this.sizeEstimator = new DefaultSizeEstimator<>();
this.cdcData = new ExternalSpillableMap<>(
maxInMemorySizeInBytes,
config.getSpillableMapBasePath(),
@@ -110,6 +137,9 @@ public HoodieCDCLogger(
config.getCommonConfig().getSpillableDiskMapType(),
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
this.transformer = getTransformer();
this.maxBlockSize = config.getLogFileDataBlockMaxSize();

this.cdcAbsPaths = new ArrayList<>();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e);
}
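The buffered cdc records live in an ExternalSpillableMap, which keeps entries in memory up to maxInMemorySizeInBytes and spills the overflow to the configured spillable base path. As a toy sketch of the spill-over idea only (this is not ExternalSpillableMap's API, on-disk format, or eviction policy; the class below is invented for illustration), the pattern is roughly:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Toy spillable map: hold values in memory up to a byte budget, write the
// overflow to one file per key (assumes keys are valid file names). Hudi's
// ExternalSpillableMap instead tracks sizes with a SizeEstimator and delegates
// overflow to a pluggable disk map (e.g. the BitCask-style map whose
// compression flag is passed in above).
class ToySpillableMap {
  private final long maxInMemoryBytes;
  private final Path spillDir;
  private final Map<String, byte[]> inMemory = new HashMap<>();
  private long inMemoryBytes = 0;

  ToySpillableMap(long maxInMemoryBytes, Path spillDir) {
    this.maxInMemoryBytes = maxInMemoryBytes;
    this.spillDir = spillDir;
  }

  void put(String key, byte[] value) throws IOException {
    if (inMemoryBytes + value.length <= maxInMemoryBytes) {
      byte[] old = inMemory.put(key, value);
      inMemoryBytes += value.length - (old == null ? 0 : old.length);
    } else {
      Files.write(spillDir.resolve(key), value); // over budget: spill to disk
    }
  }

  byte[] get(String key) throws IOException {
    byte[] v = inMemory.get(key);
    if (v != null) {
      return v;
    }
    Path spilled = spillDir.resolve(key);
    return Files.exists(spilled) ? Files.readAllBytes(spilled) : null;
  }
}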
@@ -136,49 +166,70 @@ public void put(HoodieRecord hoodieRecord,
cdcRecord = this.transformer.transform(HoodieCDCOperation.DELETE, recordKey,
oldRecord, null);
}
-cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));

flushIfNeeded(false);
HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(cdcRecord));
if (cdcData.isEmpty()) {
averageCDCRecordSize = sizeEstimator.sizeEstimate(payload);
}
cdcData.put(recordKey, payload);
numOfCDCRecordsInMemory.incrementAndGet();
}

-public Option<AppendResult> writeCDCData() {
-if (isEmpty()) {
-return Option.empty();
-}

private void flushIfNeeded(boolean force) {
if (force || numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize) {
try {
List<IndexedRecord> records = cdcData.values().stream()
.map(record -> {
try {
return record.getInsertValue(cdcSchema).get();
} catch (IOException e) {
throw new HoodieIOException("Failed to get cdc record", e);
}
}).collect(Collectors.toList());

HoodieLogBlock block = new HoodieCDCDataBlock(records, cdcDataBlockHeader, keyField);
AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block));

Path cdcAbsPath = result.logFile().getPath();
if (!cdcAbsPaths.contains(cdcAbsPath)) {
cdcAbsPaths.add(cdcAbsPath);
}

// reset the in-memory stats
cdcData.clear();
numOfCDCRecordsInMemory = new AtomicInteger();
} catch (Exception e) {
throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
}
}
}

public Map<String, Long> getCDCWriteStats() {
Map<String, Long> stats = new HashMap<>();
try {
-List<IndexedRecord> records = cdcData.values().stream()
-.map(record -> {
-try {
-return record.getInsertValue(cdcSchema).get();
-} catch (IOException e) {
-throw new HoodieIOException("Failed to get cdc record", e);
-}
-}).collect(Collectors.toList());
-
-Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
-header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchemaString);
-
-HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField);
-AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block));
-
-// call close to trigger the data flush.
-this.close();
-
-return Option.of(result);
-} catch (Exception e) {
-throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
for (Path cdcAbsPath : cdcAbsPaths) {
String cdcFileName = cdcAbsPath.getName();
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
stats.put(cdcPath, FSUtils.getFileSize(fs, cdcAbsPath));
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to get cdc write stat", e);
}
return stats;
}

@Override
public void close() {
try {
flushIfNeeded(true);
if (cdcWriter != null) {
cdcWriter.close();
}
} catch (IOException e) {
throw new HoodieIOException("Failed to close HoodieCDCLogger", e);
} finally {
// in case of a crash in `flushIfNeeded`, do the cleanup again.
cdcData.clear();
}
}
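Two details of the flush path above are easy to miss: averageCDCRecordSize is sampled from the first record that lands in an empty buffer, and put() checks the threshold before adding, so a block can overshoot maxBlockSize slightly. A minimal, self-contained sketch of just this accounting (a hypothetical class, detached from the Hudi types):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the estimate-based flush policy: one size sample per block,
// flush once bufferedCount * averageSize reaches the block-size limit.
class FlushPolicySketch {
  private final long maxBlockSize;
  private final List<byte[]> buffer = new ArrayList<>();
  private AtomicInteger numBuffered = new AtomicInteger();
  private long averageRecordSize = 0;

  FlushPolicySketch(long maxBlockSize) {
    this.maxBlockSize = maxBlockSize;
  }

  void put(byte[] record) {
    flushIfNeeded(false);
    if (buffer.isEmpty()) {
      averageRecordSize = record.length; // re-sampled after every flush
    }
    buffer.add(record);
    numBuffered.incrementAndGet();
  }

  void flushIfNeeded(boolean force) {
    if (force || numBuffered.get() * averageRecordSize >= maxBlockSize) {
      if (!buffer.isEmpty()) {
        System.out.printf("flushing %d records (~%d bytes estimated)%n",
            buffer.size(), numBuffered.get() * averageRecordSize);
      }
      buffer.clear();
      numBuffered = new AtomicInteger();
    }
  }
}

Because the estimate comes from a single sample, the forced flush in close() is what guarantees the tail of the buffer is written out.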
@@ -204,40 +255,6 @@ private GenericRecord removeCommitMetadata(GenericRecord record) {
return record == null ? null : HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, Collections.emptyMap());
}

-public boolean isEmpty() {
-return this.cdcData.isEmpty();
-}
-
-public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
-long recordsWritten,
-long insertRecordsWritten) {
-if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
-// the following cases where we do not need to write out the cdc file:
-// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
-// case 2: all the incoming data is INSERT.
-return Option.empty();
-}
-return cdcLogger.writeCDCData();
-}
-
-public static void setCDCStatIfNeeded(HoodieWriteStat stat,
-Option<AppendResult> cdcResult,
-String partitionPath,
-FileSystem fs) {
-try {
-if (cdcResult.isPresent()) {
-Path cdcLogFile = cdcResult.get().logFile().getPath();
-String cdcFileName = cdcLogFile.getName();
-String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
-long cdcFileSizeInBytes = FSUtils.getFileSize(fs, cdcLogFile);
-stat.setCdcPath(cdcPath);
-stat.setCdcWriteBytes(cdcFileSizeInBytes);
-}
-} catch (IOException e) {
-throw new HoodieUpsertException("Failed to set cdc write stat", e);
-}
-}

// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
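Since flushIfNeeded() can append several blocks and the writer may roll over to a new log file, the logger now reports a map of partition-relative cdc paths to file sizes instead of a single AppendResult. A sketch of the consuming pattern, using only the calls visible in this diff (the helper name is invented, and HoodieCDCLogger is assumed to be in scope):

import java.util.Map;

import org.apache.hudi.common.model.HoodieWriteStat;

class CdcStatsUsage {
  // Mirrors the close() pattern in the handles below: force the final flush,
  // then attach {partition-relative cdc path -> file size in bytes} to the stat.
  static void finalizeCdcStats(HoodieCDCLogger cdcLogger, HoodieWriteStat stat) {
    cdcLogger.close(); // triggers flushIfNeeded(true) and closes the log writer
    Map<String, Long> cdcStats = cdcLogger.getCDCWriteStats();
    stat.setCdcStats(cdcStats);
  }
}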
File: HoodieMergeHandleWithChangeLog.java
@@ -24,8 +24,8 @@
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -52,6 +52,8 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
fs,
tableSchema,
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -68,6 +70,8 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
fs,
tableSchema,
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -93,9 +97,17 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRec
public List<WriteStatus> close() {
List<WriteStatus> writeStatuses = super.close();
// if any cdc data was written, set the CDC-related information.
-Option<AppendResult> cdcResult =
-HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
-HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs);

if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// The following are the cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice is deleted and no new data is inserted;
// case 2: all the incoming data is INSERT.
return writeStatuses;
}

Review comment (Contributor): The if condition is not suitable for Flink; we may need some changes for the Flink CDC handles.

cdcLogger.close();
HoodieWriteStat stat = writeStatuses.get(0).getStat();
stat.setCdcStats(cdcLogger.getCDCWriteStats());
return writeStatuses;
}
}
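The early return above encodes when no cdc file is needed: no cdc logger, a pure delete (nothing written at all), or a pure-insert batch, presumably because insert change events can be reconstructed from the new base file. Isolated as a tiny hypothetical helper, the predicate and its three interesting inputs look like this:

// Hypothetical helper isolating the skip predicate from close() above.
class CdcSkipRule {
  static boolean skipCdc(boolean hasCdcLogger, long recordsWritten, long insertRecordsWritten) {
    return !hasCdcLogger                           // cdc logging disabled for this handle
        || recordsWritten == 0L                    // case 1: previous file slice fully deleted
        || recordsWritten == insertRecordsWritten; // case 2: all incoming records are INSERTs
  }

  public static void main(String[] args) {
    System.out.println(skipCdc(true, 0L, 0L));   // true: pure delete, skip
    System.out.println(skipCdc(true, 10L, 10L)); // true: pure insert, skip
    System.out.println(skipCdc(true, 10L, 3L));  // false: updates/deletes present, write cdc
  }
}

Note that the Flink handles below apply no such guard and always close the logger, which is what the review comment on this condition is pointing at.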
File: FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -23,8 +23,8 @@
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -54,6 +54,8 @@ public FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, String
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
getFileSystem(),
tableSchema,
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -78,9 +80,9 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRec
@Override
public List<WriteStatus> close() {
List<WriteStatus> writeStatuses = super.close();
-// if there are cdc data written, set the CDC-related information.
-Option<AppendResult> cdcResult = cdcLogger.writeCDCData();
-HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs);
cdcLogger.close();
HoodieWriteStat stat = writeStatuses.get(0).getStat();
stat.setCdcStats(cdcLogger.getCDCWriteStats());
return writeStatuses;
}
}
File: FlinkMergeHandleWithChangeLog.java
@@ -23,8 +23,8 @@
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -57,6 +57,8 @@ public FlinkMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTim
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
getFileSystem(),
tableSchema,
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -81,9 +83,9 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRec
@Override
public List<WriteStatus> close() {
List<WriteStatus> writeStatuses = super.close();
-// if there are cdc data written, set the CDC-related information.
-Option<AppendResult> cdcResult = cdcLogger.writeCDCData();
-HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs);
cdcLogger.close();
HoodieWriteStat stat = writeStatuses.get(0).getStat();
stat.setCdcStats(cdcLogger.getCDCWriteStats());
return writeStatuses;
}
}
File: FSUtils.java
@@ -451,9 +451,13 @@ public static Integer getTaskAttemptIdFromLogPath(Path path) {
* Get the last part of the file name in the log file and convert to int.
*/
public static int getFileVersionFromLog(Path logPath) {
-Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
return getFileVersionFromLog(logPath.getName());
}

public static int getFileVersionFromLog(String logFileName) {
Matcher matcher = LOG_FILE_PATTERN.matcher(logFileName);
if (!matcher.find()) {
-throw new InvalidHoodiePathException(logPath, "LogFile");
throw new HoodieIOException("Invalid log file name: " + logFileName);
}
return Integer.parseInt(matcher.group(4));
}
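The refactor splits the name-based parsing out of the Path-based overload so callers that only hold a file name (for example a cdc log file name) can reuse it, and replaces the Path-specific exception with one that fits a bare name. A rough, standalone illustration of this kind of version extraction (a simplified, hypothetical pattern, not Hudi's actual LOG_FILE_PATTERN):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LogNameParseExample {
  // Simplified stand-in for a Hudi-style log file name such as
  // ".<fileId>_<baseCommitTime>.log.<version>_<writeToken>"; the real
  // LOG_FILE_PATTERN is stricter. Group 4 is the file version, as above.
  private static final Pattern SIMPLE_LOG_PATTERN =
      Pattern.compile("\\.(.+)_(.+)\\.(log)\\.(\\d+)(_.+)?");

  static int versionOf(String logFileName) {
    Matcher matcher = SIMPLE_LOG_PATTERN.matcher(logFileName);
    if (!matcher.find()) {
      throw new IllegalArgumentException("Invalid log file name: " + logFileName);
    }
    return Integer.parseInt(matcher.group(4));
  }

  public static void main(String[] args) {
    System.out.println(versionOf(".f1a2b3c4_20230101093000.log.2_1-0-1")); // prints 2
  }
}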