diff --git a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto index ed334c476768..925aed4feaff 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto @@ -24,9 +24,21 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; +import "HBase.proto"; import "server/region/Admin.proto"; +message StartReplicationSourceRequest { + required ServerName server_name = 1; + required string queue_id = 2; +} + +message StartReplicationSourceResponse { +} + service ReplicationServerService { rpc ReplicateWALEntry(ReplicateWALEntryRequest) returns(ReplicateWALEntryResponse); -} \ No newline at end of file + + rpc StartReplicationSource(StartReplicationSourceRequest) + returns(StartReplicationSourceResponse); +} diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 6f1f5a3d6a42..2b9594eac62a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -80,7 +80,7 @@ * */ @InterfaceAudience.Private -class ZKReplicationQueueStorage extends ZKReplicationStorageBase +public class ZKReplicationQueueStorage extends ZKReplicationStorageBase implements ReplicationQueueStorage { private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); @@ -123,7 +123,7 @@ public String getRsNode(ServerName serverName) { return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); } - private String getQueueNode(ServerName serverName, String 
queueId) { + public String getQueueNode(ServerName serverName, String queueId) { return ZNodePaths.joinZNode(getRsNode(serverName), queueId); } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java index 596167f9abfc..a239bf831270 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -74,4 +74,8 @@ protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.Sta throw new RuntimeException(e); } } + + public ZKWatcher getZookeeper() { + return this.zookeeper; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 9f229f4dba48..02a897b37fbc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -3396,7 +3396,7 @@ public ListReplicationSinkServersResponse listReplicationSinkServers( if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().preListReplicationSinkServers(); } - builder.addAllServerName(master.listReplicationSinkServers().stream() + builder.addAllServerName(master.getReplicationServerManager().getOnlineServersList().stream() .map(ProtobufUtil::toServerName).collect(Collectors.toList())); if (master.getMasterCoprocessorHost() != null) { master.getMasterCoprocessorHost().postListReplicationSinkServers(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index bc138c46c5ec..cd5c82d42fe0 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -257,7 +257,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; @@ -271,7 +270,7 @@ @SuppressWarnings("deprecation") public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, - ConfigurationObserver, ReplicationServerService.BlockingInterface { + ConfigurationObserver { protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); /** RPC scheduler to use for the region server. */ @@ -1491,9 +1490,6 @@ protected List getServices() { bssi.add(new BlockingServiceAndInterface( AdminService.newReflectiveBlockingService(this), AdminService.BlockingInterface.class)); - bssi.add(new BlockingServiceAndInterface( - ReplicationServerService.newReflectiveBlockingService(this), - ReplicationServerService.BlockingInterface.class)); } return new org.apache.hbase.thirdparty.com.google.common.collect. 
ImmutableList.Builder().addAll(bssi).build(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 6967c6fee6b0..69264a4555be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -321,6 +321,10 @@ protected void chooseSinks() { if (!useZk || ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { useZk = false; slaveAddresses = fetchSlavesAddresses(); + if (slaveAddresses.isEmpty()) { + LOG.warn("No sinks available at peer. Try fetch sinks by using zk."); + useZk = true; + } } else { useZk = true; } @@ -328,13 +332,15 @@ protected void chooseSinks() { LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t); useZk = true; } + if (useZk) { slaveAddresses = fetchSlavesAddressesByZK(); } if (slaveAddresses.isEmpty()) { - LOG.warn("No sinks available at peer. Will not be able to replicate"); + LOG.warn("No sinks available at peer. 
Will not be able to replicate."); } + Collections.shuffle(slaveAddresses, ThreadLocalRandom.current()); int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); synchronized (this) { @@ -368,10 +374,10 @@ protected SinkPeer getReplicationSink() throws IOException { } private SinkPeer createSinkPeer(ServerName serverName) throws IOException { - if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) { - return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName)); - } else { + if (fetchServersUseZk) { return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName)); + } else { + return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java index 4b53bb7b7fa3..4c8bb115b814 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java @@ -20,10 +20,19 @@ import java.io.IOException; import java.lang.management.MemoryUsage; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.OptionalLong; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -33,18 +42,31 @@ import 
org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.regionserver.ReplicationService; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource; +import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceFactory; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.security.SecurityConstants; +import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.ReflectionUtils; @@ -66,7 +88,7 @@ */ @InterfaceAudience.Private @SuppressWarnings({ "deprecation"}) -public class HReplicationServer 
extends Thread implements Server { +public class HReplicationServer extends Thread implements Server, ReplicationSourceController { private static final Logger LOG = LoggerFactory.getLogger(HReplicationServer.class); @@ -76,7 +98,7 @@ public class HReplicationServer extends Thread implements Server { /** * This servers start code. */ - protected final long startCode; + private final long startCode; private volatile boolean stopped = false; @@ -85,7 +107,11 @@ public class HReplicationServer extends Thread implements Server { private AtomicBoolean abortRequested; // flag set after we're done setting up server threads - final AtomicBoolean online = new AtomicBoolean(false); + private final AtomicBoolean online = new AtomicBoolean(false); + + private final int msgInterval; + // A sleeper that sleeps for msgInterval. + private final Sleeper sleeper; /** * The server name the Master sees us as. Its made from the hostname the @@ -94,18 +120,22 @@ public class HReplicationServer extends Thread implements Server { */ private ServerName serverName; - protected final Configuration conf; + private final Configuration conf; - private ReplicationSinkService replicationSinkService; + // zookeeper connection and watcher + private final ZKWatcher zooKeeper; - final int msgInterval; - // A sleeper that sleeps for msgInterval. - protected final Sleeper sleeper; + private final UUID clusterId; private final int shortOperationTimeout; - // zookeeper connection and watcher - protected final ZKWatcher zooKeeper; + private HFileSystem walFs; + private Path walRootDir; + + /** + * ChoreService used to schedule tasks that we want to run periodically + */ + private ChoreService choreService; // master address tracker private final MasterAddressTracker masterAddressTracker; @@ -113,11 +143,23 @@ public class HReplicationServer extends Thread implements Server { /** * The asynchronous cluster connection to be shared by services. 
*/ - protected AsyncClusterConnection asyncClusterConnection; + private AsyncClusterConnection asyncClusterConnection; private UserProvider userProvider; - protected final ReplicationServerRpcServices rpcServices; + final ReplicationServerRpcServices rpcServices; + + // Total buffer size on this RegionServer for holding batched edits to be shipped. + private final long totalBufferLimit; + private AtomicLong totalBufferUsed = new AtomicLong(); + + private final MetricsReplicationGlobalSourceSource globalMetrics; + private final Map sourceMetrics = new HashMap<>(); + private final ConcurrentMap sources = + new ConcurrentHashMap<>(); + + private final ReplicationQueueStorage queueStorage; + private final ReplicationPeers replicationPeers; // Stub to do region server status calls against the master. private volatile ReplicationServerStatusService.BlockingInterface rssStub; @@ -125,12 +167,9 @@ public class HReplicationServer extends Thread implements Server { // RPC client. Used to make the stub above that does region server status checking. 
private RpcClient rpcClient; - /** - * ChoreService used to schedule tasks that we want to run periodically - */ - private ChoreService choreService; + private ReplicationSinkService replicationSinkService; - public HReplicationServer(final Configuration conf) throws IOException { + public HReplicationServer(final Configuration conf) throws Exception { TraceUtil.initTracer(conf); try { this.startCode = System.currentTimeMillis(); @@ -144,12 +183,29 @@ public HReplicationServer(final Configuration conf) throws IOException { serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startCode); this.userProvider = UserProvider.instantiate(conf); + // login the zookeeper client principal (if using security) + ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, + HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName); + // login the server principal (if using secure Hadoop) + this.userProvider.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, + SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, hostName); + // init superusers and add the server principal (if using security) + // or process owner as default super user. 
+ Superusers.initialize(conf); this.msgInterval = conf.getInt("hbase.replicationserver.msginterval", 3 * 1000); this.sleeper = new Sleeper(this.msgInterval, this); this.shortOperationTimeout = conf.getInt(HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT); + this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, + HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + this.globalMetrics = + CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class) + .getGlobalSource(); + + initializeFileSystem(); + this.choreService = new ChoreService(getName(), true); // Some unit tests don't need a cluster, so no zookeeper at all if (!conf.getBoolean("hbase.testing.nocluster", false)) { @@ -162,6 +218,12 @@ public HReplicationServer(final Configuration conf) throws IOException { zooKeeper = null; masterAddressTracker = null; } + + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zooKeeper, conf); + this.replicationPeers = + ReplicationFactory.getReplicationPeers(zooKeeper, this.conf); + this.replicationPeers.init(); + this.clusterId = ZKClusterId.getUUIDForCluster(zooKeeper); this.rpcServices.start(zooKeeper); this.choreService = new ChoreService(getName(), true); } catch (Throwable t) { @@ -172,6 +234,15 @@ public HReplicationServer(final Configuration conf) throws IOException { } } + private void initializeFileSystem() throws IOException { + // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase + // checksum verification enabled, then automatically switch off hdfs checksum verification. 
+ boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); + CommonFSUtils.setFsDefault(this.conf, CommonFSUtils.getWALRootDir(this.conf)); + this.walFs = new HFileSystem(this.conf, useHBaseChecksum); + this.walRootDir = CommonFSUtils.getWALRootDir(this.conf); + } + public String getProcessName() { return REPLICATION_SERVER; } @@ -291,6 +362,9 @@ protected void stopServiceThreads() { if (this.replicationSinkService != null) { this.replicationSinkService.stopReplicationService(); } + if (this.choreService != null) { + this.choreService.shutdown(); + } } @Override @@ -330,7 +404,7 @@ public CoordinatedStateManager getCoordinatedStateManager() { @Override public ChoreService getChoreService() { - return this.choreService; + return choreService; } @Override @@ -594,4 +668,69 @@ private static boolean sleepInterrupted(long millis) { } return interrupted; } + + @Override + public long getTotalBufferLimit() { + return this.totalBufferLimit; + } + + @Override + public AtomicLong getTotalBufferUsed() { + return this.totalBufferUsed; + } + + @Override + public MetricsReplicationGlobalSourceSource getGlobalMetrics() { + return this.globalMetrics; + } + + @Override + public void finishRecoveredSource(RecoveredReplicationSource src) { + this.sources.remove(src.getQueueId()); + this.sourceMetrics.remove(src.getQueueId()); + deleteQueue(src.getQueueId()); + LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(), + src.getStats()); + } + + public void startReplicationSource(ServerName producer, String queueId) + throws IOException, ReplicationException { + ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); + String peerId = replicationQueueInfo.getPeerId(); + this.replicationPeers.addPeer(peerId); + Path walDir = + new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(producer.toString())); + MetricsSource metrics = new MetricsSource(queueId); + + ReplicationSourceInterface src = 
ReplicationSourceFactory.create(conf, queueId); + // init replication source + src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this, + producer, queueId, clusterId, p -> OptionalLong.empty(), metrics); + queueStorage.getWALsInQueue(producer, queueId) + .forEach(walName -> src.enqueueLog(new Path(walDir, walName))); + src.startup(); + sources.put(queueId, src); + sourceMetrics.put(queueId, metrics); + } + + /** + * Delete a complete queue of wals associated with a replication source + * @param queueId the id of replication queue to delete + */ + private void deleteQueue(String queueId) { + abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(), queueId)); + } + + @FunctionalInterface + private interface ReplicationQueueOperation { + void exec() throws ReplicationException; + } + + private void abortWhenFail(ReplicationQueueOperation op) { + try { + op.exec(); + } catch (ReplicationException e) { + abort("Failed to operate on replication queue", e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java index 15d4f8c1a789..b8c388448019 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationServerRpcServices.java @@ -56,11 +56,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.StartReplicationSourceResponse; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -321,4 +324,16 @@ public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, throw new ServiceException(ie); } } + + @Override + public StartReplicationSourceResponse startReplicationSource(RpcController controller, + StartReplicationSourceRequest request) throws ServiceException { + try { + replicationServer.startReplicationSource(ProtobufUtil.toServerName(request.getServerName()), + request.getQueueId()); + return StartReplicationSourceResponse.newBuilder().build(); + } catch (Exception e) { + throw new ServiceException(e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index eece3c0fd5ae..2ecf90831e56 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -51,10 +51,11 @@ public class RecoveredReplicationSource extends ReplicationSource { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { - super.init(conf, fs, walDir, 
overallController, queueStorage, replicationPeer, server, - peerClusterZnode, clusterId, walFileLengthProvider, metrics); + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { + super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, producer, + queueId, clusterId, walFileLengthProvider, metrics); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 6fee960cbc84..c81cc29a2748 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -63,10 +63,13 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.zookeeper.ZKListener; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -155,6 +158,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private int waitOnEndpointSeconds = -1; private Thread initThread; + private Thread fetchWALsThread; /** * WALs to replicate. 
@@ -192,8 +196,9 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.walDir = walDir; @@ -224,6 +229,19 @@ public void init(Configuration conf, FileSystem fs, Path walDir, this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", true); + if (conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, + HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT)) { + if (queueStorage instanceof ZKReplicationQueueStorage) { + ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage) queueStorage; + zkQueueStorage.getZookeeper().registerListener( + new ReplicationQueueListener(this, zkQueueStorage, producer, queueId, walDir)); + LOG.info("Register a ZKListener to track the WALs from {}'s replication queue, queueId={}", + producer, queueId); + } else { + throw new UnsupportedOperationException( + "hbase.replication.offload.enabled=true only support ZKReplicationQueueStorage"); + } + } LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -258,7 +276,9 @@ public void enqueueLog(Path wal) { tryStartNewShipper(walPrefix, queue); } } else { - queue.put(wal); + if (!queue.contains(wal)) { + queue.put(wal); + } } if (LOG.isTraceEnabled()) { LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix, @@ -928,4 +948,36 @@ private void 
interruptOrAbortWhenFail(ReplicationQueueOperation op) { server.abort("Failed to operate on replication queue", e); } } + + /** + * Tracks changes to the WALs in the replication queue. + */ + public static class ReplicationQueueListener extends ZKListener { + + private final ReplicationSource source; + private final String queueNode; + private final Path walDir; + + public ReplicationQueueListener(ReplicationSource source, + ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String queueId, Path walDir) { + super(zkQueueStorage.getZookeeper()); + this.source = source; + this.queueNode = zkQueueStorage.getQueueNode(producer, queueId); + this.walDir = walDir; + } + + @Override + public synchronized void nodeChildrenChanged(String path) { + if (path.equals(queueNode)) { + LOG.info("Detected change to the WALs in the replication queue {}", queueNode); + try { + ZKUtil.listChildrenNoWatch(watcher, queueNode).forEach(walName -> { + source.enqueueLog(new Path(walDir, walName)); + }); + } catch (KeeperException e) { + LOG.warn("Failed to read WALs in the replication queue {}", queueNode, e); + } + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index d613049d3893..16a7692fc351 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -32,7 +32,7 @@ public class ReplicationSourceFactory { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class); - static ReplicationSourceInterface create(Configuration conf, String queueId) { + public static ReplicationSourceInterface create(Configuration conf, String queueId) { ReplicationQueueInfo replicationQueueInfo = new 
ReplicationQueueInfo(queueId); boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered(); ReplicationSourceInterface src; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index f3bf8a41ff90..d14062cd30c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -52,6 +52,7 @@ public interface ReplicationSourceInterface { * @param queueStorage the replication queue storage * @param replicationPeer the replication peer * @param server the server which start and run this replication source + * @param producer the name of region server which produce WAL to the replication queue * @param queueId the id of our replication queue * @param clusterId unique UUID for the cluster * @param walFileLengthProvider used to get the WAL length @@ -59,8 +60,9 @@ public interface ReplicationSourceInterface { */ void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException; + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException; /** * Add a log to the list of logs to replicate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index de9e21f99ae7..cea0fc216f11 100644 
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -360,8 +360,8 @@ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer MetricsSource metrics = new MetricsSource(queueId); sourceMetrics.put(queueId, metrics); // init replication source - src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId, - walFileLengthProvider, metrics); + src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, server.getServerName(), + queueId, clusterId, walFileLengthProvider, metrics); return src; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 66059c722cb3..b4cce101a6d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -48,8 +48,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException { + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { this.queueId = queueId; this.metrics = metrics; this.walFileLengthProvider = walFileLengthProvider; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index be44c6cb68a9..671e448dd624 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -231,7 +231,7 @@ static void restartTargetHBaseCluster(int numSlaves) throws Exception { htable2 = UTIL2.getConnection().getTable(tableName); } - private static void startClusters() throws Exception { + static void startClusters() throws Exception { UTIL1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); LOG.info("Setup first Zk"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java index 9ceaceec6c19..db4152e56b61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationFetchServers.java @@ -18,10 +18,10 @@ package org.apache.hadoop.hbase.replication; import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,13 +38,14 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; - -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest; @@ -53,11 +54,14 @@ @Category({ ReplicationTests.class, MediumTests.class }) public class TestReplicationFetchServers extends TestReplicationBase { + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationFetchServers.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationFetchServers.class); + private static HReplicationServer replicationServer; + private static AtomicBoolean fetchFlag = new AtomicBoolean(false); public static class MyObserver implements MasterCoprocessor, MasterObserver { @@ -77,6 +81,17 @@ public void postListReplicationSinkServers(ObserverContext replicationServer.isOnline()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TestReplicationBase.tearDownAfterClass(); + if (!replicationServer.isStopped()) { + replicationServer.stop("test"); + } } @Before @@ -85,15 +100,23 @@ public void beforeMethod() { } @Test - public void testMasterListReplicationPeerServers() throws IOException, ServiceException { + public void testMasterListReplicationPeerServers() throws IOException { AsyncClusterConnection conn = UTIL2.getAsyncConnection(); ServerName master = UTIL2.getAdmin().getMaster(); - MasterService.BlockingInterface masterStub = MasterService.newBlockingStub( - conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000)); - ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers( - null, ListReplicationSinkServersRequest.newBuilder().build()); - List servers = ProtobufUtil.toServerNameList(resp.getServerNameList()); - assertFalse(servers.isEmpty()); + // Wait for the replication server to report to master + UTIL2.waitFor(60000, () -> { + List servers = new ArrayList<>(); + try { +
MasterService.BlockingInterface masterStub = MasterService.newBlockingStub( + conn.getRpcClient().createBlockingRpcChannel(master, User.getCurrent(), 1000)); + ListReplicationSinkServersResponse resp = masterStub.listReplicationSinkServers( + null, ListReplicationSinkServersRequest.newBuilder().build()); + servers = ProtobufUtil.toServerNameList(resp.getServerNameList()); + } catch (Exception e) { + LOG.debug("Failed to list replication servers", e); + } + return servers.size() == 1; + }); assertTrue(fetchFlag.get()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java similarity index 89% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java index 30660c609a73..bad1dc1b1c3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSink.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.ReplicationServerManager; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.ReplicationServerSinkPeer; -import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -64,13 +63,13 @@ import org.slf4j.LoggerFactory; @Category({ReplicationTests.class, MediumTests.class}) -public class TestReplicationServer { +public class TestReplicationServerSink { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationServer.class); + 
HBaseClassTestRule.forClass(TestReplicationServerSink.class); - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServer.class); + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSink.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -145,22 +144,7 @@ public void testReplicateWAL() throws Exception { replicateWALEntryAndVerify(sinkPeer); } - /** - * Requests region server using {@link AsyncReplicationServerAdmin} - */ - @Test - public void testReplicateWAL2() throws Exception { - AsyncClusterConnection conn = - TEST_UTIL.getHBaseCluster().getMaster().getAsyncClusterConnection(); - ServerName rs = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().get(0) - .getRegionServer().getServerName(); - AsyncReplicationServerAdmin replAdmin = conn.getReplicationServerAdmin(rs); - - ReplicationServerSinkPeer sinkPeer = new ReplicationServerSinkPeer(rs, replAdmin); - replicateWALEntryAndVerify(sinkPeer); - } - - private void replicateWALEntryAndVerify(SinkPeer sinkPeer) throws Exception { + private void replicateWALEntryAndVerify(ReplicationServerSinkPeer sinkPeer) throws Exception { Entry[] entries = new Entry[BATCH_SIZE]; for(int i = 0; i < BATCH_SIZE; i++) { entries[i] = generateEdit(i, TABLENAME, Bytes.toBytes(i)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java new file mode 100644 index 000000000000..843e5b1657fd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationServerSource.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestReplicationServerSource extends TestReplicationBase { + + @ClassRule public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationServerSource.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationServerSource.class); + + private static HReplicationServer replicationServer; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL1.getConfiguration().setBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, true); + TestReplicationBase.setUpBeforeClass(); + replicationServer = new HReplicationServer(UTIL1.getConfiguration()); + replicationServer.start(); + UTIL1.waitFor(60000, () -> replicationServer.isOnline()); + } + + @AfterClass + public static void tearDownAfterClass() throws 
Exception { + replicationServer.stop("Tear down after test"); + TestReplicationBase.tearDownAfterClass(); + } + + @Test + public void test() throws Exception { + try { + // Only start one region server in source cluster + ServerName producer = UTIL1.getMiniHBaseCluster().getRegionServer(0).getServerName(); + replicationServer.startReplicationSource(producer, PEER_ID2); + } catch (Throwable e) { + LOG.info("Failed to start replicaiton source", e); + } + runSmallBatchTest(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 43fef8815114..2a1d3befd9db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -131,7 +131,7 @@ public void testDefaultSkipsMetaWAL() throws IOException { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); @@ -169,8 +169,8 @@ public void testWALEntryFilter() throws IOException { String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, - uuid, p -> OptionalLong.empty(), new MetricsSource(queueId)); + rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, uuid, + p -> OptionalLong.empty(), new MetricsSource(queueId)); try { rs.startup(); TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null); @@ 
-257,8 +257,8 @@ public void testTerminateTimeout() throws Exception { testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - source.init(testConf, null, null, manager, null, mockPeer, null, "testPeer", - null, p -> OptionalLong.empty(), null); + source.init(testConf, null, null, manager, null, mockPeer, null, null, "testPeer", null, + p -> OptionalLong.empty(), null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit( () -> source.terminate("testing source termination")); @@ -471,7 +471,7 @@ private RegionServerServices setupForAbortTests(ReplicationSource rs, Configurat String queueId = "qid"; RegionServerServices rss = TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1")); - rs.init(conf, null, null, manager, null, mockPeer, rss, queueId, null, + rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null, p -> OptionalLong.empty(), new MetricsSource(queueId)); return rss; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 0e0353fc14da..d0504a5f933f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -414,7 +414,8 @@ public void testCleanupFailoverQueues() throws Exception { assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); ReplicationSourceInterface source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"), - manager.getServer(), id, null, 
p -> OptionalLong.empty(), null); + manager.getServer(), manager.getServer().getServerName(), id, null, p -> OptionalLong.empty(), + null); source.cleanOldWALs(file2, false); // log1 should be deleted assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); @@ -631,16 +632,16 @@ public void testRemoveRemoteWALs() throws Exception { ReplicationSourceInterface source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), - mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), peerId2, null, - p -> OptionalLong.empty(), null); + mockReplicationPeerForSyncReplication(peerId2), manager.getServer(), + manager.getServer().getServerName(), peerId2, null, p -> OptionalLong.empty(), null); source.cleanOldWALs(walName, true); // still there if peer id does not match assertTrue(fs.exists(remoteWAL)); source = new ReplicationSource(); source.init(conf, fs, null, manager, manager.getQueueStorage(), - mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), slaveId, null, - p -> OptionalLong.empty(), null); + mockReplicationPeerForSyncReplication(slaveId), manager.getServer(), + manager.getServer().getServerName(), slaveId, null, p -> OptionalLong.empty(), null); source.cleanOldWALs(walName, true); assertFalse(fs.exists(remoteWAL)); } finally { @@ -821,8 +822,9 @@ static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy @Override public void init(Configuration conf, FileSystem fs, Path walDir, ReplicationSourceController overallController, ReplicationQueueStorage queueStorage, - ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException{ + ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId, + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + throws IOException { throw new 
IOException("Failing deliberately"); } }