Skip to content

Commit 676d5ce

Browse files
authored
[HUDI-4138] Fix the concurrency modification of hoodie table config for flink (#5660)
* Remove the metadata cleaning strategy for flink, that means the multi-modal index may be affected
* Improve the HoodieTable#clearMetadataTablePartitionsConfig to only update table config when necessary
* Remove the modification of read code path in HoodieTableConfig
1 parent af1128a commit 676d5ce

4 files changed

Lines changed: 13 additions & 28 deletions

File tree

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -885,24 +885,22 @@ private boolean shouldExecuteMetadataTableDeletion() {
885885
// partitions are ready to use
886886
return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
887887
&& !config.isMetadataTableEnabled()
888-
&& (!metaClient.getTableConfig().contains(TABLE_METADATA_PARTITIONS)
889-
|| !metaClient.getTableConfig().getMetadataPartitions().isEmpty());
888+
&& !metaClient.getTableConfig().getMetadataPartitions().isEmpty();
890889
}
891890

892891
/**
893892
* Clears hoodie.table.metadata.partitions in hoodie.properties
894893
*/
895894
private void clearMetadataTablePartitionsConfig(Option<MetadataPartitionType> partitionType, boolean clearAll) {
896-
if (clearAll) {
895+
Set<String> partitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
896+
if (clearAll && partitions.size() > 0) {
897897
LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
898898
metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), EMPTY_STRING);
899899
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
900-
return;
900+
} else if (partitions.remove(partitionType.get().getPartitionPath())) {
901+
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", partitions));
902+
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
901903
}
902-
Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
903-
completedPartitions.remove(partitionType.get().getPartitionPath());
904-
metaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
905-
HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
906904
}
907905

908906
public HoodieTableMetadata getMetadataTable() {

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.apache.hudi.io.MiniBatchHandle;
5454
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
5555
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
56-
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
5756
import org.apache.hudi.table.BulkInsertPartitioner;
5857
import org.apache.hudi.table.HoodieFlinkTable;
5958
import org.apache.hudi.table.HoodieTable;
@@ -365,8 +364,7 @@ public void completeCompaction(
365364
// commit to data table after committing to metadata table.
366365
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
367366
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
368-
table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
369-
w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
367+
writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
370368
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
371369
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
372370
} finally {

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,9 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
105105
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
106106
Option<T> actionMetadata) {
107107
if (config.isMetadataTableEnabled()) {
108-
// even with metadata enabled, some index could have been disabled
109-
// delete metadata partitions corresponding to such indexes
110-
deleteMetadataIndexIfNecessary();
111108
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
112109
context, actionMetadata, Option.of(triggeringInstantTimestamp)));
113110
} else {
114-
maybeDeleteMetadataTable();
115111
return Option.empty();
116112
}
117113
}

hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,8 @@ private static Properties getOrderedPropertiesWithTableChecksum(Properties props
272272
* @throws IOException
273273
*/
274274
private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException {
275-
String checksum;
276-
if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
275+
final String checksum;
276+
if (isValidChecksum(props)) {
277277
checksum = props.getProperty(TABLE_CHECKSUM.key());
278278
props.store(outputStream, "Updated at " + Instant.now());
279279
} else {
@@ -285,8 +285,8 @@ private static String storeProperties(Properties props, FSDataOutputStream outpu
285285
return checksum;
286286
}
287287

288-
private boolean isValidChecksum() {
289-
return contains(TABLE_CHECKSUM) && validateChecksum(props);
288+
private static boolean isValidChecksum(Properties props) {
289+
return props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props);
290290
}
291291

292292
/**
@@ -298,20 +298,13 @@ public HoodieTableConfig() {
298298

299299
private void fetchConfigs(FileSystem fs, String metaPath) throws IOException {
300300
Path cfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
301-
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
302301
try (FSDataInputStream is = fs.open(cfgPath)) {
303302
props.load(is);
304-
// validate checksum for latest table version
305-
if (getTableVersion().versionCode() >= HoodieTableVersion.FOUR.versionCode() && !isValidChecksum()) {
306-
LOG.warn("Checksum validation failed. Falling back to backed up configs.");
307-
try (FSDataInputStream fsDataInputStream = fs.open(backupCfgPath)) {
308-
props.load(fsDataInputStream);
309-
}
310-
}
311303
} catch (IOException ioe) {
312304
if (!fs.exists(cfgPath)) {
313305
LOG.warn("Run `table recover-configs` if config update/delete failed midway. Falling back to backed up configs.");
314306
// try the backup. this way no query ever fails if update fails midway.
307+
Path backupCfgPath = new Path(metaPath, HOODIE_PROPERTIES_FILE_BACKUP);
315308
try (FSDataInputStream is = fs.open(backupCfgPath)) {
316309
props.load(is);
317310
}
@@ -631,7 +624,7 @@ public List<String> getMetadataPartitions() {
631624
CONFIG_VALUES_DELIMITER
632625
);
633626
}
634-
627+
635628
/**
636629
* Returns the format to use for partition meta files.
637630
*/

0 commit comments

Comments
 (0)