Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

/**
* @deprecated since 2.4.0 and will be removed in 4.0.0.
* Use {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
* @deprecated since 2.4.0 and will be removed in 4.0.0. Use
* {@link HRegionServer#UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY} instead.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-24667">HBASE-24667</a>
*/
@Deprecated
Expand Down Expand Up @@ -521,9 +521,10 @@ protected String getUseThisHostnameInstead(Configuration conf) throws IOExceptio
String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
if (conf.getBoolean(UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
if (!StringUtils.isBlank(hostname)) {
String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + UNSAFE_RS_HOSTNAME_KEY +
" are mutually exclusive. Do not set " + UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
" to true while " + UNSAFE_RS_HOSTNAME_KEY + " is used";
String msg = UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " +
UNSAFE_RS_HOSTNAME_KEY + " are mutually exclusive. Do not set " +
UNSAFE_RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " to true while " +
UNSAFE_RS_HOSTNAME_KEY + " is used";
throw new IOException(msg);
} else {
return rpcServices.getSocketAddress().getHostName();
Expand Down Expand Up @@ -617,7 +618,9 @@ public boolean registerService(Service instance) {
private static void checkCodecs(final Configuration c) throws IOException {
// check to see if the codec list is available:
String [] codecs = c.getStrings(REGIONSERVER_CODEC, (String[])null);
if (codecs == null) return;
if (codecs == null) {
return;
}
for (String codec : codecs) {
if (!CompressionTest.testCompression(codec)) {
throw new IOException("Compression codec " + codec +
Expand Down Expand Up @@ -852,9 +855,15 @@ public void run() {

// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
if (this.hMemManager != null) this.hMemManager.stop();
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
if (this.hMemManager != null) {
this.hMemManager.stop();
}
if (this.cacheFlusher != null) {
this.cacheFlusher.interruptIfNecessary();
}
if (this.compactSplitThread != null) {
this.compactSplitThread.interruptIfNecessary();
}

// Stop the snapshot and other procedure handlers, forcefully killing all running tasks
if (rspmHost != null) {
Expand Down Expand Up @@ -949,7 +958,9 @@ private boolean containsMetaTableRegions() {
}

private boolean areAllUserRegionsOffline() {
if (getNumberOfOnlineRegions() > 2) return false;
if (getNumberOfOnlineRegions() > 2) {
return false;
}
boolean allUserRegionsOffline = true;
for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
if (!e.getValue().getRegionInfo().isMetaRegion()) {
Expand Down Expand Up @@ -1185,7 +1196,9 @@ private ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, lon
private String getOnlineRegionsAsPrintableString() {
StringBuilder sb = new StringBuilder();
for (Region r: this.onlineRegions.values()) {
if (sb.length() > 0) sb.append(", ");
if (sb.length() > 0) {
sb.append(", ");
}
sb.append(r.getRegionInfo().getEncodedName());
}
return sb.toString();
Expand Down Expand Up @@ -1284,7 +1297,7 @@ private void shutdownWAL(final boolean close) {
* @param c Extra configuration.
*/
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
throws IOException {
throws IOException {
try {
boolean updateRootDir = false;
for (NameStringPair e : c.getMapEntriesList()) {
Expand Down Expand Up @@ -1560,7 +1573,7 @@ protected void chore() {
this.instance.compactSplitThread.requestCompaction(hr, s,
getName() + " requests major compaction; use default priority",
Store.NO_PRIORITY,
CompactionLifeCycleTracker.DUMMY, null);
CompactionLifeCycleTracker.DUMMY, null);
} else {
this.instance.compactSplitThread.requestCompaction(hr, s,
getName() + " requests major compaction; use configured priority",
Expand Down Expand Up @@ -1595,7 +1608,9 @@ private static class PeriodicMemStoreFlusher extends ScheduledChore {
protected void chore() {
final StringBuilder whyFlush = new StringBuilder();
for (HRegion r : this.server.onlineRegions.values()) {
if (r == null) continue;
if (r == null) {
continue;
}
if (r.shouldFlush(whyFlush)) {
FlushRequester requester = server.getFlushRequester();
if (requester != null) {
Expand Down Expand Up @@ -1698,7 +1713,7 @@ private void startServices() throws IOException {
// Health checker thread.
if (isHealthCheckerConfigured()) {
int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
}
// Executor status collect thread.
Expand Down Expand Up @@ -2134,7 +2149,9 @@ public boolean reportRegionStateTransition(final RegionStateTransitionContext co
" after " + pauseTime + "ms delay (Master is coming online...).":
" immediately."),
ioe);
if (pause) Threads.sleep(pauseTime);
if (pause) {
Threads.sleep(pauseTime);
}
tries++;
if (rssStub == rss) {
rssStub = null;
Expand Down Expand Up @@ -2168,7 +2185,7 @@ private void triggerFlushInPrimaryRegion(final HRegion region) {
} else {
LOG.info("Executor is null; not running flush of primary region replica for {}",
region.getRegionInfo());
}
}
}

@InterfaceAudience.Private
Expand Down Expand Up @@ -2299,8 +2316,7 @@ protected void stopServiceThreads() {
}

/**
* @return Return the object that implements the replication
* source executorService.
* @return Return the object that implements the replication source executorService.
*/
@Override
public ReplicationSourceService getReplicationSourceService() {
Expand Down Expand Up @@ -2397,8 +2413,8 @@ protected synchronized ServerName createRegionServerStatusStub(boolean refresh)
}

/**
* @return True if we should break loop because cluster is going down or
* this server has been stopped or hdfs has gone bad.
* @return True if we should break loop because cluster is going down or this server has been
* stopped or hdfs has gone bad.
*/
private boolean keepLooping() {
return !this.stopped && isClusterUp();
Expand All @@ -2412,10 +2428,14 @@ private boolean keepLooping() {
* @throws IOException
*/
private RegionServerStartupResponse reportForDuty() throws IOException {
if (this.masterless) return RegionServerStartupResponse.getDefaultInstance();
if (this.masterless) {
return RegionServerStartupResponse.getDefaultInstance();
}
ServerName masterServerName = createRegionServerStatusStub(true);
RegionServerStatusService.BlockingInterface rss = rssStub;
if (masterServerName == null || rss == null) return null;
if (masterServerName == null || rss == null) {
return null;
}
RegionServerStartupResponse result = null;
try {
rpcServices.requestCount.reset();
Expand Down Expand Up @@ -2493,12 +2513,16 @@ private void closeMetaTableRegions(final boolean abort) {
if (hri.isMetaRegion()) {
meta = e.getValue();
}
if (meta != null) break;
if (meta != null) {
break;
}
}
} finally {
this.onlineRegionsLock.writeLock().unlock();
}
if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
if (meta != null) {
closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
}
}

/**
Expand Down Expand Up @@ -2722,17 +2746,17 @@ public static void main(String[] args) {
*/
@Override
public List<HRegion> getRegions(TableName tableName) {
List<HRegion> tableRegions = new ArrayList<>();
synchronized (this.onlineRegions) {
for (HRegion region: this.onlineRegions.values()) {
RegionInfo regionInfo = region.getRegionInfo();
if(regionInfo.getTable().equals(tableName)) {
tableRegions.add(region);
}
}
}
return tableRegions;
}
List<HRegion> tableRegions = new ArrayList<>();
synchronized (this.onlineRegions) {
for (HRegion region: this.onlineRegions.values()) {
RegionInfo regionInfo = region.getRegionInfo();
if(regionInfo.getTable().equals(tableName)) {
tableRegions.add(region);
}
}
}
return tableRegions;
}

@Override
public List<HRegion> getRegions() {
Expand Down Expand Up @@ -2906,13 +2930,16 @@ public boolean removeRegion(final HRegion r, ServerName destination) {
if (closeSeqNum == HConstants.NO_SEQNUM) {
// No edits in WAL for this region; get the sequence number when the region was opened.
closeSeqNum = r.getOpenSeqNum();
if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
if (closeSeqNum == HConstants.NO_SEQNUM) {
closeSeqNum = 0;
}
}
boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum, selfMove);
if (selfMove) {
this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(r.getRegionInfo().getEncodedName()
, new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(
r.getRegionInfo().getEncodedName(),
new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
}
}
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
Expand Down Expand Up @@ -3028,7 +3055,7 @@ public void updateRegionFavoredNodesMapping(String encodedRegionName,
* Return the favored nodes for a region given its encoded name. Look at the
* comment around {@link #regionFavoredNodesMap} on why we convert to InetSocketAddress[]
* here.
* @param encodedRegionName
* @param encodedRegionName the encoded region name.
* @return array of favored locations
*/
@Override
Expand All @@ -3048,7 +3075,7 @@ private static class MovedRegionInfo {
MovedRegionInfo(ServerName serverName, long closeSeqNum) {
this.serverName = serverName;
this.seqNum = closeSeqNum;
}
}

public ServerName getServerName() {
return serverName;
Expand All @@ -3065,7 +3092,8 @@ public long getSeqNum() {
*/
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);

private void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum, boolean selfMove) {
private void addToMovedRegions(String encodedName, ServerName destination,
long closeSeqNum, boolean selfMove) {
if (selfMove) {
LOG.warn("Not adding moved region record: " + encodedName + " to self.");
return;
Expand All @@ -3086,7 +3114,7 @@ public MovedRegionInfo getMovedRegion(String encodedRegionName) {

@InterfaceAudience.Private
public int movedRegionCacheExpiredTime() {
return TIMEOUT_REGION_MOVED;
return TIMEOUT_REGION_MOVED;
}

private String getMyEphemeralNodePath() {
Expand Down Expand Up @@ -3114,8 +3142,8 @@ CoprocessorServiceResponse execRegionServerService(
String serviceName = call.getServiceName();
Service service = coprocessorServiceHandlers.get(serviceName);
if (service == null) {
throw new UnknownProtocolException(null, "No registered coprocessor executorService found for " +
serviceName);
throw new UnknownProtocolException(null,
"No registered coprocessor executorService found for " + serviceName);
}
ServiceDescriptor serviceDesc =
service.getDescriptorForType();
Expand Down