Skip to content

Commit ba76b39

Browse files
committed
HBASE-25779 HRegionServer#compactSplitThread should be private
Minor refactor. Make the `compactSplitThread` member field of `HRegionServer` private, and gate all access through the getter method. Signed-off-by: Yulin Niu <[email protected]> Signed-off-by: Pankaj Kumar <[email protected]>
1 parent 01ce44a commit ba76b39

File tree

10 files changed

+58
-64
lines changed

10 files changed

+58
-64
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public class HRegionServer extends Thread implements
304304
private ReplicationSinkService replicationSinkHandler;
305305

306306
// Compactions
307-
public CompactSplit compactSplitThread;
307+
private CompactSplit compactSplitThread;
308308

309309
/**
310310
* Map of regions currently being served by this region server. Key is the

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -575,9 +575,10 @@ private boolean flushRegion(final FlushRegionEntry fqe) {
575575
LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
576576
region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
577577
this.blockingWaitTime);
578-
if (!this.server.compactSplitThread.requestSplit(region)) {
578+
final CompactSplit compactSplitThread = server.getCompactSplitThread();
579+
if (!compactSplitThread.requestSplit(region)) {
579580
try {
580-
this.server.compactSplitThread.requestSystemCompaction(region,
581+
compactSplitThread.requestSystemCompaction(region,
581582
Thread.currentThread().getName());
582583
} catch (IOException e) {
583584
e = e instanceof RemoteException ?
@@ -624,16 +625,17 @@ private boolean flushRegion(HRegion region, boolean emergencyFlush,
624625

625626
tracker.beforeExecution();
626627
lock.readLock().lock();
628+
final CompactSplit compactSplitThread = server.getCompactSplitThread();
627629
try {
628630
notifyFlushRequest(region, emergencyFlush);
629631
FlushResult flushResult = region.flushcache(families, false, tracker);
630632
boolean shouldCompact = flushResult.isCompactionNeeded();
631633
// We just want to check the size
632634
boolean shouldSplit = region.checkSplit().isPresent();
633635
if (shouldSplit) {
634-
this.server.compactSplitThread.requestSplit(region);
636+
compactSplitThread.requestSplit(region);
635637
} else if (shouldCompact) {
636-
server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
638+
compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
637639
}
638640
} catch (DroppedSnapshotException ex) {
639641
// Cache flush can fail in a few places. If it fails in a critical

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -234,37 +234,26 @@ public long getTotalRowActionRequestCount() {
234234

235235
@Override
236236
public int getSplitQueueSize() {
237-
if (this.regionServer.compactSplitThread == null) {
238-
return 0;
239-
}
240-
return this.regionServer.compactSplitThread.getSplitQueueSize();
237+
final CompactSplit compactSplit = regionServer.getCompactSplitThread();
238+
return compactSplit == null ? 0 : compactSplit.getSplitQueueSize();
241239
}
242240

243241
@Override
244242
public int getCompactionQueueSize() {
245-
//The thread could be zero. if so assume there is no queue.
246-
if (this.regionServer.compactSplitThread == null) {
247-
return 0;
248-
}
249-
return this.regionServer.compactSplitThread.getCompactionQueueSize();
243+
final CompactSplit compactSplit = regionServer.getCompactSplitThread();
244+
return compactSplit == null ? 0 : compactSplit.getCompactionQueueSize();
250245
}
251246

252247
@Override
253248
public int getSmallCompactionQueueSize() {
254-
//The thread could be zero. if so assume there is no queue.
255-
if (this.regionServer.compactSplitThread == null) {
256-
return 0;
257-
}
258-
return this.regionServer.compactSplitThread.getSmallCompactionQueueSize();
249+
final CompactSplit compactSplit = regionServer.getCompactSplitThread();
250+
return compactSplit == null ? 0 : compactSplit.getSmallCompactionQueueSize();
259251
}
260252

261253
@Override
262254
public int getLargeCompactionQueueSize() {
263-
//The thread could be zero. if so assume there is no queue.
264-
if (this.regionServer.compactSplitThread == null) {
265-
return 0;
266-
}
267-
return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
255+
final CompactSplit compactSplit = regionServer.getCompactSplitThread();
256+
return compactSplit == null ? 0 : compactSplit.getLargeCompactionQueueSize();
268257
}
269258

270259
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,19 +124,20 @@ public static void dumpRowLock(HRegionServer hrs, PrintWriter out) {
124124
}
125125
}
126126

127-
public static void dumpQueue(HRegionServer hrs, PrintWriter out)
128-
throws IOException {
129-
if (hrs.compactSplitThread != null) {
127+
public static void dumpQueue(HRegionServer hrs, PrintWriter out) {
128+
final CompactSplit compactSplit = hrs.getCompactSplitThread();
129+
if (compactSplit != null) {
130130
// 1. Print out Compaction/Split Queue
131-
out.println("Compaction/Split Queue summary: "
132-
+ hrs.compactSplitThread.toString() );
133-
out.println(hrs.compactSplitThread.dumpQueue());
131+
out.println("Compaction/Split Queue summary: " + compactSplit);
132+
out.println(compactSplit.dumpQueue());
134133
}
135134

136-
if (hrs.getMemStoreFlusher() != null) {
135+
final MemStoreFlusher memStoreFlusher = hrs.getMemStoreFlusher();
136+
if (memStoreFlusher != null) {
137137
// 2. Print out flush Queue
138-
out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString());
139-
out.println(hrs.getMemStoreFlusher().dumpQueue());
138+
out.println();
139+
out.println("Flush Queue summary: " + memStoreFlusher);
140+
out.println(memStoreFlusher.dumpQueue());
140141
}
141142
}
142143

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1707,17 +1707,18 @@ public CompactRegionResponse compactRegion(final RpcController controller,
17071707
@Override
17081708
public CompactionSwitchResponse compactionSwitch(RpcController controller,
17091709
CompactionSwitchRequest request) throws ServiceException {
1710+
final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
17101711
try {
17111712
checkOpen();
17121713
requestCount.increment();
1713-
boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
1714+
boolean prevState = compactSplitThread.isCompactionsEnabled();
17141715
CompactionSwitchResponse response =
17151716
CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
17161717
if (prevState == request.getEnabled()) {
17171718
// passed in requested state is same as current state. No action required
17181719
return response;
17191720
}
1720-
regionServer.compactSplitThread.switchCompaction(request.getEnabled());
1721+
compactSplitThread.switchCompaction(request.getEnabled());
17211722
return response;
17221723
} catch (IOException ie) {
17231724
throw new ServiceException(ie);
@@ -1760,7 +1761,7 @@ public FlushRegionResponse flushRegion(final RpcController controller,
17601761
}
17611762
boolean compactionNeeded = flushResult.isCompactionNeeded();
17621763
if (compactionNeeded) {
1763-
regionServer.compactSplitThread.requestSystemCompaction(region,
1764+
regionServer.getCompactSplitThread().requestSystemCompaction(region,
17641765
"Compaction through user triggered flush");
17651766
}
17661767
builder.setFlushed(flushResult.isFlushSucceeded());
@@ -1876,17 +1877,18 @@ public ClearCompactionQueuesResponse clearCompactionQueues(RpcController control
18761877
ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
18771878
requestCount.increment();
18781879
if (clearCompactionQueues.compareAndSet(false,true)) {
1880+
final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
18791881
try {
18801882
checkOpen();
18811883
regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
18821884
for (String queueName : request.getQueueNameList()) {
18831885
LOG.debug("clear " + queueName + " compaction queue");
18841886
switch (queueName) {
18851887
case "long":
1886-
regionServer.compactSplitThread.clearLongCompactionsQueue();
1888+
compactSplitThread.clearLongCompactionsQueue();
18871889
break;
18881890
case "short":
1889-
regionServer.compactSplitThread.clearShortCompactionsQueue();
1891+
compactSplitThread.clearShortCompactionsQueue();
18901892
break;
18911893
default:
18921894
LOG.warn("Unknown queue name " + queueName);

hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestChangingEncoding.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,11 @@ private void compactAndWait() throws IOException, InterruptedException {
238238
final long maxWaitime = System.currentTimeMillis() + 500;
239239
boolean cont;
240240
do {
241-
cont = rs.compactSplitThread.getCompactionQueueSize() == 0;
241+
cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0;
242242
Threads.sleep(1);
243243
} while (cont && System.currentTimeMillis() < maxWaitime);
244244

245-
while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
245+
while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
246246
Threads.sleep(1);
247247
}
248248
LOG.debug("Compaction queue size reached 0, continuing");

hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestLoadAndSwitchEncodeOnDisk.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void loadTest() throws Exception {
102102
// Wait until compaction completes
103103
Threads.sleepWithoutInterrupt(5000);
104104
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
105-
while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
105+
while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
106106
Threads.sleep(50);
107107
}
108108

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSplitThread.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -119,39 +119,39 @@ public void testThreadPoolSizeTuning() throws Exception {
119119
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
120120

121121
// check initial configuration of thread pool sizes
122-
assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum());
123-
assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum());
124-
assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
122+
assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
123+
assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
124+
assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum());
125125

126126
// change bigger configurations and do online update
127127
conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4);
128128
conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5);
129129
conf.setInt(CompactSplit.SPLIT_THREADS, 6);
130130
try {
131-
regionServer.compactSplitThread.onConfigurationChange(conf);
131+
regionServer.getCompactSplitThread().onConfigurationChange(conf);
132132
} catch (IllegalArgumentException iae) {
133133
Assert.fail("Update bigger configuration failed!");
134134
}
135135

136136
// check again after online update
137-
assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum());
138-
assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum());
139-
assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
137+
assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
138+
assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
139+
assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum());
140140

141141
// change smaller configurations and do online update
142142
conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2);
143143
conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3);
144144
conf.setInt(CompactSplit.SPLIT_THREADS, 4);
145145
try {
146-
regionServer.compactSplitThread.onConfigurationChange(conf);
146+
regionServer.getCompactSplitThread().onConfigurationChange(conf);
147147
} catch (IllegalArgumentException iae) {
148148
Assert.fail("Update smaller configuration failed!");
149149
}
150150

151151
// check again after online update
152-
assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum());
153-
assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum());
154-
assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
152+
assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
153+
assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
154+
assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum());
155155
} finally {
156156
conn.close();
157157
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerOnlineConfigChange.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNotNull;
2223
import static org.junit.Assert.assertTrue;
2324

2425
import java.io.IOException;
@@ -91,26 +92,25 @@ public static void tearDown() throws Exception {
9192

9293
/**
9394
* Check if the number of compaction threads changes online
94-
* @throws IOException
9595
*/
9696
@Test
97-
public void testNumCompactionThreadsOnlineChange() throws IOException {
98-
assertTrue(rs1.compactSplitThread != null);
97+
public void testNumCompactionThreadsOnlineChange() {
98+
assertNotNull(rs1.getCompactSplitThread());
9999
int newNumSmallThreads =
100-
rs1.compactSplitThread.getSmallCompactionThreadNum() + 1;
100+
rs1.getCompactSplitThread().getSmallCompactionThreadNum() + 1;
101101
int newNumLargeThreads =
102-
rs1.compactSplitThread.getLargeCompactionThreadNum() + 1;
102+
rs1.getCompactSplitThread().getLargeCompactionThreadNum() + 1;
103103

104104
conf.setInt("hbase.regionserver.thread.compaction.small",
105-
newNumSmallThreads);
105+
newNumSmallThreads);
106106
conf.setInt("hbase.regionserver.thread.compaction.large",
107-
newNumLargeThreads);
107+
newNumLargeThreads);
108108
rs1.getConfigurationManager().notifyAllObservers(conf);
109109

110110
assertEquals(newNumSmallThreads,
111-
rs1.compactSplitThread.getSmallCompactionThreadNum());
111+
rs1.getCompactSplitThread().getSmallCompactionThreadNum());
112112
assertEquals(newNumLargeThreads,
113-
rs1.compactSplitThread.getLargeCompactionThreadNum());
113+
rs1.getCompactSplitThread().getLargeCompactionThreadNum());
114114
}
115115

116116
/**

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestCompactionWithThroughputController.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public void testThroughputTuning() throws Exception {
208208
TEST_UTIL.waitTableAvailable(tableName);
209209
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
210210
PressureAwareCompactionThroughputController throughputController =
211-
(PressureAwareCompactionThroughputController) regionServer.compactSplitThread
211+
(PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread()
212212
.getCompactionThroughputController();
213213
assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
214214
Table table = conn.getTable(tableName);
@@ -234,9 +234,9 @@ public void testThroughputTuning() throws Exception {
234234

235235
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
236236
NoLimitThroughputController.class.getName());
237-
regionServer.compactSplitThread.onConfigurationChange(conf);
237+
regionServer.getCompactSplitThread().onConfigurationChange(conf);
238238
assertTrue(throughputController.isStopped());
239-
assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
239+
assertTrue(regionServer.getCompactSplitThread().getCompactionThroughputController()
240240
instanceof NoLimitThroughputController);
241241
} finally {
242242
conn.close();

0 commit comments

Comments
 (0)