2828import java .util .HashMap ;
2929import java .util .List ;
3030import java .util .Map ;
31+ import java .util .NavigableSet ;
3132import java .util .TreeMap ;
33+ import java .util .TreeSet ;
3234import java .util .UUID ;
3335import java .util .concurrent .ConcurrentHashMap ;
3436import java .util .concurrent .PriorityBlockingQueue ;
3537import java .util .concurrent .TimeUnit ;
3638import java .util .concurrent .TimeoutException ;
3739import java .util .concurrent .atomic .AtomicLong ;
3840import java .util .function .Predicate ;
41+ import java .util .stream .Collectors ;
3942
4043import org .apache .commons .lang3 .StringUtils ;
4144import org .apache .hadoop .conf .Configuration ;
5255import org .apache .hadoop .hbase .replication .ChainWALEntryFilter ;
5356import org .apache .hadoop .hbase .replication .ClusterMarkingEntryFilter ;
5457import org .apache .hadoop .hbase .replication .ReplicationEndpoint ;
58+ import org .apache .hadoop .hbase .replication .ReplicationException ;
5559import org .apache .hadoop .hbase .replication .ReplicationPeer ;
5660import org .apache .hadoop .hbase .replication .ReplicationQueueInfo ;
5761import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
62+ import org .apache .hadoop .hbase .replication .ReplicationUtils ;
5863import org .apache .hadoop .hbase .replication .SystemTableWALEntryFilter ;
5964import org .apache .hadoop .hbase .replication .WALEntryFilter ;
6065import org .apache .hadoop .hbase .util .Threads ;
6166import org .apache .hadoop .hbase .wal .AbstractFSWALProvider ;
67+ import org .apache .hadoop .hbase .wal .SyncReplicationWALProvider ;
6268import org .apache .hadoop .hbase .wal .WAL .Entry ;
6369import org .apache .hbase .thirdparty .com .google .common .annotations .VisibleForTesting ;
6470import org .apache .yetus .audience .InterfaceAudience ;
71+ import org .apache .zookeeper .KeeperException ;
6572import org .slf4j .Logger ;
6673import org .slf4j .LoggerFactory ;
6774import org .apache .hbase .thirdparty .com .google .common .collect .Lists ;
@@ -130,8 +137,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
130137 protected final ConcurrentHashMap <String , ReplicationSourceShipper > workerThreads =
131138 new ConcurrentHashMap <>();
132139
133- private AtomicLong totalBufferUsed ;
134-
135140 public static final String WAIT_ON_ENDPOINT_SECONDS =
136141 "hbase.replication.wait.on.endpoint.seconds" ;
137142 public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30 ;
@@ -183,7 +188,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
183188 * @param metrics metrics for replication source
184189 */
185190 @ Override
186- public void init (Configuration conf , FileSystem fs , ReplicationSourceManager manager ,
191+ public void init (Configuration conf , FileSystem fs , Path walDir , ReplicationSourceManager manager ,
187192 ReplicationQueueStorage queueStorage , ReplicationPeer replicationPeer , Server server ,
188193 String queueId , UUID clusterId , WALFileLengthProvider walFileLengthProvider ,
189194 MetricsSource metrics ) throws IOException {
@@ -211,7 +216,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
211216 defaultBandwidth = this .conf .getLong ("replication.source.per.peer.node.bandwidth" , 0 );
212217 currentBandwidth = getCurrentBandwidth ();
213218 this .throttler = new ReplicationThrottler ((double ) currentBandwidth / 10.0 );
214- this .totalBufferUsed = manager .getTotalBufferUsed ();
215219 this .walFileLengthProvider = walFileLengthProvider ;
216220 LOG .info ("queueId={}, ReplicationSource: {}, currentBandwidth={}" , queueId ,
217221 replicationPeer .getId (), this .currentBandwidth );
@@ -398,10 +402,11 @@ protected ReplicationSourceShipper createNewShipper(String walGroupId,
398402 }
399403
400404 private ReplicationSourceWALReader createNewWALReader (String walGroupId ,
401- PriorityBlockingQueue <Path > queue , long startPosition ) {
402- return replicationPeer .getPeerConfig ().isSerial ()
403- ? new SerialReplicationSourceWALReader (fs , conf , queue , startPosition , walEntryFilter , this )
404- : new ReplicationSourceWALReader (fs , conf , queue , startPosition , walEntryFilter , this );
405+ PriorityBlockingQueue <Path > queue , long startPosition ) {
406+ return replicationPeer .getPeerConfig ().isSerial () ?
407+ new SerialReplicationSourceWALReader (fs , conf , queue , startPosition , walEntryFilter , this ,
408+ manager ) :
409+ new ReplicationSourceWALReader (fs , conf , queue , startPosition , walEntryFilter , this , manager );
405410 }
406411
407412 /**
@@ -426,11 +431,6 @@ public ReplicationEndpoint getReplicationEndpoint() {
426431 return this .replicationEndpoint ;
427432 }
428433
429- @ Override
430- public ReplicationSourceManager getSourceManager () {
431- return this .manager ;
432- }
433-
434434 @ Override
435435 public void tryThrottle (int batchSize ) throws InterruptedException {
436436 checkBandwidthChangeAndResetThrottler ();
@@ -735,7 +735,7 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
735735 throttler .addPushSize (batchSize );
736736 }
737737 totalReplicatedEdits .addAndGet (entries .size ());
738- long newBufferUsed = totalBufferUsed .addAndGet (-batchSize );
738+ long newBufferUsed = manager . getTotalBufferUsed () .addAndGet (-batchSize );
739739 // Record the new buffer usage
740740 this .manager .getGlobalMetrics ().setWALReaderEditsBufferBytes (newBufferUsed );
741741 }
@@ -770,4 +770,137 @@ void removeWorker(ReplicationSourceShipper worker) {
770770 private String logPeerId (){
771771 return "[Source for peer " + this .getPeer ().getId () + "]:" ;
772772 }
773+
774+ @ VisibleForTesting
775+ public void setWALPosition (WALEntryBatch entryBatch ) {
776+ String fileName = entryBatch .getLastWalPath ().getName ();
777+ interruptOrAbortWhenFail (() -> this .queueStorage
778+ .setWALPosition (server .getServerName (), getQueueId (), fileName ,
779+ entryBatch .getLastWalPosition (), entryBatch .getLastSeqIds ()));
780+ }
781+
782+ @ VisibleForTesting
783+ public void cleanOldWALs (String log , boolean inclusive ) {
784+ NavigableSet <String > walsToRemove = getWalsToRemove (log , inclusive );
785+ if (walsToRemove .isEmpty ()) {
786+ return ;
787+ }
788+ // cleanOldWALs may spend some time, especially for sync replication where we may want to
789+ // remove remote wals as the remote cluster may have already been down, so we do it outside
790+ // the lock to avoid block preLogRoll
791+ cleanOldWALs (walsToRemove );
792+ }
793+
794+ private NavigableSet <String > getWalsToRemove (String log , boolean inclusive ) {
795+ NavigableSet <String > walsToRemove = new TreeSet <>();
796+ String logPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (log );
797+ try {
798+ this .queueStorage .getWALsInQueue (this .server .getServerName (), getQueueId ()).forEach (wal -> {
799+ LOG .debug ("getWalsToRemove wal {}" , wal );
800+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
801+ if (walPrefix .equals (logPrefix )) {
802+ walsToRemove .add (wal );
803+ }
804+ });
805+ } catch (ReplicationException e ) {
806+ // Just log the exception here, as the recovered replication source will try to cleanup again.
807+ LOG .warn ("Failed to read wals in queue {}" , getQueueId (), e );
808+ }
809+ return walsToRemove .headSet (log , inclusive );
810+ }
811+
812+ private void removeRemoteWALs (String peerId , String remoteWALDir , Collection <String > wals )
813+ throws IOException {
814+ Path remoteWALDirForPeer = ReplicationUtils .getPeerRemoteWALDir (remoteWALDir , peerId );
815+ FileSystem fs = ReplicationUtils .getRemoteWALFileSystem (conf , remoteWALDir );
816+ for (String wal : wals ) {
817+ Path walFile = new Path (remoteWALDirForPeer , wal );
818+ try {
819+ if (!fs .delete (walFile , false ) && fs .exists (walFile )) {
820+ throw new IOException ("Can not delete " + walFile );
821+ }
822+ } catch (FileNotFoundException e ) {
823+ // Just ignore since this means the file has already been deleted.
824+ // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an
825+ // inexistent file, so here we deal with both, i.e, check the return value of the
826+ // FileSystem.delete, and also catch FNFE.
827+ LOG .debug ("The remote wal {} has already been deleted?" , walFile , e );
828+ }
829+ }
830+ }
831+
832+ private void cleanOldWALs (NavigableSet <String > wals ) {
833+ LOG .debug ("Removing {} logs in the list: {}" , wals .size (), wals );
834+ // The intention here is that, we want to delete the remote wal files ASAP as it may effect the
835+ // failover time if you want to transit the remote cluster from S to A. And the infinite retry
836+ // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
837+ // not contact with the HBase cluster either, so the replication will be blocked either.
838+ if (isSyncReplication ()) {
839+ String peerId = getPeerId ();
840+ String remoteWALDir = replicationPeer .getPeerConfig ().getRemoteWALDir ();
841+ // Filter out the wals need to be removed from the remote directory. Its name should be the
842+ // special format, and also, the peer id in its name should match the peer id for the
843+ // replication source.
844+ List <String > remoteWals = wals .stream ().filter (w -> SyncReplicationWALProvider
845+ .getSyncReplicationPeerIdFromWALName (w ).map (peerId ::equals ).orElse (false ))
846+ .collect (Collectors .toList ());
847+ LOG .debug ("Removing {} logs from remote dir {} in the list: {}" , remoteWals .size (),
848+ remoteWALDir , remoteWals );
849+ if (!remoteWals .isEmpty ()) {
850+ for (int sleepMultiplier = 0 ;;) {
851+ try {
852+ removeRemoteWALs (peerId , remoteWALDir , remoteWals );
853+ break ;
854+ } catch (IOException e ) {
855+ LOG .warn ("Failed to delete remote wals from remote dir {} for peer {}" , remoteWALDir ,
856+ peerId );
857+ }
858+ if (!isSourceActive ()) {
859+ // skip the following operations
860+ return ;
861+ }
862+ if (ReplicationUtils .sleepForRetries ("Failed to delete remote wals" , sleepForRetries ,
863+ sleepMultiplier , maxRetriesMultiplier )) {
864+ sleepMultiplier ++;
865+ }
866+ }
867+ }
868+ }
869+ for (String wal : wals ) {
870+ interruptOrAbortWhenFail (
871+ () -> this .queueStorage .removeWAL (server .getServerName (), getQueueId (), wal ));
872+ }
873+ }
874+
875+ public void cleanUpHFileRefs (List <String > files ) {
876+ interruptOrAbortWhenFail (() -> this .queueStorage .removeHFileRefs (getPeerId (), files ));
877+ }
878+
  /**
   * A replication queue storage action that may fail with a {@link ReplicationException}.
   * Executed via {@link #interruptOrAbortWhenFail(ReplicationQueueOperation)} so failures are
   * translated into an interrupt-aware abort.
   */
  @FunctionalInterface
  private interface ReplicationQueueOperation {
    void exec() throws ReplicationException;
  }
883+
884+ /**
885+ * Refresh replication source will terminate the old source first, then the source thread will be
886+ * interrupted. Need to handle it instead of abort the region server.
887+ */
888+ private void interruptOrAbortWhenFail (ReplicationQueueOperation op ) {
889+ try {
890+ op .exec ();
891+ } catch (ReplicationException e ) {
892+ if (e .getCause () != null && e .getCause () instanceof KeeperException .SystemErrorException
893+ && e .getCause ().getCause () != null && e .getCause ()
894+ .getCause () instanceof InterruptedException ) {
895+ // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
896+ // that thread is interrupted deep down in the stack, it should pass the following
897+ // processing logic and propagate to the most top layer which can handle this exception
898+ // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
899+ throw new ReplicationRuntimeException (
900+ "Thread is interrupted, the replication source may be terminated" ,
901+ e .getCause ().getCause ());
902+ }
903+ server .abort ("Failed to operate on replication queue" , e );
904+ }
905+ }
773906}
0 commit comments