1818 */
1919package org .apache .hadoop .hbase .replication .regionserver ;
2020
21+ import com .google .common .annotations .VisibleForTesting ;
2122import com .google .common .collect .Lists ;
2223import com .google .common .util .concurrent .ListenableFuture ;
2324import com .google .common .util .concurrent .Service ;
@@ -436,14 +437,30 @@ public String getPeerClusterId() {
436437 }
437438
438439 @ Override
440+ @ VisibleForTesting
439441 public Path getCurrentPath () {
440- // only for testing
441442 for (ReplicationSourceShipperThread worker : workerThreads .values ()) {
442443 if (worker .getCurrentPath () != null ) return worker .getCurrentPath ();
443444 }
444445 return null ;
445446 }
446447
448+ @ VisibleForTesting
449+ public Path getLastLoggedPath () {
450+ for (ReplicationSourceShipperThread worker : workerThreads .values ()) {
451+ return worker .getLastLoggedPath ();
452+ }
453+ return null ;
454+ }
455+
456+ @ VisibleForTesting
457+ public long getLastLoggedPosition () {
458+ for (ReplicationSourceShipperThread worker : workerThreads .values ()) {
459+ return worker .getLastLoggedPosition ();
460+ }
461+ return 0 ;
462+ }
463+
447464 private boolean isSourceActive () {
448465 return !this .stopper .isStopped () && this .sourceRunning ;
449466 }
@@ -478,8 +495,8 @@ public String getStats() {
478495 for (Map .Entry <String , ReplicationSourceShipperThread > entry : workerThreads .entrySet ()) {
479496 String walGroupId = entry .getKey ();
480497 ReplicationSourceShipperThread worker = entry .getValue ();
481- long position = worker .getCurrentPosition ();
482- Path currentPath = worker .getCurrentPath ();
498+ long position = worker .getLastLoggedPosition ();
499+ Path currentPath = worker .getLastLoggedPath ();
483500 sb .append ("walGroup [" ).append (walGroupId ).append ("]: " );
484501 if (currentPath != null ) {
485502 sb .append ("currently replicating from: " ).append (currentPath ).append (" at position: " )
@@ -513,7 +530,7 @@ public class ReplicationSourceShipperThread extends Thread {
513530 // Last position in the log that we sent to ZooKeeper
514531 private long lastLoggedPosition = -1 ;
515532 // Path of the current log
516- private volatile Path currentPath ;
533+ private volatile Path lastLoggedPath ;
517534 // Current state of the worker thread
518535 private WorkerState state ;
519536 ReplicationSourceWALReaderThread entryReader ;
@@ -553,21 +570,19 @@ public void run() {
553570 try {
554571 WALEntryBatch entryBatch = entryReader .take ();
555572 shipEdits (entryBatch );
556- if (replicationQueueInfo .isQueueRecovered () && entryBatch .getWalEntries ().isEmpty ()
557- && entryBatch .getLastSeqIds ().isEmpty ()) {
558- LOG .debug ("Finished recovering queue for group " + walGroupId + " of peer "
559- + peerClusterZnode );
573+ if (!entryBatch .hasMoreEntries ()) {
574+ LOG .debug ("Finished recovering queue for group "
575+ + walGroupId + " of peer " + peerClusterZnode );
560576 metrics .incrCompletedRecoveryQueue ();
561577 setWorkerState (WorkerState .FINISHED );
562- continue ;
563578 }
564579 } catch (InterruptedException e ) {
565580 LOG .trace ("Interrupted while waiting for next replication entry batch" , e );
566581 Thread .currentThread ().interrupt ();
567582 }
568583 }
569584
570- if (replicationQueueInfo . isQueueRecovered () && getWorkerState () == WorkerState .FINISHED ) {
585+ if (getWorkerState () == WorkerState .FINISHED ) {
571586 // use synchronize to make sure one last thread will clean the queue
572587 synchronized (this ) {
573588 Threads .sleep (100 );// wait a short while for other worker thread to fully exit
@@ -635,15 +650,13 @@ private void checkBandwidthChangeAndResetThrottler() {
635650 protected void shipEdits (WALEntryBatch entryBatch ) {
636651 List <Entry > entries = entryBatch .getWalEntries ();
637652 long lastReadPosition = entryBatch .getLastWalPosition ();
638- currentPath = entryBatch .getLastWalPath ();
653+ lastLoggedPath = entryBatch .getLastWalPath ();
639654 int sleepMultiplier = 0 ;
640655 if (entries .isEmpty ()) {
641- if (lastLoggedPosition != lastReadPosition ) {
642- updateLogPosition (lastReadPosition );
643- // if there was nothing to ship and it's not an error
644- // set "ageOfLastShippedOp" to <now> to indicate that we're current
645- metrics .setAgeOfLastShippedOp (EnvironmentEdgeManager .currentTime (), walGroupId );
646- }
656+ updateLogPosition (lastReadPosition );
657+ // if there was nothing to ship and it's not an error
658+ // set "ageOfLastShippedOp" to <now> to indicate that we're current
659+ metrics .setAgeOfLastShippedOp (EnvironmentEdgeManager .currentTime (), walGroupId );
647660 return ;
648661 }
649662 int currentSize = (int ) entryBatch .getHeapSize ();
@@ -727,8 +740,7 @@ protected void shipEdits(WALEntryBatch entryBatch) {
727740 }
728741
729742 private void updateLogPosition (long lastReadPosition ) {
730- manager .setPendingShipment (false );
731- manager .logPositionAndCleanOldLogs (currentPath , peerClusterZnode , lastReadPosition ,
743+ manager .logPositionAndCleanOldLogs (lastLoggedPath , peerClusterZnode , lastReadPosition ,
732744 this .replicationQueueInfo .isQueueRecovered (), false );
733745 lastLoggedPosition = lastReadPosition ;
734746 }
@@ -740,7 +752,7 @@ public void startup() {
740752 public void uncaughtException (final Thread t , final Throwable e ) {
741753 RSRpcServices .exitIfOOME (e );
742754 LOG .error ("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
743- + getCurrentPath (), e );
755+ + getLastLoggedPath (), e );
744756 stopper .stop ("Unexpected exception in ReplicationSourceWorkerThread" );
745757 }
746758 };
@@ -881,8 +893,12 @@ public Path getCurrentPath() {
881893 return this .entryReader .getCurrentPath ();
882894 }
883895
884- public long getCurrentPosition () {
885- return this .lastLoggedPosition ;
896+ public Path getLastLoggedPath () {
897+ return lastLoggedPath ;
898+ }
899+
900+ public long getLastLoggedPosition () {
901+ return lastLoggedPosition ;
886902 }
887903
888904 private boolean isWorkerActive () {
0 commit comments