Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -403,11 +403,13 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOExcept
}

// populate puts
for (RSGroupInfo RSGroupInfo : groupMap.values()) {
RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
mutations.add(p);
for (RSGroupInfo gi : groupMap.values()) {
if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi);
Put p = new Put(Bytes.toBytes(gi.getName()));
p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
mutations.add(p);
}
}

if (mutations.size() > 0) {
Expand Down Expand Up @@ -456,7 +458,12 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro

// Make changes visible after having been persisted to the source of truth
resetRSGroupMap(newGroupMap);
saveRSGroupMapToZK(newGroupMap);

updateCacheOfRSGroups(newGroupMap.keySet());
}

private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOException {
try {
String groupBasePath =
ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, RS_GROUP_ZNODE);
Expand All @@ -470,14 +477,16 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
}
}

for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(RSGroupInfo);
LOG.debug("Updating znode: " + znode);
ZKUtil.createAndFailSilent(watcher, znode);
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
ProtobufUtil.prependPBMagic(proto.toByteArray())));
for (RSGroupInfo gi : newGroupMap.values()) {
if (!gi.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
String znode = ZNodePaths.joinZNode(groupBasePath, gi.getName());
RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(gi);
LOG.debug("Updating znode: " + znode);
ZKUtil.createAndFailSilent(watcher, znode);
zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
ProtobufUtil.prependPBMagic(proto.toByteArray())));
}
}
LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());

Expand All @@ -487,7 +496,6 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro
masterServices.abort("Failed to write to rsGroupZNode", e);
throw new IOException("Failed to write to rsGroupZNode", e);
}
updateCacheOfRSGroups(newGroupMap.keySet());
}

/**
Expand Down Expand Up @@ -547,12 +555,12 @@ private SortedSet<Address> getDefaultServers() throws IOException {

// Called by ServerEventsListenerThread. Synchronize on this because redoing
// the rsGroupMap then writing it out.
private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
private synchronized void updateDefaultServers(SortedSet<Address> servers) {
RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers);
HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
newGroupMap.put(newInfo.getName(), newInfo);
flushConfig(newGroupMap);
resetRSGroupMap(newGroupMap);
}

// Called by FailedOpenUpdaterThread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
Expand Down Expand Up @@ -133,6 +136,13 @@ public void verify() throws IOException {
tds.addAll(admin.listTableDescriptors());
tds.addAll(admin.listTableDescriptorsByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME));
}
SortedSet<Address> lives = Sets.newTreeSet();
for (ServerName sn : conn.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet()) {
lives.add(sn.getAddress());
}
for (ServerName sn : conn.getAdmin().listDecommissionedRegionServers()) {
lives.remove(sn.getAddress());
}
try (Table table = conn.getTable(RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME);
ResultScanner scanner = table.getScanner(new Scan())) {
for (;;) {
Expand All @@ -144,8 +154,22 @@ public void verify() throws IOException {
RSGroupInfoManagerImpl.META_FAMILY_BYTES, RSGroupInfoManagerImpl.META_QUALIFIER_BYTES));
RSGroupInfo rsGroupInfo = ProtobufUtil.toGroupInfo(proto);
groupMap.put(proto.getName(), RSGroupUtil.fillTables(rsGroupInfo, tds));
for(Address address : rsGroupInfo.getServers()){
lives.remove(address);
}
}
}
SortedSet<TableName> tables = Sets.newTreeSet();
for (TableDescriptor td : conn.getAdmin().listTableDescriptors(Pattern.compile(".*"),
true)){
String groupName = td.getRegionServerGroup().orElse(RSGroupInfo.DEFAULT_GROUP);
if (groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
tables.add(td.getTableName());
}
}

groupMap.put(RSGroupInfo.DEFAULT_GROUP,
new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, lives, tables));
assertEquals(Sets.newHashSet(groupMap.values()), Sets.newHashSet(wrapped.listRSGroups()));
try {
String groupBasePath = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup");
Expand All @@ -160,6 +184,7 @@ public void verify() throws IOException {
zList.add(RSGroupUtil.fillTables(rsGroupInfo, tds));
}
}
groupMap.remove(RSGroupInfo.DEFAULT_GROUP);
assertEquals(zList.size(), groupMap.size());
for (RSGroupInfo rsGroupInfo : zList) {
assertTrue(groupMap.get(rsGroupInfo.getName()).equals(rsGroupInfo));
Expand Down