Skip to content

Commit da337b4

Browse files
danny0405 authored and Alexey Kudinkin committed
Revert "[HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (apache#6796)" (apache#7090)
This reverts commit e222693.
1 parent d411a51 commit da337b4

File tree

3 files changed: 14 additions and 88 deletions.

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java

Lines changed: 0 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,6 @@
6363
import java.util.Objects;
6464
import java.util.concurrent.CompletableFuture;
6565
import java.util.concurrent.CompletionException;
66-
import java.util.concurrent.atomic.AtomicInteger;
6766
import java.util.stream.Collectors;
6867

6968
import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -153,12 +152,6 @@ public class StreamWriteOperatorCoordinator
153152
*/
154153
private CkpMetadata ckpMetadata;
155154

156-
/**
157-
* Counter for the failed tasks, a number within the range (0, task_num) means
158-
* a partial failover.
159-
*/
160-
private transient AtomicInteger failedCnt;
161-
162155
/**
163156
* Constructs a StreamingSinkOperatorCoordinator.
164157
*
@@ -301,17 +294,6 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) {
301294
// reset the event
302295
this.eventBuffer[i] = null;
303296
LOG.warn("Reset the event for task [" + i + "]", throwable);
304-
305-
// based on the fact: the #subtaskFailed in invoked before all the failed tasks scheduling,
306-
// when a sub-task event is received, we can decide whether it recovers from a partial or complete failover,
307-
// then to reuse the current instant(PARTIAL) or start a new one(COMPLETE).
308-
309-
// reset the ckp metadata for either partial or complete failover
310-
if (this.failedCnt.get() == 0) {
311-
this.ckpMetadata.reset();
312-
}
313-
// inc the failed tasks counter
314-
this.failedCnt.incrementAndGet();
315297
}
316298

317299
@Override
@@ -365,14 +347,6 @@ private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) thr
365347

366348
private void reset() {
367349
this.eventBuffer = new WriteMetadataEvent[this.parallelism];
368-
this.failedCnt = new AtomicInteger(0);
369-
}
370-
371-
/**
372-
* Checks whether it is a PARTIAL failover.
373-
*/
374-
private boolean isPartialFailover() {
375-
return this.failedCnt.get() > 0 && this.failedCnt.get() < this.parallelism;
376350
}
377351

378352
/**
@@ -436,16 +410,6 @@ private void handleBootstrapEvent(WriteMetadataEvent event) {
436410
if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
437411
// start to initialize the instant.
438412
initInstant(event.getInstantTime());
439-
} else if (isPartialFailover()) {
440-
// if the bootstrap event comes from a partial failover,
441-
// decrement the failed tasks by one.
442-
443-
// if all the failed task bootstrap events are received, send a start instant
444-
// to the ckp metadata and unblock the data flushing.
445-
if (this.failedCnt.decrementAndGet() <= 0) {
446-
this.ckpMetadata.startInstant(this.instant);
447-
this.failedCnt.set(0);
448-
}
449413
}
450414
}
451415

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java

Lines changed: 14 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,6 @@
3737
import java.io.Serializable;
3838
import java.util.ArrayList;
3939
import java.util.Arrays;
40-
import java.util.Iterator;
4140
import java.util.List;
4241
import java.util.stream.Collectors;
4342

@@ -62,7 +61,7 @@ public class CkpMetadata implements Serializable {
6261

6362
private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
6463

65-
private static final int MAX_RETAIN_CKP_NUM = 3;
64+
protected static final int MAX_RETAIN_CKP_NUM = 3;
6665

6766
// the ckp metadata directory
6867
private static final String CKP_META = "ckp_meta";
@@ -100,65 +99,37 @@ public void bootstrap() throws IOException {
10099
fs.mkdirs(path);
101100
}
102101

103-
/**
104-
* Resets the message bus, would clean all the messages.
105-
*
106-
* <p>This expects to be called by the driver.
107-
*/
108-
public void reset() {
109-
Iterator<String> itr = this.instantCache.iterator();
110-
while (itr.hasNext()) {
111-
cleanInstant(itr.next(), true);
112-
itr.remove();
113-
}
114-
}
115-
116102
public void startInstant(String instant) {
117103
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT));
118104
try {
119105
fs.createNewFile(path);
120106
} catch (IOException e) {
121107
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
122108
}
123-
// cache the instant
124-
cache(instant);
125109
// cleaning
126-
clean();
110+
clean(instant);
127111
}
128112

129-
private void cache(String newInstant) {
113+
private void clean(String newInstant) {
130114
if (this.instantCache == null) {
131115
this.instantCache = new ArrayList<>();
132116
}
133117
this.instantCache.add(newInstant);
134-
}
135-
136-
private void clean() {
137118
if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
138-
boolean success = cleanInstant(instantCache.get(0), false);
139-
if (success) {
140-
instantCache.remove(0);
141-
}
142-
}
143-
}
144-
145-
private boolean cleanInstant(String instant, boolean throwsT) {
146-
boolean success = true;
147-
for (String fileName : CkpMessage.getAllFileNames(instant)) {
148-
Path path = fullPath(fileName);
149-
try {
150-
fs.delete(path, false);
151-
} catch (IOException ex) {
152-
success = false;
153-
final String errMsg = "Exception while cleaning the checkpoint meta file: " + path;
154-
if (throwsT) {
155-
throw new HoodieException(errMsg, ex);
156-
} else {
157-
LOG.warn(errMsg, ex);
119+
final String instant = instantCache.get(0);
120+
boolean[] error = new boolean[1];
121+
CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> {
122+
try {
123+
fs.delete(path, false);
124+
} catch (IOException e) {
125+
error[0] = true;
126+
LOG.warn("Exception while cleaning the checkpoint meta file: " + path);
158127
}
128+
});
129+
if (!error[0]) {
130+
instantCache.remove(0);
159131
}
160132
}
161-
return success;
162133
}
163134

164135
/**

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java

Lines changed: 0 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@
2929
import org.apache.hudi.configuration.HadoopConfigurations;
3030
import org.apache.hudi.metadata.HoodieTableMetadata;
3131
import org.apache.hudi.sink.event.WriteMetadataEvent;
32-
import org.apache.hudi.sink.meta.CkpMetadata;
3332
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
3433
import org.apache.hudi.sink.utils.NonThrownExecutor;
3534
import org.apache.hudi.util.StreamerUtil;
@@ -165,14 +164,6 @@ public void testCheckpointCompleteWithPartialEvents() {
165164
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
166165
}
167166

168-
@Test
169-
public void testSubTaskFailed() {
170-
coordinator.subtaskFailed(0, null);
171-
assertNull(coordinator.getEventBuffer()[0], "The write meta event should be cleaned");
172-
CkpMetadata ckpMetadata = CkpMetadata.getInstance(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()));
173-
assertNull(ckpMetadata.lastPendingInstant(), "The pending instant should be cleaned");
174-
}
175-
176167
@Test
177168
public void testHiveSyncInvoked() throws Exception {
178169
// reset

0 commit comments

Comments
 (0)