
Commit 8ebcd7f

HBASE-24735: Refactor ReplicationSourceManager: move logPositionAndCleanOldLogs/cleanUpHFileRefs to ReplicationSource inside (#2064)
Signed-off-by: Wellington Chevreuil <[email protected]>
1 parent e2bdc6d commit 8ebcd7f

File tree

12 files changed (+258, -254 lines)

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java

Lines changed: 11 additions & 7 deletions
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,15 +45,18 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
 
+  private Path walDir;
+
   private String actualPeerId;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException {
-    super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+      MetricsSource metrics) throws IOException {
+    super.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server, peerClusterZnode,
       clusterId, walFileLengthProvider, metrics);
+    this.walDir = walDir;
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
 
@@ -93,7 +97,7 @@ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOExc
         deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
       for (Path possibleLogLocation : locs) {
         LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-        if (manager.getFs().exists(possibleLogLocation)) {
+        if (this.fs.exists(possibleLogLocation)) {
           // We found the right new location
           LOG.info("Log " + path + " still exists at " + possibleLogLocation);
           newPaths.add(possibleLogLocation);
@@ -126,7 +130,7 @@ public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOExc
   // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
   // area rather than to the wal area for a particular region server.
   private Path getReplSyncUpPath(Path path) throws IOException {
-    FileStatus[] rss = fs.listStatus(manager.getLogDir());
+    FileStatus[] rss = fs.listStatus(walDir);
    for (FileStatus rs : rss) {
      Path p = rs.getPath();
      FileStatus[] logs = fs.listStatus(p);
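
For illustration only, a minimal sketch of how a caller might initialize a source with the new walDir parameter; the variable names (conf, fs, walDir, and so on) are assumed caller-side handles, not the exact wiring inside ReplicationSourceManager:

  // Hypothetical caller-side wiring: the WAL directory is now passed to init(),
  // so the source resolves WAL locations via its own fs/walDir instead of
  // manager.getFs()/manager.getLogDir().
  ReplicationSourceInterface src = new RecoveredReplicationSource();
  src.init(conf, fs, walDir, manager, queueStorage, replicationPeer, server,
      queueId, clusterId, walFileLengthProvider, metrics);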

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Lines changed: 146 additions & 14 deletions
@@ -28,7 +28,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -37,6 +39,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
@@ -54,16 +57,20 @@
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -95,7 +102,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected ReplicationQueueInfo replicationQueueInfo;
 
   // The manager of all sources to which we ping back our progress
-  protected ReplicationSourceManager manager;
+  ReplicationSourceManager manager;
   // Should we stop everything?
   protected Server server;
   // How long should we sleep for each retry
@@ -140,8 +147,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
       new ConcurrentHashMap<>();
 
-  private AtomicLong totalBufferUsed;
-
   public static final String WAIT_ON_ENDPOINT_SECONDS =
     "hbase.replication.wait.on.endpoint.seconds";
   public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
@@ -193,7 +198,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
    * @param metrics metrics for replication source
    */
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+  public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
       ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
       String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
       MetricsSource metrics) throws IOException {
@@ -221,7 +226,6 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
-    this.totalBufferUsed = manager.getTotalBufferUsed();
     this.walFileLengthProvider = walFileLengthProvider;
 
     this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort",
@@ -408,9 +412,9 @@ protected ReplicationSourceShipper createNewShipper(String walGroupId,
 
   private ReplicationSourceWALReader createNewWALReader(String walGroupId,
       PriorityBlockingQueue<Path> queue, long startPosition) {
-    return replicationPeer.getPeerConfig().isSerial()
-      ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
-      : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+    return replicationPeer.getPeerConfig().isSerial() ?
+      new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) :
+      new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
   }
 
   /**
@@ -451,11 +455,6 @@ public ReplicationEndpoint getReplicationEndpoint() {
     return this.replicationEndpoint;
   }
 
-  @Override
-  public ReplicationSourceManager getSourceManager() {
-    return this.manager;
-  }
-
   @Override
   public void tryThrottle(int batchSize) throws InterruptedException {
     checkBandwidthChangeAndResetThrottler();
@@ -783,7 +782,7 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
       throttler.addPushSize(batchSize);
     }
     totalReplicatedEdits.addAndGet(entries.size());
-    long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
+    long newBufferUsed = manager.getTotalBufferUsed().addAndGet(-batchSize);
     // Record the new buffer usage
     this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
@@ -818,4 +817,137 @@ void removeWorker(ReplicationSourceShipper worker) {
   private String logPeerId(){
     return "peerId=" + this.getPeerId() + ",";
   }
+
+  @VisibleForTesting
+  public void setWALPosition(WALEntryBatch entryBatch) {
+    String fileName = entryBatch.getLastWalPath().getName();
+    interruptOrAbortWhenFail(() -> this.queueStorage
+        .setWALPosition(server.getServerName(), getQueueId(), fileName,
+            entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+  }
+
+  @VisibleForTesting
+  public void cleanOldWALs(String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = getWalsToRemove(log, inclusive);
+    if (walsToRemove.isEmpty()) {
+      return;
+    }
+    // cleanOldWALs may spend some time, especially for sync replication where we may want to
+    // remove remote wals as the remote cluster may have already been down, so we do it outside
+    // the lock to avoid block preLogRoll
+    cleanOldWALs(walsToRemove);
+  }
+
+  private NavigableSet<String> getWalsToRemove(String log, boolean inclusive) {
+    NavigableSet<String> walsToRemove = new TreeSet<>();
+    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
+    try {
+      this.queueStorage.getWALsInQueue(this.server.getServerName(), getQueueId()).forEach(wal -> {
+        LOG.debug("getWalsToRemove wal {}", wal);
+        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
+        if (walPrefix.equals(logPrefix)) {
+          walsToRemove.add(wal);
+        }
+      });
+    } catch (ReplicationException e) {
+      // Just log the exception here, as the recovered replication source will try to cleanup again.
+      LOG.warn("Failed to read wals in queue {}", getQueueId(), e);
+    }
+    return walsToRemove.headSet(log, inclusive);
+  }
+
+  private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
+      throws IOException {
+    Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+    FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+    for (String wal : wals) {
+      Path walFile = new Path(remoteWALDirForPeer, wal);
+      try {
+        if (!fs.delete(walFile, false) && fs.exists(walFile)) {
+          throw new IOException("Can not delete " + walFile);
+        }
+      } catch (FileNotFoundException e) {
+        // Just ignore since this means the file has already been deleted.
+        // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting an
+        // inexistent file, so here we deal with both, i.e, check the return value of the
+        // FileSystem.delete, and also catch FNFE.
+        LOG.debug("The remote wal {} has already been deleted?", walFile, e);
+      }
+    }
+  }
+
+  private void cleanOldWALs(NavigableSet<String> wals) {
+    LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
+    // The intention here is that, we want to delete the remote wal files ASAP as it may effect the
+    // failover time if you want to transit the remote cluster from S to A. And the infinite retry
+    // is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
+    // not contact with the HBase cluster either, so the replication will be blocked either.
+    if (isSyncReplication()) {
+      String peerId = getPeerId();
+      String remoteWALDir = replicationPeer.getPeerConfig().getRemoteWALDir();
+      // Filter out the wals need to be removed from the remote directory. Its name should be the
+      // special format, and also, the peer id in its name should match the peer id for the
+      // replication source.
+      List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
+          .getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
+          .collect(Collectors.toList());
+      LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
+          remoteWALDir, remoteWals);
+      if (!remoteWals.isEmpty()) {
+        for (int sleepMultiplier = 0;;) {
+          try {
+            removeRemoteWALs(peerId, remoteWALDir, remoteWals);
+            break;
+          } catch (IOException e) {
+            LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+                peerId);
+          }
+          if (!isSourceActive()) {
+            // skip the following operations
+            return;
+          }
+          if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+              sleepMultiplier, maxRetriesMultiplier)) {
+            sleepMultiplier++;
+          }
+        }
+      }
+    }
+    for (String wal : wals) {
+      interruptOrAbortWhenFail(
+        () -> this.queueStorage.removeWAL(server.getServerName(), getQueueId(), wal));
+    }
+  }
+
+  public void cleanUpHFileRefs(List<String> files) {
+    interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(getPeerId(), files));
+  }
+
+  @FunctionalInterface
+  private interface ReplicationQueueOperation {
+    void exec() throws ReplicationException;
+  }
+
+  /**
+   * Refresh replication source will terminate the old source first, then the source thread will be
+   * interrupted. Need to handle it instead of abort the region server.
+   */
+  private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
+    try {
+      op.exec();
+    } catch (ReplicationException e) {
+      if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
+          && e.getCause().getCause() != null && e.getCause()
+          .getCause() instanceof InterruptedException) {
+        // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
+        // that thread is interrupted deep down in the stack, it should pass the following
+        // processing logic and propagate to the most top layer which can handle this exception
+        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
+        throw new ReplicationRuntimeException(
+          "Thread is interrupted, the replication source may be terminated",
+          e.getCause().getCause());
+      }
+      server.abort("Failed to operate on replication queue", e);
+    }
+  }
 }
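
As a rough usage sketch (an assumed caller shape, not the actual ReplicationSourceShipper code), queue bookkeeping that previously went through source.getSourceManager() can now be done on the source itself; 'batch' and 'bulkLoadedFiles' below are illustrative inputs:

  // Hypothetical post-refactor caller; only methods declared in this commit are used.
  void updateLogPosition(ReplicationSource source, WALEntryBatch batch,
      List<String> bulkLoadedFiles) {
    // Persist the last shipped WAL offset for this source's queue.
    source.setWALPosition(batch);
    // Remove WALs that are fully replicated; 'false' keeps the current WAL itself.
    source.cleanOldWALs(batch.getLastWalPath().getName(), false);
    // Drop hfile references once the bulk-loaded files have been replicated.
    source.cleanUpHFileRefs(bulkLoadedFiles);
  }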

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java

Lines changed: 21 additions & 12 deletions
@@ -43,15 +43,15 @@ public interface ReplicationSourceInterface {
 
   /**
    * Initializer for the source
-   * @param conf the configuration to use
-   * @param fs the file system to use
-   * @param manager the manager to use
+   *
+   * @param conf the configuration to use
+   * @param fs the file system to use
    * @param server the server for this region server
    */
-  void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
-      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
-      MetricsSource metrics) throws IOException;
+  void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceManager manager,
+      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+      String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+      MetricsSource metrics) throws IOException;
 
   /**
    * Add a log to the list of logs to replicate
@@ -147,11 +147,6 @@ default boolean isSyncReplication() {
    */
   ReplicationEndpoint getReplicationEndpoint();
 
-  /**
-   * @return the replication source manager
-   */
-  ReplicationSourceManager getSourceManager();
-
   /**
    * @return the wal file length provider
    */
@@ -192,4 +187,18 @@ default Map<String, ReplicationStatus> getWalGroupStatus() {
   default boolean isRecovered() {
     return false;
   }
+
+  /**
+   * Set the current position of WAL to {@link ReplicationQueueStorage}
+   * @param entryBatch a batch of WAL entries to replicate
+   */
+  void setWALPosition(WALEntryBatch entryBatch);
+
+  /**
+   * Cleans a WAL and all older WALs from replication queue. Called when we are sure that a WAL is
+   * closed and has no more entries.
+   * @param walName the name of WAL
+   * @param inclusive whether we should also remove the given WAL
+   */
+  void cleanOldWALs(String walName, boolean inclusive);
 }
