Skip to content

Commit d5d2db7

Browse files
committed
addressing comments
1 parent b5554b6 commit d5d2db7

2 files changed

Lines changed: 79 additions & 42 deletions

File tree

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
3838
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
39+
import org.apache.hadoop.util.Timer;
3940
import org.slf4j.Logger;
4041
import org.slf4j.LoggerFactory;
4142
import org.apache.hadoop.classification.InterfaceAudience;
@@ -55,12 +56,10 @@
5556
import org.apache.hadoop.ipc.RPC;
5657
import org.apache.hadoop.security.SecurityUtil;
5758

58-
import static org.apache.hadoop.util.Time.monotonicNow;
5959
import static org.apache.hadoop.util.ExitUtil.terminate;
6060

6161
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
6262
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
63-
import org.apache.hadoop.util.Time;
6463

6564

6665
/**
@@ -172,14 +171,21 @@ public class EditLogTailer {
172171
*/
173172
private final long maxTxnsPerLock;
174173

174+
/**
175+
* Timer instance to be set only using constructor.
176+
* Only tests can reassign this by using setTimerForTest().
177+
* For source code, this timer instance should be treated as final.
178+
*/
179+
private Timer timer;
180+
175181
public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
176182
this.tailerThread = new EditLogTailerThread();
177183
this.conf = conf;
178184
this.namesystem = namesystem;
185+
this.timer = new Timer();
179186
this.editLog = namesystem.getEditLog();
180-
181-
lastLoadTimeMs = monotonicNow();
182-
resetLastRollTimeMs();
187+
this.lastLoadTimeMs = timer.monotonicNow();
188+
this.lastRollTimeMs = timer.monotonicNow();
183189

184190
logRollPeriodMs = conf.getTimeDuration(
185191
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
@@ -301,7 +307,7 @@ public Void run() throws Exception {
301307
long editsTailed = 0;
302308
// Fully tail the journal to the end
303309
do {
304-
long startTime = Time.monotonicNow();
310+
long startTime = timer.monotonicNow();
305311
try {
306312
NameNode.getNameNodeMetrics().addEditLogTailInterval(
307313
startTime - lastLoadTimeMs);
@@ -312,7 +318,7 @@ public Void run() throws Exception {
312318
throw new IOException(e);
313319
} finally {
314320
NameNode.getNameNodeMetrics().addEditLogTailTime(
315-
Time.monotonicNow() - startTime);
321+
timer.monotonicNow() - startTime);
316322
}
317323
} while(editsTailed > 0);
318324
return null;
@@ -336,7 +342,7 @@ public long doTailEdits() throws IOException, InterruptedException {
336342
LOG.debug("lastTxnId: " + lastTxnId);
337343
}
338344
Collection<EditLogInputStream> streams;
339-
long startTime = Time.monotonicNow();
345+
long startTime = timer.monotonicNow();
340346
try {
341347
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
342348
null, inProgressOk, true);
@@ -349,7 +355,7 @@ public long doTailEdits() throws IOException, InterruptedException {
349355
return 0;
350356
} finally {
351357
NameNode.getNameNodeMetrics().addEditLogFetchTime(
352-
Time.monotonicNow() - startTime);
358+
timer.monotonicNow() - startTime);
353359
}
354360
if (LOG.isDebugEnabled()) {
355361
LOG.debug("edit streams to load from: " + streams.size());
@@ -374,7 +380,7 @@ public long doTailEdits() throws IOException, InterruptedException {
374380
}
375381

376382
if (editsLoaded > 0) {
377-
lastLoadTimeMs = monotonicNow();
383+
lastLoadTimeMs = timer.monotonicNow();
378384
}
379385
lastLoadedTxnId = image.getLastAppliedTxId();
380386
return editsLoaded;
@@ -395,7 +401,7 @@ public long getLastLoadTimeMs() {
395401
*/
396402
private boolean tooLongSinceLastLoad() {
397403
return logRollPeriodMs >= 0 &&
398-
(monotonicNow() - lastRollTimeMs) > logRollPeriodMs;
404+
(timer.monotonicNow() - lastRollTimeMs) > logRollPeriodMs;
399405
}
400406

401407
/**
@@ -423,20 +429,38 @@ void triggerActiveLogRoll() {
423429
try {
424430
future = rollEditsRpcExecutor.submit(getNameNodeProxy());
425431
future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
426-
resetLastRollTimeMs();
432+
this.lastRollTimeMs = timer.monotonicNow();
427433
lastRollTriggerTxId = lastLoadedTxnId;
428434
} catch (ExecutionException | InterruptedException e) {
429435
LOG.warn("Unable to trigger a roll of the active NN", e);
430436
} catch (TimeoutException e) {
431-
future.cancel(true);
437+
if (future != null) {
438+
future.cancel(true);
439+
}
432440
LOG.warn(String.format(
433441
"Unable to finish rolling edits in %d ms", rollEditsTimeoutMs));
434442
}
435443
}
436444

445+
/**
446+
* This is only to be used by tests. For source code, the only way to
447+
* set the timer is by using the EditLogTailer constructor.
448+
*
449+
* @param newTimer Timer instance provided by tests.
450+
*/
451+
@VisibleForTesting
452+
void setTimerForTest(final Timer newTimer) {
453+
this.timer = newTimer;
454+
}
455+
456+
/**
457+
* Used by tests. Return Timer instance used by EditLogTailer.
458+
*
459+
* @return Timer instance used by EditLogTailer.
460+
*/
437461
@VisibleForTesting
438-
public void resetLastRollTimeMs() {
439-
this.lastRollTimeMs = monotonicNow();
462+
Timer getTimer() {
463+
return timer;
440464
}
441465

442466
@VisibleForTesting
@@ -498,15 +522,15 @@ private void doWork() {
498522
// name system lock will be acquired to further block even the block
499523
// state updates.
500524
namesystem.cpLockInterruptibly();
501-
long startTime = Time.monotonicNow();
525+
long startTime = timer.monotonicNow();
502526
try {
503527
NameNode.getNameNodeMetrics().addEditLogTailInterval(
504528
startTime - lastLoadTimeMs);
505529
editsTailed = doTailEdits();
506530
} finally {
507531
namesystem.cpUnlock();
508532
NameNode.getNameNodeMetrics().addEditLogTailTime(
509-
Time.monotonicNow() - startTime);
533+
timer.monotonicNow() - startTime);
510534
}
511535
//Update NameDirSize Metric
512536
if (triggeredLogRoll) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.hadoop.hdfs.server.namenode.NameNode;
5757
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
5858
import org.apache.hadoop.test.GenericTestUtils;
59+
import org.apache.hadoop.util.Timer;
5960
import org.slf4j.event.Level;
6061
import org.junit.Test;
6162
import org.junit.runner.RunWith;
@@ -397,17 +398,11 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
397398

398399
// Time in seconds to wait before checking if edit logs are rolled while
399400
// expecting edit log roll.
400-
// Aggressive tests like this one are resource sensitive and bound to
401-
// fail with slow system, hence waiting to confirm
402-
// curSegmentTxId for 7 seconds makes it more reliable in case
403-
// EditLogTailer#doWork() takes few more milliseconds to retrieve result
404-
// from EditLogTailer#triggerActiveLogRoll() and lastRollTimeMs is not
405-
// updated frequently.
406-
final int logRollWaitTime = 7;
401+
final int logRollWaitTime = 3;
407402

403+
final int logRollPeriod = standbyCatchupWaitTime + noLogRollWaitTime + 1;
408404
Configuration conf = getConf();
409-
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
410-
standbyCatchupWaitTime + noLogRollWaitTime + 2);
405+
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, logRollPeriod);
411406
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
412407
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
413408

@@ -436,33 +431,32 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
436431
waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId,
437432
standbyCatchupWaitTime);
438433

434+
long curTime = standby.getNamesystem().getEditLogTailer().getTimer()
435+
.monotonicNow();
436+
long inSufficientTimeForLogRoll =
437+
TimeUnit.SECONDS.toMillis(logRollPeriod / 3);
438+
TestTimer testTimer = new TestTimer(curTime + inSufficientTimeForLogRoll);
439+
standby.getNamesystem().getEditLogTailer().setTimerForTest(testTimer);
440+
439441
for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
440442
NameNodeAdapter.mkdirs(active, getDirPath(i),
441443
new PermissionStatus("test", "test",
442444
new FsPermission((short)00755)), true);
443-
// reset lastRollTimeMs in EditLogTailer.
444-
active.getNamesystem().getEditLogTailer().resetLastRollTimeMs();
445445
}
446446

447-
// We should explicitly update lastRollTimeMs in EditLogTailer
448-
// so that our timeout test provided just below can take advantage
449-
// of validation: (monotonicNow() - lastRollTimeMs) > logRollPeriodMs
450-
// provided in EditLogTailer#tooLongSinceLastLoad().
451-
active.getNamesystem().getEditLogTailer().resetLastRollTimeMs();
452-
453447
try {
454-
// Aggressive checks like this one are resource sensitive and bound to
455-
// fail with slow system, hence waiting to confirm
456-
// curSegmentTxId for (noLogRollWaitTime - 1) seconds
457-
// makes it more reliable in case EditLogTailer#doWork() takes few
458-
// more milliseconds to retrieve result from
459-
// EditLogTailer#triggerActiveLogRoll().
460-
checkForLogRoll(active, origTxId, noLogRollWaitTime - 1);
448+
checkForLogRoll(active, origTxId, noLogRollWaitTime);
461449
fail("Expected to timeout");
462450
} catch (TimeoutException e) {
463451
// expected
464452
}
465453

454+
long curTimeNew = standby.getNamesystem().getEditLogTailer().getTimer()
455+
.monotonicNow();
456+
long sufficientTimeForLogRoll =
457+
TimeUnit.SECONDS.toMillis(logRollPeriod * 3);
458+
testTimer.setTime(curTimeNew + sufficientTimeForLogRoll);
459+
466460
checkForLogRoll(active, origTxId, logRollWaitTime);
467461
} finally {
468462
cluster.shutdown();
@@ -485,7 +479,7 @@ private static void checkForLogRoll(final NameNode active,
485479
long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog()
486480
.getCurSegmentTxId();
487481
return (origTxId != curSegmentTxId);
488-
}, 500, TimeUnit.SECONDS.toMillis(maxWaitSec));
482+
}, 100, TimeUnit.SECONDS.toMillis(maxWaitSec));
489483
}
490484

491485
private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
@@ -502,4 +496,23 @@ private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
502496
.build();
503497
return cluster;
504498
}
499+
500+
private static final class TestTimer extends Timer {
501+
502+
private volatile long time;
503+
504+
private TestTimer(long time) {
505+
this.time = time;
506+
}
507+
508+
private void setTime(long newTime) {
509+
this.time = newTime;
510+
}
511+
512+
@Override
513+
public long monotonicNow() {
514+
return time;
515+
}
516+
}
517+
505518
}

0 commit comments

Comments
 (0)