Skip to content

Commit 5d59bf6

Browse files
author
Alexey Kudinkin
authored
[HUDI-3513] Make sure Column Stats does not fail in case it fails to load previous Index Table state (#5015)
1 parent 56cb494 commit 5d59bf6

1 file changed

Lines changed: 34 additions & 15 deletions

File tree

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
2525
import org.apache.hudi.common.model.HoodieFileFormat;
2626
import org.apache.hudi.common.util.BaseFileUtils;
27+
import org.apache.hudi.common.util.Option;
2728
import org.apache.hudi.common.util.ParquetUtils;
2829
import org.apache.hudi.common.util.collection.Pair;
2930
import org.apache.hudi.exception.HoodieException;
@@ -304,21 +305,28 @@ public static void updateColumnStatsIndexFor(
304305
if (validIndexTables.isEmpty()) {
305306
finalColStatsIndexDf = newColStatsIndexDf;
306307
} else {
307-
// NOTE: That Parquet schema might deviate from the original table schema (for ex,
308-
// by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
309-
// prior to merging, since merging might fail otherwise due to schemas incompatibility
310-
finalColStatsIndexDf =
311-
tryMergeMostRecentIndexTableInto(
312-
sparkSession,
313-
newColStatsIndexDf,
314-
// Load current most recent col-stats-index table
315-
sparkSession.read().load(
316-
new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString()
317-
)
318-
);
319-
320-
// Clean up all index tables (after creation of the new index)
321-
tablesToCleanup.addAll(validIndexTables);
308+
Path latestIndexTablePath = new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1));
309+
310+
Option<Dataset<Row>> existingIndexTableOpt =
311+
tryLoadExistingIndexTable(sparkSession, latestIndexTablePath);
312+
313+
if (!existingIndexTableOpt.isPresent()) {
314+
finalColStatsIndexDf = newColStatsIndexDf;
315+
} else {
316+
// NOTE: That Parquet schema might deviate from the original table schema (for ex,
317+
// by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
318+
// prior to merging, since merging might fail otherwise due to schemas incompatibility
319+
finalColStatsIndexDf =
320+
tryMergeMostRecentIndexTableInto(
321+
sparkSession,
322+
newColStatsIndexDf,
323+
// Load current most recent col-stats-index table
324+
existingIndexTableOpt.get()
325+
);
326+
327+
// Clean up all index tables (after creation of the new index)
328+
tablesToCleanup.addAll(validIndexTables);
329+
}
322330
}
323331

324332
// Persist new col-stats-index table
@@ -349,6 +357,17 @@ public static void updateColumnStatsIndexFor(
349357
}
350358
}
351359

360+
/**
 * Attempts to load the existing Column Stats index table stored at {@code indexTablePath}.
 *
 * <p>Loading is best-effort: any {@link Exception} thrown by the load call is logged at
 * error level (together with the path) and reported to the caller as an empty
 * {@link Option} instead of being rethrown, so that a broken previous index table does not
 * fail the index update.
 *
 * <p>NOTE(review): only exceptions raised by the {@code load(...)} call itself are caught
 * here; presumably failures that surface later, when the returned dataset is actually
 * evaluated, are not covered -- confirm against Spark's loading semantics.
 *
 * @param sparkSession   Spark session used to read the index table
 * @param indexTablePath path of the index table to load; passed to Spark as its URI string
 * @return the loaded dataset wrapped in an {@link Option}, or {@link Option#empty()} if
 *         loading threw an exception
 */
@Nonnull
361+
private static Option<Dataset<Row>> tryLoadExistingIndexTable(@Nonnull SparkSession sparkSession, @Nonnull Path indexTablePath) {
362+
try {
363+
Dataset<Row> indexTableDataset = sparkSession.read().load(indexTablePath.toUri().toString());
364+
return Option.of(indexTableDataset);
365+
} catch (Exception e) {
366+
LOG.error(String.format("Failed to load existing Column Stats index table from (%s)", indexTablePath), e);
367+
return Option.empty();
368+
}
369+
}
370+
352371
@Nonnull
353372
private static Dataset<Row> tryMergeMostRecentIndexTableInto(
354373
@Nonnull SparkSession sparkSession,

0 commit comments

Comments
 (0)