Skip to content

Commit 43e0819

Browse files
authored
[HUDI-4098] Metadata table heartbeat for instant has expired, last heartbeat 0 (#5583)
1 parent 61030d8 commit 43e0819

2 files changed

Lines changed: 50 additions & 0 deletions

File tree

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
138138
// reuses the same instant time without rollback first. It is a no-op here as the
139139
// clean plan is the same, so we don't need to delete the requested and inflight instant
140140
// files in the active timeline.
141+
142+
// The metadata writer uses LAZY cleaning strategy without auto commit,
143+
// write client then checks the heartbeat expiration when committing the instant,
144+
// sets up the heartbeat explicitly to make the check pass.
145+
writeClient.getHeartbeatClient().start(instantTime);
141146
}
142147

143148
List<WriteStatus> statuses = preppedRecordList.size() > 0

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.hudi.common.fs.FSUtils;
2323
import org.apache.hudi.common.model.HoodieWriteStat;
2424
import org.apache.hudi.common.table.HoodieTableMetaClient;
25+
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
26+
import org.apache.hudi.common.table.timeline.HoodieInstant;
2527
import org.apache.hudi.common.table.timeline.HoodieTimeline;
2628
import org.apache.hudi.configuration.FlinkOptions;
2729
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -253,6 +255,49 @@ void testSyncMetadataTable() throws Exception {
253255
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
254256
}
255257

258+
@Test
259+
void testSyncMetadataTableWithReusedInstant() throws Exception {
260+
// reset
261+
reset();
262+
// override the default configuration
263+
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
264+
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
265+
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
266+
coordinator = new StreamWriteOperatorCoordinator(conf, context);
267+
coordinator.start();
268+
coordinator.setExecutor(new MockCoordinatorExecutor(context));
269+
270+
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
271+
272+
coordinator.handleEventFromOperator(0, event0);
273+
274+
String instant = coordinator.getInstant();
275+
assertNotEquals("", instant);
276+
277+
final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
278+
HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf));
279+
HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
280+
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L));
281+
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
282+
283+
// writes a normal commit
284+
mockWriteWithMetadata();
285+
instant = coordinator.getInstant();
286+
// creates an inflight commit on the metadata timeline
287+
metadataTableMetaClient.getActiveTimeline()
288+
.createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant));
289+
metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant);
290+
metadataTableMetaClient.reloadActiveTimeline();
291+
292+
// write another commit with existing instant on the metadata timeline
293+
instant = mockWriteWithMetadata();
294+
metadataTableMetaClient.reloadActiveTimeline();
295+
296+
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
297+
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(3L));
298+
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
299+
}
300+
256301
// -------------------------------------------------------------------------
257302
// Utilities
258303
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)