Skip to content

Commit 27ed4c9

Browse files
committed
HBASE-24337 Backport HBASE-23968 to branch-2
1 parent 89ae3c5 commit 27ed4c9

7 files changed

Lines changed: 274 additions & 54 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
4242
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
4343
import org.apache.hadoop.hbase.regionserver.StoreScanner;
44+
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
4445
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
4546
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
4647
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -173,11 +174,12 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
173174
// we have to use a do/while loop.
174175
List<Cell> cells = new ArrayList<>();
175176
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
176-
int closeCheckSizeLimit = HStore.getCloseCheckInterval();
177+
long currentTime = EnvironmentEdgeManager.currentTime();
177178
long lastMillis = 0;
178179
if (LOG.isDebugEnabled()) {
179-
lastMillis = EnvironmentEdgeManager.currentTime();
180+
lastMillis = currentTime;
180181
}
182+
CloseChecker closeChecker = new CloseChecker(conf, currentTime);
181183
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
182184
long now = 0;
183185
boolean hasMore;
@@ -216,8 +218,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
216218
}
217219
do {
218220
hasMore = scanner.next(cells, scannerContext);
221+
currentTime = EnvironmentEdgeManager.currentTime();
219222
if (LOG.isDebugEnabled()) {
220-
now = EnvironmentEdgeManager.currentTime();
223+
now = currentTime;
224+
}
225+
if (closeChecker.isTimeLimit(store, currentTime)) {
226+
progress.cancel();
227+
return false;
221228
}
222229
for (Cell c : cells) {
223230
if (major && CellUtil.isDelete(c)) {
@@ -290,16 +297,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
290297
bytesWrittenProgressForLog += len;
291298
}
292299
throughputController.control(compactionName, len);
293-
// check periodically to see if a system stop is requested
294-
if (closeCheckSizeLimit > 0) {
295-
bytesWrittenProgressForCloseCheck += len;
296-
if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
297-
bytesWrittenProgressForCloseCheck = 0;
298-
if (!store.areWritesEnabled()) {
299-
progress.cancel();
300-
return false;
301-
}
302-
}
300+
if (closeChecker.isSizeLimit(store, len)) {
301+
progress.cancel();
302+
return false;
303303
}
304304
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
305305
((ShipperListener)writer).beforeShipped();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2206,15 +2206,13 @@ public boolean compact(CompactionContext compaction, HStore store,
22062206
* }
22072207
* Also in compactor.performCompaction():
22082208
* check periodically to see if a system stop is requested
2209-
* if (closeCheckInterval > 0) {
2210-
* bytesWritten += len;
2211-
* if (bytesWritten > closeCheckInterval) {
2212-
* bytesWritten = 0;
2213-
* if (!store.areWritesEnabled()) {
2214-
* progress.cancel();
2215-
* return false;
2216-
* }
2217-
* }
2209+
* if (closeChecker != null && closeChecker.isTimeLimit(store, now)) {
2210+
* progress.cancel();
2211+
* return false;
2212+
* }
2213+
* if (closeChecker != null && closeChecker.isSizeLimit(store, len)) {
2214+
* progress.cancel();
2215+
* return false;
22182216
* }
22192217
*/
22202218
try {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
154154
protected CacheConfig cacheConf;
155155
private long lastCompactSize = 0;
156156
volatile boolean forceMajor = false;
157-
/* how many bytes to write between status checks */
158-
static int closeCheckInterval = 0;
159157
private AtomicLong storeSize = new AtomicLong();
160158
private AtomicLong totalUncompressedBytes = new AtomicLong();
161159

@@ -297,11 +295,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
297295
this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
298296
}
299297

300-
if (HStore.closeCheckInterval == 0) {
301-
HStore.closeCheckInterval = conf.getInt(
302-
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
303-
}
304-
305298
this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
306299
List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
307300
// Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
@@ -490,13 +483,6 @@ public static ChecksumType getChecksumType(Configuration conf) {
490483
}
491484
}
492485

493-
/**
494-
* @return how many bytes to write between status checks
495-
*/
496-
public static int getCloseCheckInterval() {
497-
return closeCheckInterval;
498-
}
499-
500486
@Override
501487
public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
502488
return this.family;
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.apache.hadoop.hbase.regionserver.compactions;
17+
18+
import org.apache.hadoop.conf.Configuration;
19+
import org.apache.hadoop.hbase.regionserver.Store;
20+
import org.apache.yetus.audience.InterfaceAudience;
21+
22+
/**
23+
* Check periodically to see if a system stop is requested
24+
*/
25+
@InterfaceAudience.Private
26+
public class CloseChecker {
27+
public static final String SIZE_LIMIT_KEY = "hbase.hstore.close.check.interval";
28+
public static final String TIME_LIMIT_KEY = "hbase.hstore.close.check.time.interval";
29+
30+
private final int closeCheckSizeLimit;
31+
private final long closeCheckTimeLimit;
32+
33+
private long bytesWrittenProgressForCloseCheck;
34+
private long lastCloseCheckMillis;
35+
36+
public CloseChecker(Configuration conf, long currentTime) {
37+
this.closeCheckSizeLimit = conf.getInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
38+
this.closeCheckTimeLimit = conf.getLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
39+
this.bytesWrittenProgressForCloseCheck = 0;
40+
this.lastCloseCheckMillis = currentTime;
41+
}
42+
43+
/**
44+
* Check periodically to see if a system stop is requested every written bytes reach size limit.
45+
*
46+
* @return if true, system stop.
47+
*/
48+
public boolean isSizeLimit(Store store, long bytesWritten) {
49+
if (closeCheckSizeLimit <= 0) {
50+
return false;
51+
}
52+
53+
bytesWrittenProgressForCloseCheck += bytesWritten;
54+
if (bytesWrittenProgressForCloseCheck <= closeCheckSizeLimit) {
55+
return false;
56+
}
57+
58+
bytesWrittenProgressForCloseCheck = 0;
59+
return !store.areWritesEnabled();
60+
}
61+
62+
/**
63+
* Check periodically to see if a system stop is requested every time.
64+
*
65+
* @return if true, system stop.
66+
*/
67+
public boolean isTimeLimit(Store store, long now) {
68+
if (closeCheckTimeLimit <= 0) {
69+
return false;
70+
}
71+
72+
final long elapsedMillis = now - lastCloseCheckMillis;
73+
if (elapsedMillis <= closeCheckTimeLimit) {
74+
return false;
75+
}
76+
77+
lastCloseCheckMillis = now;
78+
return !store.areWritesEnabled();
79+
}
80+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -368,17 +368,17 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
368368
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
369369
boolean major, int numofFilesToCompact) throws IOException {
370370
assert writer instanceof ShipperListener;
371-
long bytesWrittenProgressForCloseCheck = 0;
372371
long bytesWrittenProgressForLog = 0;
373372
long bytesWrittenProgressForShippedCall = 0;
374373
// Since scanner.next() can return 'false' but still be delivering data,
375374
// we have to use a do/while loop.
376375
List<Cell> cells = new ArrayList<>();
377-
long closeCheckSizeLimit = HStore.getCloseCheckInterval();
376+
long currentTime = EnvironmentEdgeManager.currentTime();
378377
long lastMillis = 0;
379378
if (LOG.isDebugEnabled()) {
380-
lastMillis = EnvironmentEdgeManager.currentTime();
379+
lastMillis = currentTime;
381380
}
381+
CloseChecker closeChecker = new CloseChecker(conf, currentTime);
382382
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
383383
long now = 0;
384384
boolean hasMore;
@@ -392,8 +392,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
392392
try {
393393
do {
394394
hasMore = scanner.next(cells, scannerContext);
395+
currentTime = EnvironmentEdgeManager.currentTime();
395396
if (LOG.isDebugEnabled()) {
396-
now = EnvironmentEdgeManager.currentTime();
397+
now = currentTime;
398+
}
399+
if (closeChecker.isTimeLimit(store, currentTime)) {
400+
progress.cancel();
401+
return false;
397402
}
398403
// output to writer:
399404
Cell lastCleanCell = null;
@@ -416,16 +421,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
416421
bytesWrittenProgressForLog += len;
417422
}
418423
throughputController.control(compactionName, len);
419-
// check periodically to see if a system stop is requested
420-
if (closeCheckSizeLimit > 0) {
421-
bytesWrittenProgressForCloseCheck += len;
422-
if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
423-
bytesWrittenProgressForCloseCheck = 0;
424-
if (!store.areWritesEnabled()) {
425-
progress.cancel();
426-
return false;
427-
}
428-
}
424+
if (closeChecker.isSizeLimit(store, len)) {
425+
progress.cancel();
426+
return false;
429427
}
430428
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
431429
if (lastCleanCell != null) {

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
2222
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
2323
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
24+
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
25+
import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
2426
import static org.junit.Assert.assertEquals;
2527
import static org.junit.Assert.assertFalse;
2628
import static org.junit.Assert.assertTrue;
@@ -152,12 +154,11 @@ public void tearDown() throws Exception {
152154
* @throws Exception
153155
*/
154156
@Test
155-
public void testInterruptCompaction() throws Exception {
157+
public void testInterruptCompactionBySize() throws Exception {
156158
assertEquals(0, count());
157159

158160
// lower the polling interval for this test
159-
int origWI = HStore.closeCheckInterval;
160-
HStore.closeCheckInterval = 10*1000; // 10 KB
161+
conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);
161162

162163
try {
163164
// Create a couple store files w/ 15KB (over 10KB interval)
@@ -202,7 +203,84 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
202203
} finally {
203204
// don't mess up future tests
204205
r.writestate.writesEnabled = true;
205-
HStore.closeCheckInterval = origWI;
206+
conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
207+
208+
// Delete all Store information once done using
209+
for (int i = 0; i < compactionThreshold; i++) {
210+
Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
211+
byte [][] famAndQf = {COLUMN_FAMILY, null};
212+
delete.addFamily(famAndQf[0]);
213+
r.delete(delete);
214+
}
215+
r.flush(true);
216+
217+
// Multiple versions allowed for an entry, so the delete isn't enough
218+
// Lower TTL and expire to ensure that all our entries have been wiped
219+
final int ttl = 1000;
220+
for (HStore store : this.r.stores.values()) {
221+
ScanInfo old = store.getScanInfo();
222+
ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
223+
store.setScanInfo(si);
224+
}
225+
Thread.sleep(ttl);
226+
227+
r.compact(true);
228+
assertEquals(0, count());
229+
}
230+
}
231+
232+
@Test
233+
public void testInterruptCompactionByTime() throws Exception {
234+
assertEquals(0, count());
235+
236+
// lower the polling interval for this test
237+
conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);
238+
239+
try {
240+
// Create a couple store files w/ 15KB (over 10KB interval)
241+
int jmax = (int) Math.ceil(15.0/compactionThreshold);
242+
byte [] pad = new byte[1000]; // 1 KB chunk
243+
for (int i = 0; i < compactionThreshold; i++) {
244+
Table loader = new RegionAsTable(r);
245+
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
246+
p.setDurability(Durability.SKIP_WAL);
247+
for (int j = 0; j < jmax; j++) {
248+
p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
249+
}
250+
HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
251+
loader.put(p);
252+
r.flush(true);
253+
}
254+
255+
HRegion spyR = spy(r);
256+
doAnswer(new Answer() {
257+
@Override
258+
public Object answer(InvocationOnMock invocation) throws Throwable {
259+
r.writestate.writesEnabled = false;
260+
return invocation.callRealMethod();
261+
}
262+
}).when(spyR).doRegionCompactionPrep();
263+
264+
// force a minor compaction, but not before requesting a stop
265+
spyR.compactStores();
266+
267+
// ensure that the compaction stopped, all old files are intact,
268+
HStore s = r.getStore(COLUMN_FAMILY);
269+
assertEquals(compactionThreshold, s.getStorefilesCount());
270+
assertTrue(s.getStorefilesSize() > 15*1000);
271+
// and no new store files persisted past compactStores()
272+
// only one empty dir exists in temp dir
273+
FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
274+
assertEquals(1, ls.length);
275+
Path storeTempDir =
276+
new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
277+
assertTrue(r.getFilesystem().exists(storeTempDir));
278+
ls = r.getFilesystem().listStatus(storeTempDir);
279+
assertEquals(0, ls.length);
280+
} finally {
281+
// don't mess up future tests
282+
r.writestate.writesEnabled = true;
283+
conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
206284

207285
// Delete all Store information once done using
208286
for (int i = 0; i < compactionThreshold; i++) {

0 commit comments

Comments (0)