diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
index 521bdb20c58fc..4fdb6a6be3cba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -304,21 +305,28 @@ public static void updateColumnStatsIndexFor(
     if (validIndexTables.isEmpty()) {
       finalColStatsIndexDf = newColStatsIndexDf;
     } else {
-      // NOTE: That Parquet schema might deviate from the original table schema (for ex,
-      // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
-      // prior to merging, since merging might fail otherwise due to schemas incompatibility
-      finalColStatsIndexDf =
-          tryMergeMostRecentIndexTableInto(
-              sparkSession,
-              newColStatsIndexDf,
-              // Load current most recent col-stats-index table
-              sparkSession.read().load(
-                  new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString()
-              )
-          );
-
-      // Clean up all index tables (after creation of the new index)
-      tablesToCleanup.addAll(validIndexTables);
+      Path latestIndexTablePath = new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1));
+
+      Option<Dataset<Row>> existingIndexTableOpt =
+          tryLoadExistingIndexTable(sparkSession, latestIndexTablePath);
+
+      if (!existingIndexTableOpt.isPresent()) {
+        finalColStatsIndexDf = newColStatsIndexDf;
+      } else {
+        // NOTE: That Parquet schema might deviate from the original table schema (for ex,
+        // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
+        // prior to merging, since merging might fail otherwise due to schemas incompatibility
+        finalColStatsIndexDf =
+            tryMergeMostRecentIndexTableInto(
+                sparkSession,
+                newColStatsIndexDf,
+                // Load current most recent col-stats-index table
+                existingIndexTableOpt.get()
+            );
+
+        // Clean up all index tables (after creation of the new index)
+        tablesToCleanup.addAll(validIndexTables);
+      }
     }
 
     // Persist new col-stats-index table
@@ -349,6 +357,17 @@ public static void updateColumnStatsIndexFor(
     }
   }
 
+  @Nonnull
+  private static Option<Dataset<Row>> tryLoadExistingIndexTable(@Nonnull SparkSession sparkSession, @Nonnull Path indexTablePath) {
+    try {
+      Dataset<Row> indexTableDataset = sparkSession.read().load(indexTablePath.toUri().toString());
+      return Option.of(indexTableDataset);
+    } catch (Exception e) {
+      LOG.error(String.format("Failed to load existing Column Stats index table from (%s)", indexTablePath), e);
+      return Option.empty();
+    }
+  }
+
   @Nonnull
   private static Dataset<Row> tryMergeMostRecentIndexTableInto(
       @Nonnull SparkSession sparkSession,