@@ -24,6 +24,7 @@
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -304,21 +305,28 @@ public static void updateColumnStatsIndexFor(
       if (validIndexTables.isEmpty()) {
         finalColStatsIndexDf = newColStatsIndexDf;
       } else {
-        // NOTE: That Parquet schema might deviate from the original table schema (for ex,
-        // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
-        // prior to merging, since merging might fail otherwise due to schemas incompatibility
-        finalColStatsIndexDf =
-            tryMergeMostRecentIndexTableInto(
-                sparkSession,
-                newColStatsIndexDf,
-                // Load current most recent col-stats-index table
-                sparkSession.read().load(
-                    new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString()
-                )
-            );
-
-        // Clean up all index tables (after creation of the new index)
-        tablesToCleanup.addAll(validIndexTables);
+        Path latestIndexTablePath = new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1));
+
+        Option<Dataset<Row>> existingIndexTableOpt =
+            tryLoadExistingIndexTable(sparkSession, latestIndexTablePath);
+
+        if (!existingIndexTableOpt.isPresent()) {
+          finalColStatsIndexDf = newColStatsIndexDf;
+        } else {
+          // NOTE: That Parquet schema might deviate from the original table schema (for ex,
+          // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
+          // prior to merging, since merging might fail otherwise due to schemas incompatibility
+          finalColStatsIndexDf =
+              tryMergeMostRecentIndexTableInto(
+                  sparkSession,
+                  newColStatsIndexDf,
+                  // Load current most recent col-stats-index table
+                  existingIndexTableOpt.get()
+              );
+
+          // Clean up all index tables (after creation of the new index)
+          tablesToCleanup.addAll(validIndexTables);
+        }
       }
 
       // Persist new col-stats-index table
@@ -349,6 +357,17 @@ public static void updateColumnStatsIndexFor(
     }
   }
 
+  @Nonnull
+  private static Option<Dataset<Row>> tryLoadExistingIndexTable(@Nonnull SparkSession sparkSession, @Nonnull Path indexTablePath) {
+    try {
+      Dataset<Row> indexTableDataset = sparkSession.read().load(indexTablePath.toUri().toString());
+      return Option.of(indexTableDataset);
+    } catch (Exception e) {
+      LOG.error(String.format("Failed to load existing Column Stats index table from (%s)", indexTablePath), e);
+      return Option.empty();
+    }
+  }
+
   @Nonnull
   private static Dataset<Row> tryMergeMostRecentIndexTableInto(
       @Nonnull SparkSession sparkSession,
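Net effect of the diff: a failure to read the most recent col-stats-index table no longer fails the whole index update; the update degrades to using only the freshly computed stats, and in that fallback branch the previous index tables are also left out of `tablesToCleanup`, so they are not scheduled for cleanup. The following is a minimal, self-contained sketch of that pattern, assuming only Spark SQL and Hudi's `Option`; the class name and the `mergeFn` parameter are illustrative, not part of this PR.

```java
import java.util.function.BinaryOperator;

import org.apache.hudi.common.util.Option;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative sketch only -- mirrors the fallback introduced by this PR,
// not the actual Hudi helper.
final class ColStatsIndexFallbackSketch {

  // Counterpart of tryLoadExistingIndexTable: any read failure yields Option.empty()
  // instead of propagating the exception to the caller.
  static Option<Dataset<Row>> tryLoad(SparkSession spark, String indexTablePath) {
    try {
      return Option.of(spark.read().load(indexTablePath));
    } catch (Exception e) {
      return Option.empty();
    }
  }

  // Counterpart of the new call-site logic: merge only when the previous index
  // could actually be read, otherwise keep the newly built index as-is.
  static Dataset<Row> resolveFinalIndex(SparkSession spark,
                                        String latestIndexTablePath,
                                        Dataset<Row> newColStatsIndexDf,
                                        BinaryOperator<Dataset<Row>> mergeFn) {
    Option<Dataset<Row>> existingOpt = tryLoad(spark, latestIndexTablePath);
    return existingOpt.isPresent()
        ? mergeFn.apply(newColStatsIndexDf, existingOpt.get())
        : newColStatsIndexDf;
  }
}
```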