From 2fd1d5ab5b34cdf0b5f9042e38efccd6b8091a60 Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Fri, 21 Oct 2022 16:47:49 +0800
Subject: [PATCH] [HUDI-5067] Merge the column stats of multiple log blocks from the same log file

---
 .../apache/hudi/io/HoodieAppendHandle.java    |   2 +-
 .../hudi/client/HoodieFlinkWriteClient.java   |   9 +-
 .../org/apache/hudi/util/WriteStatMerger.java | 159 ++++++++++++++++++
 .../model/HoodieColumnRangeMetadata.java      |  37 ++++
 .../common/model/HoodieDeltaWriteStat.java    |  25 +++
 .../hudi/table/ITTestHoodieDataSource.java    |  45 +++++
 6 files changed, 275 insertions(+), 2 deletions(-)
 create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/WriteStatMerger.java

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8db927d569b82..d1d5d31e41076 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -395,7 +395,7 @@ private void processAppendResult(AppendResult result, List<HoodieRecord> record
       Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
           collectColumnRangeMetadata(recordList, fieldsToIndex, stat.getPath());
-      stat.setRecordsStats(columnRangesMetadataMap);
+      stat.putRecordsStats(columnRangesMetadataMap);
     }
     resetWriteCounts();

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 191eb003b905e..e6f853f692281 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -58,6 +58,7 @@
 import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.util.FlinkClientUtil;
+import org.apache.hudi.util.WriteStatMerger;

 import com.codahale.metrics.Timer;
 import org.apache.hadoop.conf.Configuration;
@@ -115,7 +116,13 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
   @Override
   public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
     List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+    // for eager flush, multiple write stats may share one file path.
+    List<HoodieWriteStat> merged = writeStats.stream()
+        .collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath()))
+        .values().stream()
+        .map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
+        .collect(Collectors.toList());
+    return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds);
   }

   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/WriteStatMerger.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/WriteStatMerger.java
new file mode 100644
index 0000000000000..ded4d124ab5c2
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/WriteStatMerger.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieWriteStat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to merge hoodie write stats that belong to one file path.
+ *
+ * <p>CAUTION: the merge is easy to get wrong; whenever a new field is added to
+ * the write stat, this merger must be updated to merge it as well.
+ */
+public class WriteStatMerger {
+  public static HoodieWriteStat merge(HoodieWriteStat stat1, HoodieWriteStat stat2) {
+    if (stat1 instanceof HoodieDeltaWriteStat) {
+      return mergeDeltaWriteStat((HoodieDeltaWriteStat) stat1, (HoodieDeltaWriteStat) stat2);
+    }
+    return mergeWriteStat(new HoodieWriteStat(), stat1, stat2);
+  }
+
+  private static HoodieDeltaWriteStat mergeDeltaWriteStat(
+      HoodieDeltaWriteStat stat1,
+      HoodieDeltaWriteStat stat2) {
+    HoodieDeltaWriteStat merged = new HoodieDeltaWriteStat();
+    mergeWriteStat(merged, stat1, stat2);
+    merged.setLogVersion(stat2.getLogVersion());
+    merged.setLogOffset(stat2.getLogOffset());
+    merged.setBaseFile(stat2.getBaseFile());
+    // log files
+    List<String> mergedLogFiles = new ArrayList<>(stat1.getLogFiles());
+    for (String logFile : stat2.getLogFiles()) {
+      if (!mergedLogFiles.contains(logFile)) {
+        mergedLogFiles.add(logFile);
+      }
+    }
+    merged.setLogFiles(mergedLogFiles);
+    // column stats
+    if (stat1.getColumnStats().isPresent()) {
+      merged.putRecordsStats(stat1.getColumnStats().get());
+    }
+    if (stat2.getColumnStats().isPresent()) {
+      merged.putRecordsStats(stat2.getColumnStats().get());
+    }
+    return merged;
+  }
+
+  private static HoodieWriteStat mergeWriteStat(HoodieWriteStat merged, HoodieWriteStat stat1, HoodieWriteStat stat2) {
+    merged.setFileId(stat2.getFileId());
+    merged.setPath(stat2.getPath());
+    // merge cdc stats
+    merged.setCdcStats(getMergedCdcStats(stat1.getCdcStats(), stat2.getCdcStats()));
+    // prev commit
+    merged.setPrevCommit(stat2.getPrevCommit());
+
+    merged.setNumWrites(stat2.getNumWrites() + stat1.getNumWrites());
+    merged.setNumDeletes(stat2.getNumDeletes() + stat1.getNumDeletes());
+    merged.setNumUpdateWrites(stat2.getNumUpdateWrites() + stat1.getNumUpdateWrites());
+    merged.setNumInserts(stat2.getNumInserts() + stat1.getNumInserts());
+    merged.setTotalWriteBytes(stat2.getTotalWriteBytes() + stat1.getTotalWriteBytes());
+    merged.setTotalWriteErrors(stat2.getTotalWriteErrors() + stat1.getTotalWriteErrors());
+
+    // -------------------------------------------------------------------------
+    //  Nullable
+    // -------------------------------------------------------------------------
+
+    // tmp path
+    merged.setTempPath(stat2.getTempPath());
+    // partition path
+    merged.setPartitionPath(stat2.getPartitionPath());
+    // runtime stats
+    merged.setRuntimeStats(getMergedRuntimeStats(stat1.getRuntimeStats(), stat2.getRuntimeStats()));
+
+    // log statistics
+    merged.setTotalLogRecords(stat2.getTotalLogRecords() + stat1.getTotalLogRecords());
+    merged.setTotalLogFilesCompacted(stat2.getTotalLogFilesCompacted() + stat1.getTotalLogFilesCompacted());
+    merged.setTotalLogSizeCompacted(stat2.getTotalLogSizeCompacted() + stat1.getTotalLogSizeCompacted());
+    merged.setTotalUpdatedRecordsCompacted(stat2.getTotalUpdatedRecordsCompacted() + stat1.getTotalUpdatedRecordsCompacted());
+    merged.setTotalLogBlocks(stat2.getTotalLogBlocks() + stat1.getTotalLogBlocks());
+    merged.setTotalCorruptLogBlock(stat2.getTotalCorruptLogBlock() + stat1.getTotalCorruptLogBlock());
+    merged.setTotalRollbackBlocks(stat2.getTotalRollbackBlocks() + stat1.getTotalRollbackBlocks());
+    merged.setFileSizeInBytes(stat2.getFileSizeInBytes() + stat1.getFileSizeInBytes());
+    // event time
+    merged.setMinEventTime(minLong(stat1.getMinEventTime(), stat2.getMinEventTime()));
+    merged.setMaxEventTime(maxLong(stat1.getMaxEventTime(), stat2.getMaxEventTime()));
+    return merged;
+  }
+
+  private static HoodieWriteStat.RuntimeStats getMergedRuntimeStats(
+      HoodieWriteStat.RuntimeStats runtimeStats1,
+      HoodieWriteStat.RuntimeStats runtimeStats2) {
+    final HoodieWriteStat.RuntimeStats runtimeStats;
+    if (runtimeStats1 != null && runtimeStats2 != null) {
+      runtimeStats = new HoodieWriteStat.RuntimeStats();
+      runtimeStats.setTotalScanTime(runtimeStats1.getTotalScanTime() + runtimeStats2.getTotalScanTime());
+      runtimeStats.setTotalUpsertTime(runtimeStats1.getTotalUpsertTime() + runtimeStats2.getTotalUpsertTime());
+      runtimeStats.setTotalCreateTime(runtimeStats1.getTotalCreateTime() + runtimeStats2.getTotalCreateTime());
+    } else if (runtimeStats1 == null) {
+      runtimeStats = runtimeStats2;
+    } else {
+      runtimeStats = runtimeStats1;
+    }
+    return runtimeStats;
+  }
+
+  private static Map<String, Long> getMergedCdcStats(Map<String, Long> cdcStats1, Map<String, Long> cdcStats2) {
+    final Map<String, Long> cdcStats;
+    if (cdcStats1 != null && cdcStats2 != null) {
+      cdcStats = new HashMap<>();
+      cdcStats.putAll(cdcStats1);
+      cdcStats.putAll(cdcStats2);
+    } else if (cdcStats1 == null) {
+      cdcStats = cdcStats2;
+    } else {
+      cdcStats = cdcStats1;
+    }
+    return cdcStats;
+  }
+
+  private static Long minLong(Long v1, Long v2) {
+    if (v1 == null) {
+      return v2;
+    }
+    if (v2 == null) {
+      return v1;
+    }
+    return v1.compareTo(v2) < 0 ? v1 : v2;
+  }
+
+  private static Long maxLong(Long v1, Long v2) {
+    if (v1 == null) {
+      return v2;
+    }
+    if (v2 == null) {
+      return v1;
+    }
+    return v1.compareTo(v2) > 0 ? v1 : v2;
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index e3c5a70d5cf16..1974544e63278 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -148,4 +148,41 @@
   public static HoodieColumnRangeMetadata<Comparable> stub(String filePath, String columnName) {
     return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
   }
+
+  /**
+   * Merges the given two column range metadata.
+   */
+  public static HoodieColumnRangeMetadata<Comparable> merge(
+      HoodieColumnRangeMetadata<Comparable> left,
+      HoodieColumnRangeMetadata<Comparable> right) {
+    String filePath = left.getFilePath();
+    String columnName = left.getColumnName();
+    Comparable min = minVal(left.getMinValue(), right.getMinValue());
+    Comparable max = maxVal(left.getMaxValue(), right.getMaxValue());
+    long nullCount = left.getNullCount() + right.getNullCount();
+    long valueCount = left.getValueCount() + right.getValueCount();
+    long totalSize = left.getTotalSize() + right.getTotalSize();
+    long totalUncompressedSize = left.getTotalUncompressedSize() + right.getTotalUncompressedSize();
+    return create(filePath, columnName, min, max, nullCount, valueCount, totalSize, totalUncompressedSize);
+  }
+
+  private static Comparable minVal(Comparable val1, Comparable val2) {
+    if (val1 == null) {
+      return val2;
+    }
+    if (val2 == null) {
+      return val1;
+    }
+    return val1.compareTo(val2) < 0 ? val1 : val2;
+  }
+
+  private static Comparable maxVal(Comparable val1, Comparable val2) {
+    if (val1 == null) {
+      return val2;
+    }
+    if (val2 == null) {
+      return val1;
+    }
+    return val1.compareTo(val2) > 0 ? val1 : val2;
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
index 9626e218a2247..b3ae5b8ceced4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.util.Option;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@@ -74,6 +75,16 @@ public List<String> getLogFiles() {
     return logFiles;
   }

+  public void putRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
+    if (!recordsStats.isPresent()) {
+      recordsStats = Option.of(stats);
+    } else {
+      // in case there are multiple log blocks for one write process.
+      recordsStats = Option.of(mergeRecordsStats(recordsStats.get(), stats));
+    }
+  }
+
+  // keep for serialization efficiency
   public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
     recordsStats = Option.of(stats);
   }
@@ -81,4 +92,18 @@ public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> s
   public Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> getColumnStats() {
     return recordsStats;
   }
+
+  private static Map<String, HoodieColumnRangeMetadata<Comparable>> mergeRecordsStats(
+      Map<String, HoodieColumnRangeMetadata<Comparable>> stats1,
+      Map<String, HoodieColumnRangeMetadata<Comparable>> stats2) {
+    Map<String, HoodieColumnRangeMetadata<Comparable>> mergedStats = new HashMap<>(stats1);
+    for (Map.Entry<String, HoodieColumnRangeMetadata<Comparable>> entry : stats2.entrySet()) {
+      final String colName = entry.getKey();
+      final HoodieColumnRangeMetadata<Comparable> metadata = mergedStats.containsKey(colName)
+          ? HoodieColumnRangeMetadata.merge(mergedStats.get(colName), entry.getValue())
+          : entry.getValue();
+      mergedStats.put(colName, metadata);
+    }
+    return mergedStats;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 538b78d9881ae..d01963ba5e193 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1408,6 +1408,51 @@ void testWriteAndReadWithDataSkipping(HoodieTableType tableType) {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }

+  @Test
+  void testMultipleLogBlocksWithDataSkipping() {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.METADATA_ENABLED, true)
+        .option("hoodie.metadata.index.column.stats.enable", true)
+        .option("hoodie.metadata.index.column.stats.file.group.count", 2)
+        .option("hoodie.metadata.index.column.stats.column.list", "ts")
+        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+        .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
+        .option("hoodie.logfile.data.block.max.size", 1)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);
+
+    // apply filters
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect());
+    assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
+  }
+
+  @Test
+  void testEagerFlushWithDataSkipping() {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.METADATA_ENABLED, true)
+        .option("hoodie.metadata.index.column.stats.enable", true)
+        .option("hoodie.metadata.index.column.stats.file.group.count", 2)
+        .option("hoodie.metadata.index.column.stats.column.list", "ts")
+        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+        .option(FlinkOptions.WRITE_BATCH_SIZE, 0.00001)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);
+
+    // apply filters
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect());
+    assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
+  }
+
   @Test
   void testBuiltinFunctionWithHMSCatalog() {
     TableEnvironment tableEnv = batchTableEnv;
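
A few self-contained sketches of the merge semantics follow; they are illustrations only, and every class name and sample value in them is a hypothetical stand-in rather than Hudi API. First, the grouping-and-reduce step added to HoodieFlinkWriteClient#commit: with eager flush, several write stats can point at the same log file, so the stats are keyed by partition path plus file path and reduced pairwise, assuming here a toy Stat in place of HoodieWriteStat:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class WriteStatMergeSketch {

  /** Hypothetical stand-in for HoodieWriteStat, illustration only. */
  static class Stat {
    final String partitionPath;
    final String path;
    final long numWrites;

    Stat(String partitionPath, String path, long numWrites) {
      this.partitionPath = partitionPath;
      this.path = path;
      this.numWrites = numWrites;
    }

    /** Pairwise merge: keeps the shared path, sums the counters. */
    static Stat merge(Stat s1, Stat s2) {
      return new Stat(s2.partitionPath, s2.path, s1.numWrites + s2.numWrites);
    }
  }

  public static void main(String[] args) {
    List<Stat> stats = Arrays.asList(
        new Stat("par1", "f1.log.1", 2),  // two stats against the same log file
        new Stat("par1", "f1.log.1", 3),
        new Stat("par2", "f2.log.1", 5));
    // Group by (partition path + file path), then reduce each group pairwise,
    // mirroring the new commit() logic in the patch.
    List<Stat> merged = stats.stream()
        .collect(Collectors.groupingBy(s -> s.partitionPath + s.path))
        .values().stream()
        .map(dups -> dups.stream().reduce(Stat::merge).get())
        .collect(Collectors.toList());
    // Prints "f1.log.1 -> 5" and "f2.log.1 -> 5" (group order is unspecified).
    merged.forEach(s -> System.out.println(s.path + " -> " + s.numWrites));
  }
}
```

This pairwise reduce is what the eager-flush test exercises: with the write batch size forced tiny, one checkpoint produces many stats for the same log file, and the commit metadata must see them as one.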
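Next, the null-tolerant min/max semantics that HoodieColumnRangeMetadata.merge builds on: the merged range takes the min of mins and the max of maxes, and a null bound (no values observed) never wins. The helpers below are a sketch restating minVal/maxVal from the patch in generic form, not the production code:

```java
public class RangeMergeSketch {

  // Null-tolerant min/max: a null bound means "no values observed",
  // so the non-null side wins.
  static <T extends Comparable<T>> T minVal(T v1, T v2) {
    if (v1 == null) {
      return v2;
    }
    if (v2 == null) {
      return v1;
    }
    return v1.compareTo(v2) < 0 ? v1 : v2;
  }

  static <T extends Comparable<T>> T maxVal(T v1, T v2) {
    if (v1 == null) {
      return v2;
    }
    if (v2 == null) {
      return v1;
    }
    return v1.compareTo(v2) > 0 ? v1 : v2;
  }

  public static void main(String[] args) {
    // Two log blocks of the same file reporting ts ranges [1, 4] and [2, 5]:
    System.out.println(minVal(1L, 2L));   // 1 -> merged min is the min of mins
    System.out.println(maxVal(4L, 5L));   // 5 -> merged max is the max of maxes
    System.out.println(minVal(null, 2L)); // 2 -> an absent bound never wins
  }
}
```

The counters (null count, value count, sizes) are simply summed, since each block contributes disjoint records.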
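Finally, how HoodieDeltaWriteStat#putRecordsStats folds the per-column stats of successive log blocks into a single map, merging entries column by column; Range below is a hypothetical, simplified stand-in for HoodieColumnRangeMetadata that keeps only the min/max bounds:

```java
import java.util.HashMap;
import java.util.Map;

public class ColumnStatsFoldSketch {

  /** Hypothetical, simplified stand-in for HoodieColumnRangeMetadata. */
  static class Range {
    final long min;
    final long max;

    Range(long min, long max) {
      this.min = min;
      this.max = max;
    }

    static Range merge(Range left, Range right) {
      return new Range(Math.min(left.min, right.min), Math.max(left.max, right.max));
    }

    @Override
    public String toString() {
      return "[" + min + ", " + max + "]";
    }
  }

  // Mirrors mergeRecordsStats: start from the stats already recorded,
  // then fold in the stats of the next log block column by column.
  static Map<String, Range> mergeRecordsStats(Map<String, Range> stats1, Map<String, Range> stats2) {
    Map<String, Range> merged = new HashMap<>(stats1);
    stats2.forEach((col, range) -> merged.merge(col, range, Range::merge));
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Range> block1 = new HashMap<>();
    block1.put("ts", new Range(1, 3));
    Map<String, Range> block2 = new HashMap<>();
    block2.put("ts", new Range(2, 5));
    // Two blocks written by one append handle collapse into one range per column.
    System.out.println(mergeRecordsStats(block1, block2)); // {ts=[1, 5]}
  }
}
```

This is why a MOR write that spills multiple log blocks (as forced by the tiny hoodie.logfile.data.block.max.size in the first test) still publishes one consistent column range per file, which data skipping can trust.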