Skip to content

Commit 7413180

Browse files
danny0405Alexey Kudinkin
authored andcommitted
[HUDI-5067] Merge the columns stats of multiple log blocks from the same log file (apache#7018)
1 parent c40fc8d commit 7413180

6 files changed

Lines changed: 275 additions & 2 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ private void processAppendResult(AppendResult result, List<IndexedRecord> record
362362
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
363363
collectColumnRangeMetadata(recordList, fieldsToIndex, stat.getPath());
364364

365-
stat.setRecordsStats(columnRangesMetadataMap);
365+
stat.putRecordsStats(columnRangesMetadataMap);
366366
}
367367

368368
resetWriteCounts();

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
6666
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
6767
import org.apache.hudi.util.FlinkClientUtil;
68+
import org.apache.hudi.util.WriteStatMerger;
6869

6970
import com.codahale.metrics.Timer;
7071
import org.apache.hadoop.conf.Configuration;
@@ -122,7 +123,13 @@ protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
122123
@Override
123124
public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
124125
List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
125-
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
126+
// for eager flush, multiple write stat may share one file path.
127+
List<HoodieWriteStat> merged = writeStats.stream()
128+
.collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath()))
129+
.values().stream()
130+
.map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
131+
.collect(Collectors.toList());
132+
return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds);
126133
}
127134

128135
@Override
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.util;
20+
21+
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
22+
import org.apache.hudi.common.model.HoodieWriteStat;
23+
24+
import java.util.ArrayList;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
29+
/**
30+
* Helper clazz to merge hoodie write stats that belong to one file path.
31+
*
32+
* <p>CAUTION: The merge can be buggy, we need to maintain the new variables for the write stat.
33+
*/
34+
public class WriteStatMerger {
35+
public static HoodieWriteStat merge(HoodieWriteStat stat1, HoodieWriteStat stat2) {
36+
if (stat1 instanceof HoodieDeltaWriteStat) {
37+
return mergeDeltaWriteStat((HoodieDeltaWriteStat) stat1, (HoodieDeltaWriteStat) stat2);
38+
}
39+
return mergeWriteStat(new HoodieWriteStat(), stat1, stat2);
40+
}
41+
42+
private static HoodieDeltaWriteStat mergeDeltaWriteStat(
43+
HoodieDeltaWriteStat stat1,
44+
HoodieDeltaWriteStat stat2) {
45+
HoodieDeltaWriteStat merged = new HoodieDeltaWriteStat();
46+
mergeWriteStat(merged, stat1, stat2);
47+
merged.setLogVersion(stat2.getLogVersion());
48+
merged.setLogOffset(stat2.getLogOffset());
49+
merged.setBaseFile(stat2.getBaseFile());
50+
// log files
51+
List<String> mergedLogFiles = new ArrayList<>(stat1.getLogFiles());
52+
for (String logFile : stat2.getLogFiles()) {
53+
if (!mergedLogFiles.contains(logFile)) {
54+
mergedLogFiles.add(logFile);
55+
}
56+
}
57+
merged.setLogFiles(mergedLogFiles);
58+
// column stats
59+
if (stat1.getColumnStats().isPresent()) {
60+
merged.putRecordsStats(stat1.getColumnStats().get());
61+
}
62+
if (stat2.getColumnStats().isPresent()) {
63+
merged.putRecordsStats(stat2.getColumnStats().get());
64+
}
65+
return merged;
66+
}
67+
68+
private static HoodieWriteStat mergeWriteStat(HoodieWriteStat merged, HoodieWriteStat stat1, HoodieWriteStat stat2) {
69+
merged.setFileId(stat2.getFileId());
70+
merged.setPath(stat2.getPath());
71+
// merge cdc stats
72+
merged.setCdcStats(getMergedCdcStats(stat1.getCdcStats(), stat2.getCdcStats()));
73+
// prev commit
74+
merged.setPrevCommit(stat2.getPrevCommit());
75+
76+
merged.setNumWrites(stat2.getNumWrites() + stat1.getNumWrites());
77+
merged.setNumDeletes(stat2.getNumDeletes() + stat1.getNumDeletes());
78+
merged.setNumUpdateWrites(stat2.getNumUpdateWrites() + stat1.getNumUpdateWrites());
79+
merged.setNumInserts(stat2.getNumInserts() + stat1.getNumInserts());
80+
merged.setTotalWriteBytes(stat2.getTotalWriteBytes() + stat1.getTotalWriteBytes());
81+
merged.setTotalWriteErrors(stat2.getTotalWriteErrors() + stat1.getTotalWriteErrors());
82+
83+
// -------------------------------------------------------------------------
84+
// Nullable
85+
// -------------------------------------------------------------------------
86+
87+
// tmp path
88+
merged.setTempPath(stat2.getTempPath());
89+
// partition path
90+
merged.setPartitionPath(stat2.getPartitionPath());
91+
// runtime stats
92+
merged.setRuntimeStats(getMergedRuntimeStats(stat1.getRuntimeStats(), stat2.getRuntimeStats()));
93+
94+
// log statistics
95+
merged.setTotalLogRecords(stat2.getTotalLogRecords() + stat1.getTotalLogRecords());
96+
merged.setTotalLogFilesCompacted(stat2.getTotalLogFilesCompacted() + stat1.getTotalLogFilesCompacted());
97+
merged.setTotalLogSizeCompacted(stat2.getTotalLogSizeCompacted() + stat1.getTotalLogSizeCompacted());
98+
merged.setTotalUpdatedRecordsCompacted(stat2.getTotalUpdatedRecordsCompacted() + stat1.getTotalUpdatedRecordsCompacted());
99+
merged.setTotalLogBlocks(stat2.getTotalLogBlocks() + stat1.getTotalLogBlocks());
100+
merged.setTotalCorruptLogBlock(stat2.getTotalCorruptLogBlock() + stat1.getTotalCorruptLogBlock());
101+
merged.setTotalRollbackBlocks(stat2.getTotalRollbackBlocks() + stat1.getTotalRollbackBlocks());
102+
merged.setFileSizeInBytes(stat2.getFileSizeInBytes() + stat1.getFileSizeInBytes());
103+
// event time
104+
merged.setMinEventTime(minLong(stat1.getMinEventTime(), stat2.getMinEventTime()));
105+
merged.setMaxEventTime(maxLong(stat1.getMaxEventTime(), stat2.getMaxEventTime()));
106+
return stat2;
107+
}
108+
109+
private static HoodieWriteStat.RuntimeStats getMergedRuntimeStats(
110+
HoodieWriteStat.RuntimeStats runtimeStats1,
111+
HoodieWriteStat.RuntimeStats runtimeStats2) {
112+
final HoodieWriteStat.RuntimeStats runtimeStats;
113+
if (runtimeStats1 != null && runtimeStats2 != null) {
114+
runtimeStats = new HoodieWriteStat.RuntimeStats();
115+
runtimeStats.setTotalScanTime(runtimeStats1.getTotalScanTime() + runtimeStats2.getTotalScanTime());
116+
runtimeStats.setTotalUpsertTime(runtimeStats1.getTotalUpsertTime() + runtimeStats2.getTotalUpsertTime());
117+
runtimeStats.setTotalCreateTime(runtimeStats1.getTotalCreateTime() + runtimeStats2.getTotalCreateTime());
118+
} else if (runtimeStats1 == null) {
119+
runtimeStats = runtimeStats2;
120+
} else {
121+
runtimeStats = runtimeStats1;
122+
}
123+
return runtimeStats;
124+
}
125+
126+
private static Map<String, Long> getMergedCdcStats(Map<String, Long> cdcStats1, Map<String, Long> cdcStats2) {
127+
final Map<String, Long> cdcStats;
128+
if (cdcStats1 != null && cdcStats2 != null) {
129+
cdcStats = new HashMap<>();
130+
cdcStats.putAll(cdcStats1);
131+
cdcStats.putAll(cdcStats2);
132+
} else if (cdcStats1 == null) {
133+
cdcStats = cdcStats2;
134+
} else {
135+
cdcStats = cdcStats1;
136+
}
137+
return cdcStats;
138+
}
139+
140+
private static Long minLong(Long v1, Long v2) {
141+
if (v1 == null) {
142+
return v2;
143+
}
144+
if (v2 == null) {
145+
return v1;
146+
}
147+
return v1.compareTo(v2) < 0 ? v1 : v2;
148+
}
149+
150+
private static Long maxLong(Long v1, Long v2) {
151+
if (v1 == null) {
152+
return v2;
153+
}
154+
if (v2 == null) {
155+
return v1;
156+
}
157+
return v1.compareTo(v2) > 0 ? v1 : v2;
158+
}
159+
}

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,41 @@ public static HoodieColumnRangeMetadata<Comparable> stub(String filePath,
148148
String columnName) {
149149
return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
150150
}
151+
152+
/**
153+
* Merges the given two column range metadata.
154+
*/
155+
public static HoodieColumnRangeMetadata<Comparable> merge(
156+
HoodieColumnRangeMetadata<Comparable> left,
157+
HoodieColumnRangeMetadata<Comparable> right) {
158+
String filePath = left.getFilePath();
159+
String columnName = left.getColumnName();
160+
Comparable min = minVal(left.getMinValue(), right.getMinValue());
161+
Comparable max = maxVal(left.getMaxValue(), right.getMaxValue());
162+
long nullCount = left.getNullCount() + right.getNullCount();
163+
long valueCount = left.getValueCount() + right.getValueCount();
164+
long totalSize = left.getTotalSize() + right.getTotalSize();
165+
long totalUncompressedSize = left.getTotalUncompressedSize() + right.getTotalUncompressedSize();
166+
return create(filePath, columnName, min, max, nullCount, valueCount, totalSize, totalUncompressedSize);
167+
}
168+
169+
private static Comparable minVal(Comparable val1, Comparable val2) {
170+
if (val1 == null) {
171+
return val2;
172+
}
173+
if (val2 == null) {
174+
return val1;
175+
}
176+
return val1.compareTo(val2) < 0 ? val1 : val2;
177+
}
178+
179+
private static Comparable maxVal(Comparable val1, Comparable val2) {
180+
if (val1 == null) {
181+
return val2;
182+
}
183+
if (val2 == null) {
184+
return val1;
185+
}
186+
return val1.compareTo(val2) > 0 ? val1 : val2;
187+
}
151188
}

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.hudi.common.util.Option;
2323

2424
import java.util.ArrayList;
25+
import java.util.HashMap;
2526
import java.util.List;
2627
import java.util.Map;
2728

@@ -74,11 +75,35 @@ public List<String> getLogFiles() {
7475
return logFiles;
7576
}
7677

78+
public void putRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
79+
if (!recordsStats.isPresent()) {
80+
recordsStats = Option.of(stats);
81+
} else {
82+
// in case there are multiple log blocks for one write process.
83+
recordsStats = Option.of(mergeRecordsStats(recordsStats.get(), stats));
84+
}
85+
}
86+
87+
// keep for serialization efficiency
7788
public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
7889
recordsStats = Option.of(stats);
7990
}
8091

8192
public Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> getColumnStats() {
8293
return recordsStats;
8394
}
95+
96+
private static Map<String, HoodieColumnRangeMetadata<Comparable>> mergeRecordsStats(
97+
Map<String, HoodieColumnRangeMetadata<Comparable>> stats1,
98+
Map<String, HoodieColumnRangeMetadata<Comparable>> stats2) {
99+
Map<String, HoodieColumnRangeMetadata<Comparable>> mergedStats = new HashMap<>(stats1);
100+
for (Map.Entry<String, HoodieColumnRangeMetadata<Comparable>> entry : stats2.entrySet()) {
101+
final String colName = entry.getKey();
102+
final HoodieColumnRangeMetadata<Comparable> metadata = mergedStats.containsKey(colName)
103+
? HoodieColumnRangeMetadata.merge(mergedStats.get(colName), entry.getValue())
104+
: entry.getValue();
105+
mergedStats.put(colName, metadata);
106+
}
107+
return mergedStats;
108+
}
84109
}

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1373,6 +1373,51 @@ void testWriteAndReadWithDataSkipping(HoodieTableType tableType) {
13731373
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
13741374
}
13751375

1376+
@Test
1377+
void testMultipleLogBlocksWithDataSkipping() {
1378+
TableEnvironment tableEnv = batchTableEnv;
1379+
String hoodieTableDDL = sql("t1")
1380+
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
1381+
.option(FlinkOptions.METADATA_ENABLED, true)
1382+
.option("hoodie.metadata.index.column.stats.enable", true)
1383+
.option("hoodie.metadata.index.column.stats.file.group.count", 2)
1384+
.option("hoodie.metadata.index.column.stats.column.list", "ts")
1385+
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
1386+
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
1387+
.option("hoodie.logfile.data.block.max.size", 1)
1388+
.end();
1389+
tableEnv.executeSql(hoodieTableDDL);
1390+
1391+
execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);
1392+
1393+
// apply filters
1394+
List<Row> result2 = CollectionUtil.iterableToList(
1395+
() -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect());
1396+
assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
1397+
}
1398+
1399+
@Test
1400+
void testEagerFlushWithDataSkipping() {
1401+
TableEnvironment tableEnv = batchTableEnv;
1402+
String hoodieTableDDL = sql("t1")
1403+
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
1404+
.option(FlinkOptions.METADATA_ENABLED, true)
1405+
.option("hoodie.metadata.index.column.stats.enable", true)
1406+
.option("hoodie.metadata.index.column.stats.file.group.count", 2)
1407+
.option("hoodie.metadata.index.column.stats.column.list", "ts")
1408+
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
1409+
.option(FlinkOptions.WRITE_BATCH_SIZE, 0.00001)
1410+
.end();
1411+
tableEnv.executeSql(hoodieTableDDL);
1412+
1413+
execInsertSql(tableEnv, TestSQL.INSERT_SAME_KEY_T1);
1414+
1415+
// apply filters
1416+
List<Row> result2 = CollectionUtil.iterableToList(
1417+
() -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:04'").execute().collect());
1418+
assertRowsEquals(result2, "[+I[id1, Danny, 23, 1970-01-01T00:00:05, par1]]");
1419+
}
1420+
13761421
@Test
13771422
void testBuiltinFunctionWithHMSCatalog() {
13781423
TableEnvironment tableEnv = batchTableEnv;

0 commit comments

Comments
 (0)