Skip to content

Commit de926ba

Browse files
committed
HBASE-24684 Fetch ReplicationSink servers list from HMaster instead of ZooKeeper
1 parent fae9f0c commit de926ba

18 files changed

Lines changed: 346 additions & 20 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2448,4 +2448,11 @@ Pair<List<String>, List<TableName>> getConfiguredNamespacesAndTablesInRSGroup(St
24482448
*/
24492449
void updateRSGroupConfig(String groupName, Map<String, String> configuration) throws IOException;
24502450

2451+
/**
2452+
* Get a list of servers' addresses for replication sink.
2453+
* @return a list of servers' addresses
2454+
* @throws IOException if a remote or network exception occurs
2455+
*/
2456+
List<ServerName> listReplicationSinkServers() throws IOException;
2457+
24512458
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,4 +1052,9 @@ public void updateRSGroupConfig(String groupName, Map<String, String> configurat
10521052
throws IOException {
10531053
get(admin.updateRSGroupConfig(groupName, configuration));
10541054
}
1055+
1056+
@Override
1057+
public List<ServerName> listReplicationSinkServers() throws IOException {
1058+
return get(admin.listReplicationSinkServers());
1059+
}
10551060
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1655,4 +1655,10 @@ CompletableFuture<List<OnlineLogRecord>> getSlowLogResponses(final Set<ServerNam
16551655
* @throws IOException if a remote or network exception occurs
16561656
*/
16571657
CompletableFuture<Void> updateRSGroupConfig(String groupName, Map<String, String> configuration);
1658+
1659+
/**
1660+
* Get a list of servers' addresses for replication sink
1661+
* @return a list of servers' addresses
1662+
*/
1663+
CompletableFuture<List<ServerName>> listReplicationSinkServers();
16581664
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -929,4 +929,9 @@ public CompletableFuture<Void> renameRSGroup(String oldName, String newName) {
929929
updateRSGroupConfig(String groupName, Map<String, String> configuration) {
930930
return wrap(rawAdmin.updateRSGroupConfig(groupName, configuration));
931931
}
932+
933+
@Override
934+
public CompletableFuture<List<ServerName>> listReplicationSinkServers() {
935+
return wrap(rawAdmin.listReplicationSinkServers());
936+
}
932937
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@
221221
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
222222
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
223223
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
224+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
225+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
224226
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
225227
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
226228
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
@@ -4200,4 +4202,16 @@ this.<UpdateRSGroupConfigRequest, UpdateRSGroupConfigResponse, Void> call(
42004202
(s, c, req, done) -> s.updateRSGroupConfig(c, req, done), resp -> null))
42014203
).call();
42024204
}
4205+
4206+
@Override
4207+
public CompletableFuture<List<ServerName>> listReplicationSinkServers() {
4208+
return this.<List<ServerName>> newMasterCaller()
4209+
.action(((controller, stub) ->
4210+
this.<ListReplicationSinkServersRequest, ListReplicationSinkServersResponse,
4211+
List<ServerName>>call(
4212+
controller, stub, ListReplicationSinkServersRequest.newBuilder().build(),
4213+
(s, c, req, done) -> s.listReplicationSinkServers(c, req, done),
4214+
resp -> ProtobufUtil.toServerNameList(resp.getServerNameList())))
4215+
).call();
4216+
}
42034217
}

hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,13 @@ message SwitchExceedThrottleQuotaResponse {
690690
required bool previous_exceed_throttle_quota_enabled = 1;
691691
}
692692

693+
message ListReplicationSinkServersRequest {
694+
}
695+
696+
message ListReplicationSinkServersResponse {
697+
repeated ServerName server_name = 1;
698+
}
699+
693700
service MasterService {
694701
/** Used by the client to get the number of regions that have received the updated schema */
695702
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
@@ -1119,7 +1126,10 @@ service MasterService {
11191126
returns (RenameRSGroupResponse);
11201127

11211128
rpc UpdateRSGroupConfig(UpdateRSGroupConfigRequest)
1122-
returns (UpdateRSGroupConfigResponse);
1129+
returns (UpdateRSGroupConfigResponse);
1130+
1131+
rpc ListReplicationSinkServers(ListReplicationSinkServersRequest)
1132+
returns (ListReplicationSinkServersResponse);
11231133
}
11241134

11251135
// HBCK Service definitions.

hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1784,4 +1784,20 @@ default void preHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment>
17841784
default void postHasUserPermissions(ObserverContext<MasterCoprocessorEnvironment> ctx,
17851785
String userName, List<Permission> permissions) throws IOException {
17861786
}
1787+
1788+
/**
1789+
* Called before getting servers for replication sink.
1790+
* @param ctx the coprocessor instance's environment
1791+
*/
1792+
default void preListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1793+
throws IOException {
1794+
}
1795+
1796+
/**
1797+
* Called after getting servers for replication sink.
1798+
* @param ctx the coprocessor instance's environment
1799+
*/
1800+
default void postListReplicationSinkServers(ObserverContext<MasterCoprocessorEnvironment> ctx)
1801+
throws IOException {
1802+
}
17871803
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3941,4 +3941,9 @@ public MetaRegionLocationCache getMetaRegionLocationCache() {
39413941
public RSGroupInfoManager getRSGroupInfoManager() {
39423942
return rsGroupInfoManager;
39433943
}
3944+
3945+
@Override
3946+
public List<ServerName> listReplicationSinkServers() throws IOException {
3947+
return this.serverManager.getOnlineServersList();
3948+
}
39443949
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2039,4 +2039,22 @@ public void call(MasterObserver observer) throws IOException {
20392039
}
20402040
});
20412041
}
2042+
2043+
public void preListReplicationSinkServers() throws IOException {
2044+
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
2045+
@Override
2046+
public void call(MasterObserver observer) throws IOException {
2047+
observer.preListReplicationSinkServers(this);
2048+
}
2049+
});
2050+
}
2051+
2052+
public void postListReplicationSinkServers() throws IOException {
2053+
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
2054+
@Override
2055+
public void call(MasterObserver observer) throws IOException {
2056+
observer.postListReplicationSinkServers(this);
2057+
}
2058+
});
2059+
}
20422060
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@
248248
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
249249
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesRequest;
250250
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespacesResponse;
251+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersRequest;
252+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListReplicationSinkServersResponse;
251253
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
252254
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
253255
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
@@ -3274,4 +3276,25 @@ public UpdateRSGroupConfigResponse updateRSGroupConfig(RpcController controller,
32743276
}
32753277
return builder.build();
32763278
}
3279+
3280+
@Override
3281+
public ListReplicationSinkServersResponse listReplicationSinkServers(
3282+
RpcController controller, ListReplicationSinkServersRequest request)
3283+
throws ServiceException {
3284+
ListReplicationSinkServersResponse.Builder builder =
3285+
ListReplicationSinkServersResponse.newBuilder();
3286+
try {
3287+
if (master.getMasterCoprocessorHost() != null) {
3288+
master.getMasterCoprocessorHost().preListReplicationSinkServers();
3289+
}
3290+
builder.addAllServerName(master.listReplicationSinkServers().stream()
3291+
.map(ProtobufUtil::toServerName).collect(Collectors.toList()));
3292+
if (master.getMasterCoprocessorHost() != null) {
3293+
master.getMasterCoprocessorHost().postListReplicationSinkServers();
3294+
}
3295+
} catch (IOException e) {
3296+
throw new ServiceException(e);
3297+
}
3298+
return builder.build();
3299+
}
32773300
}

0 commit comments

Comments
 (0)