-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HDFS-16143. Add Timer in EditLogTailer and de-flake TestEditLogTailer#testStandbyTriggersLogRollsWhenTailInProgressEdits #3235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
b5554b6
d5d2db7
f21efea
3720dec
c18b3d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -179,7 +179,7 @@ public EditLogTailer(FSNamesystem namesystem, Configuration conf) { | |
| this.editLog = namesystem.getEditLog(); | ||
|
|
||
| lastLoadTimeMs = monotonicNow(); | ||
| lastRollTimeMs = monotonicNow(); | ||
| resetLastRollTimeMs(); | ||
|
|
||
| logRollPeriodMs = conf.getTimeDuration( | ||
| DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, | ||
|
|
@@ -423,21 +423,22 @@ void triggerActiveLogRoll() { | |
| try { | ||
| future = rollEditsRpcExecutor.submit(getNameNodeProxy()); | ||
| future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS); | ||
| lastRollTimeMs = monotonicNow(); | ||
| resetLastRollTimeMs(); | ||
| lastRollTriggerTxId = lastLoadedTxnId; | ||
| } catch (ExecutionException e) { | ||
| } catch (ExecutionException | InterruptedException e) { | ||
| LOG.warn("Unable to trigger a roll of the active NN", e); | ||
| } catch (TimeoutException e) { | ||
| if (future != null) { | ||
| future.cancel(true); | ||
| } | ||
| future.cancel(true); | ||
| LOG.warn(String.format( | ||
| "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs)); | ||
| } catch (InterruptedException e) { | ||
| LOG.warn("Unable to trigger a roll of the active NN", e); | ||
| } | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| public void resetLastRollTimeMs() { | ||
|
||
| this.lastRollTimeMs = monotonicNow(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| void sleep(long sleepTimeMillis) throws InterruptedException { | ||
| Thread.sleep(sleepTimeMillis); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -394,13 +394,20 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits() | |
| // Time in seconds to wait before checking if edit logs are rolled while | ||
| // expecting no edit log roll | ||
| final int noLogRollWaitTime = 2; | ||
|
|
||
| // Time in seconds to wait before checking if edit logs are rolled while | ||
| // expecting edit log roll | ||
| final int logRollWaitTime = 3; | ||
| // expecting edit log roll. | ||
| // Aggressive tests like this one are resource sensitive and bound to | ||
| // fail with slow system, hence waiting to confirm | ||
| // curSegmentTxId for 7 seconds makes it more reliable in case | ||
| // EditLogTailer#doWork() takes few more milliseconds to retrieve result | ||
| // from EditLogTailer#triggerActiveLogRoll() and lastRollTimeMs is not | ||
| // updated frequently. | ||
| final int logRollWaitTime = 7; | ||
|
|
||
| Configuration conf = getConf(); | ||
| conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, | ||
| standbyCatchupWaitTime + noLogRollWaitTime + 1); | ||
| standbyCatchupWaitTime + noLogRollWaitTime + 2); | ||
| conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); | ||
| conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); | ||
|
|
||
|
|
@@ -433,15 +440,28 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits() | |
| NameNodeAdapter.mkdirs(active, getDirPath(i), | ||
| new PermissionStatus("test", "test", | ||
| new FsPermission((short)00755)), true); | ||
| // reset lastRollTimeMs in EditLogTailer. | ||
| active.getNamesystem().getEditLogTailer().resetLastRollTimeMs(); | ||
|
||
| } | ||
|
|
||
| boolean exceptionThrown = false; | ||
| // We should explicitly update lastRollTimeMs in EditLogTailer | ||
| // so that our timeout test provided just below can take advantage | ||
| // of validation: (monotonicNow() - lastRollTimeMs) > logRollPeriodMs | ||
| // provided in EditLogTailer#tooLongSinceLastLoad(). | ||
| active.getNamesystem().getEditLogTailer().resetLastRollTimeMs(); | ||
|
||
|
|
||
| try { | ||
| checkForLogRoll(active, origTxId, noLogRollWaitTime); | ||
| // Aggressive checks like this one are resource sensitive and bound to | ||
| // fail with slow system, hence waiting to confirm | ||
| // curSegmentTxId for (noLogRollWaitTime - 1) seconds | ||
| // makes it more reliable in case EditLogTailer#doWork() takes few | ||
| // more milliseconds to retrieve result from | ||
| // EditLogTailer#triggerActiveLogRoll(). | ||
| checkForLogRoll(active, origTxId, noLogRollWaitTime - 1); | ||
| fail("Expected to timeout"); | ||
| } catch (TimeoutException e) { | ||
| exceptionThrown = true; | ||
| // expected | ||
| } | ||
| assertTrue(exceptionThrown); | ||
|
|
||
| checkForLogRoll(active, origTxId, logRollWaitTime); | ||
| } finally { | ||
|
|
@@ -452,26 +472,20 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits() | |
| private static void waitForStandbyToCatchUpWithInProgressEdits( | ||
| final NameNode standby, final long activeTxId, | ||
| int maxWaitSec) throws Exception { | ||
| GenericTestUtils.waitFor(new Supplier<Boolean>() { | ||
| @Override | ||
| public Boolean get() { | ||
| long standbyTxId = standby.getNamesystem().getFSImage() | ||
| .getLastAppliedTxId(); | ||
| return (standbyTxId >= activeTxId); | ||
| } | ||
| }, 100, maxWaitSec * 1000); | ||
| GenericTestUtils.waitFor(() -> { | ||
| long standbyTxId = standby.getNamesystem().getFSImage() | ||
| .getLastAppliedTxId(); | ||
| return (standbyTxId >= activeTxId); | ||
| }, 100, TimeUnit.SECONDS.toMillis(maxWaitSec)); | ||
| } | ||
|
|
||
| private static void checkForLogRoll(final NameNode active, | ||
| final long origTxId, int maxWaitSec) throws Exception { | ||
| GenericTestUtils.waitFor(new Supplier<Boolean>() { | ||
| @Override | ||
| public Boolean get() { | ||
| long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog() | ||
| .getCurSegmentTxId(); | ||
| return (origTxId != curSegmentTxId); | ||
| } | ||
| }, 100, maxWaitSec * 1000); | ||
| GenericTestUtils.waitFor(() -> { | ||
| long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog() | ||
| .getCurSegmentTxId(); | ||
| return (origTxId != curSegmentTxId); | ||
| }, 500, TimeUnit.SECONDS.toMillis(maxWaitSec)); | ||
|
||
| } | ||
|
|
||
| private static MiniDFSCluster createMiniDFSCluster(Configuration conf, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is the null-check removed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because future will never be null here. The only way we can reach here is by catching
TimeoutExceptionandTimeoutExceptioncan only occur here because offuture.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS), hence we don't need null-check.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Both
ExecutionExceptionandInterruptedExceptionwill also only be thrown byFuture#get, so I think the try-catch should probably look like:This will make it more clear why
futurewill not be null when we get into thecatchblock.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But also this is unrelated to this JIRA, can you put up a separate JIRA/PR for it? Particular since this JIRA is just aimed at fixing test flakiness, it's better to minimize any production code changes.