Skip to content

Commit 7d4b041

Browse files
committed
HBASE-25071 ReplicationServer support start ReplicationSource internal
1 parent f67c3df commit 7d4b041

18 files changed

+366
-87
lines changed

hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,21 @@ option java_generic_services = true;
2424
option java_generate_equals_and_hash = true;
2525
option optimize_for = SPEED;
2626

27+
import "HBase.proto";
2728
import "server/region/Admin.proto";
2829

30+
message StartReplicationSourceRequest {
31+
required ServerName server_name = 1;
32+
required string queue_id = 2;
33+
}
34+
35+
message StartReplicationSourceResponse {
36+
}
37+
2938
service ReplicationServerService {
3039
rpc ReplicateWALEntry(ReplicateWALEntryRequest)
3140
returns(ReplicateWALEntryResponse);
32-
}
41+
42+
rpc StartReplicationSource(StartReplicationSourceRequest)
43+
returns(StartReplicationSourceResponse);
44+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3396,7 +3396,7 @@ public ListReplicationSinkServersResponse listReplicationSinkServers(
33963396
if (master.getMasterCoprocessorHost() != null) {
33973397
master.getMasterCoprocessorHost().preListReplicationSinkServers();
33983398
}
3399-
builder.addAllServerName(master.listReplicationSinkServers().stream()
3399+
builder.addAllServerName(master.getReplicationServerManager().getOnlineServersList().stream()
34003400
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
34013401
if (master.getMasterCoprocessorHost() != null) {
34023402
master.getMasterCoprocessorHost().postListReplicationSinkServers();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,6 @@
257257
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
258258
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
259259
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
260-
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationServerProtos.ReplicationServerService;
261260
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
262261
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
263262
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -271,7 +270,7 @@
271270
@SuppressWarnings("deprecation")
272271
public class RSRpcServices implements HBaseRPCErrorHandler,
273272
AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
274-
ConfigurationObserver, ReplicationServerService.BlockingInterface {
273+
ConfigurationObserver {
275274
protected static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
276275

277276
/** RPC scheduler to use for the region server. */
@@ -1491,9 +1490,6 @@ protected List<BlockingServiceAndInterface> getServices() {
14911490
bssi.add(new BlockingServiceAndInterface(
14921491
AdminService.newReflectiveBlockingService(this),
14931492
AdminService.BlockingInterface.class));
1494-
bssi.add(new BlockingServiceAndInterface(
1495-
ReplicationServerService.newReflectiveBlockingService(this),
1496-
ReplicationServerService.BlockingInterface.class));
14971493
}
14981494
return new org.apache.hbase.thirdparty.com.google.common.collect.
14991495
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
import org.apache.hadoop.hbase.security.User;
4646
import org.apache.hadoop.hbase.util.FutureUtils;
4747
import org.apache.hadoop.hbase.wal.WAL;
48-
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
4948
import org.apache.hadoop.hbase.zookeeper.ZKListener;
49+
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
5050
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
5151
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
5252
import org.apache.yetus.audience.InterfaceAudience;
@@ -321,20 +321,26 @@ protected void chooseSinks() {
321321
if (!useZk || ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
322322
useZk = false;
323323
slaveAddresses = fetchSlavesAddresses();
324+
if (slaveAddresses.isEmpty()) {
325+
LOG.warn("No sinks available at peer. Try fetch sinks by using zk.");
326+
useZk = true;
327+
}
324328
} else {
325329
useZk = true;
326330
}
327331
} catch (Throwable t) {
328332
LOG.warn("Peer {} try to fetch servers by admin failed. Using zk impl.", ctx.getPeerId(), t);
329333
useZk = true;
330334
}
335+
331336
if (useZk) {
332337
slaveAddresses = fetchSlavesAddressesByZK();
333338
}
334339

335340
if (slaveAddresses.isEmpty()) {
336-
LOG.warn("No sinks available at peer. Will not be able to replicate");
341+
LOG.warn("No sinks available at peer. Will not be able to replicate.");
337342
}
343+
338344
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
339345
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
340346
synchronized (this) {
@@ -368,10 +374,10 @@ protected SinkPeer getReplicationSink() throws IOException {
368374
}
369375

370376
private SinkPeer createSinkPeer(ServerName serverName) throws IOException {
371-
if (ReplicationUtils.isPeerClusterSupportReplicationOffload(conn)) {
372-
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
373-
} else {
377+
if (fetchServersUseZk) {
374378
return new RegionServerSinkPeer(serverName, conn.getRegionServerAdmin(serverName));
379+
} else {
380+
return new ReplicationServerSinkPeer(serverName, conn.getReplicationServerAdmin(serverName));
375381
}
376382
}
377383

0 commit comments

Comments
 (0)