diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 0a1a9e5aa7ea..75e28112fe5c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -174,11 +175,12 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
     // we have to use a do/while loop.
     List<Cell> cells = new ArrayList<>();
     // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
-    int closeCheckSizeLimit = HStore.getCloseCheckInterval();
+    long currentTime = EnvironmentEdgeManager.currentTime();
     long lastMillis = 0;
     if (LOG.isDebugEnabled()) {
-      lastMillis = EnvironmentEdgeManager.currentTime();
+      lastMillis = currentTime;
     }
+    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
     String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
     long now = 0;
     boolean hasMore;
@@ -218,8 +220,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
       }
       do {
         hasMore = scanner.next(cells, scannerContext);
+        currentTime = EnvironmentEdgeManager.currentTime();
         if (LOG.isDebugEnabled()) {
-          now = EnvironmentEdgeManager.currentTime();
+          now = currentTime;
+        }
+        if (closeChecker.isTimeLimit(store, currentTime)) {
+          progress.cancel();
+          return false;
         }
         for (Cell c : cells) {
           if (major && CellUtil.isDelete(c)) {
@@ -292,16 +299,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
               bytesWrittenProgressForLog += len;
             }
             throughputController.control(compactionName, len);
-            // check periodically to see if a system stop is requested
-            if (closeCheckSizeLimit > 0) {
-              bytesWrittenProgressForCloseCheck += len;
-              if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
-                bytesWrittenProgressForCloseCheck = 0;
-                if (!store.areWritesEnabled()) {
-                  progress.cancel();
-                  return false;
-                }
-              }
+            if (closeChecker.isSizeLimit(store, len)) {
+              progress.cancel();
+              return false;
             }
             if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
               ((ShipperListener)writer).beforeShipped();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6628328a8839..a0c911dae99b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2336,15 +2336,13 @@ public boolean compact(CompactionContext compaction, HStore store,
    *   }
    * Also in compactor.performCompaction():
    *   check periodically to see if a system stop is requested
-   *   if (closeCheckInterval > 0) {
-   *     bytesWritten += len;
-   *     if (bytesWritten > closeCheckInterval) {
-   *       bytesWritten = 0;
-   *       if (!store.areWritesEnabled()) {
-   *         progress.cancel();
-   *         return false;
-   *       }
-   *     }
+   *   if (closeChecker != null && closeChecker.isTimeLimit(store, now)) {
+   *     progress.cancel();
+   *     return false;
+   *   }
+   *   if (closeChecker != null && closeChecker.isSizeLimit(store, len)) {
+   *     progress.cancel();
+   *     return false;
    *   }
    */
   try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4e1ef4cd89d2..1722328a5cc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -157,8 +157,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   protected Configuration conf;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
-  /* how many bytes to write between status checks */
-  static int closeCheckInterval = 0;
   private AtomicLong storeSize = new AtomicLong();
   private AtomicLong totalUncompressedBytes = new AtomicLong();
   private LongAdder memstoreOnlyRowReadsCount = new LongAdder();
@@ -291,11 +289,6 @@ protected HStore(final HRegion region, final ColumnFamilyDescriptor family,
       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
     }
 
-    if (HStore.closeCheckInterval == 0) {
-      HStore.closeCheckInterval = conf.getInt(
-          "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
-    }
-
     this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
     List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
     // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
@@ -490,13 +483,6 @@ public long getBlockingFileCount() {
 
   /* End implementation of StoreConfigInformation */
 
-  /**
-   * @return how many bytes to write between status checks
-   */
-  public static int getCloseCheckInterval() {
-    return closeCheckInterval;
-  }
-
   @Override
   public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
     return this.storeContext.getFamily();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CloseChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CloseChecker.java
new file mode 100644
index 000000000000..ea711c037729
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CloseChecker.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Periodically checks whether a system stop has been requested, so a running compaction can abort.
+ */
+@InterfaceAudience.Private
+public class CloseChecker {
+  public static final String SIZE_LIMIT_KEY = "hbase.hstore.close.check.interval";
+  public static final String TIME_LIMIT_KEY = "hbase.hstore.close.check.time.interval";
+
+  private final int closeCheckSizeLimit;
+  private final long closeCheckTimeLimit;
+
+  private long bytesWrittenProgressForCloseCheck;
+  private long lastCloseCheckMillis;
+
+  public CloseChecker(Configuration conf, long currentTime) {
+    this.closeCheckSizeLimit = conf.getInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
+    this.closeCheckTimeLimit = conf.getLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
+    this.bytesWrittenProgressForCloseCheck = 0;
+    this.lastCloseCheckMillis = currentTime;
+  }
+
+  /**
+   * Check whether a system stop is requested, once the bytes written since the last check
+   * exceed the size limit.
+   * @return true if a system stop is requested and the compaction should abort.
+   */
+  public boolean isSizeLimit(Store store, long bytesWritten) {
+    if (closeCheckSizeLimit <= 0) {
+      return false;
+    }
+
+    bytesWrittenProgressForCloseCheck += bytesWritten;
+    if (bytesWrittenProgressForCloseCheck <= closeCheckSizeLimit) {
+      return false;
+    }
+
+    bytesWrittenProgressForCloseCheck = 0;
+    return !store.areWritesEnabled();
+  }
+
+  /**
+   * Check whether a system stop is requested, once the time since the last check exceeds
+   * the time limit.
+   * @return true if a system stop is requested and the compaction should abort.
+   */
+  public boolean isTimeLimit(Store store, long now) {
+    if (closeCheckTimeLimit <= 0) {
+      return false;
+    }
+
+    final long elapsedMillis = now - lastCloseCheckMillis;
+    if (elapsedMillis <= closeCheckTimeLimit) {
+      return false;
+    }
+
+    lastCloseCheckMillis = now;
+    return !store.areWritesEnabled();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index fdfb512de524..7f70e0230ff5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -393,17 +393,17 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
     assert writer instanceof ShipperListener;
-    long bytesWrittenProgressForCloseCheck = 0;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Since scanner.next() can return 'false' but still be delivering data,
     // we have to use a do/while loop.
     List<Cell> cells = new ArrayList<>();
-    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
+    long currentTime = EnvironmentEdgeManager.currentTime();
     long lastMillis = 0;
     if (LOG.isDebugEnabled()) {
-      lastMillis = EnvironmentEdgeManager.currentTime();
+      lastMillis = currentTime;
     }
+    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
     String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
     long now = 0;
     boolean hasMore;
@@ -417,8 +417,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
     try {
       do {
         hasMore = scanner.next(cells, scannerContext);
+        currentTime = EnvironmentEdgeManager.currentTime();
         if (LOG.isDebugEnabled()) {
-          now = EnvironmentEdgeManager.currentTime();
+          now = currentTime;
+        }
+        if (closeChecker.isTimeLimit(store, currentTime)) {
+          progress.cancel();
+          return false;
         }
         // output to writer:
         Cell lastCleanCell = null;
@@ -441,16 +446,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
               bytesWrittenProgressForLog += len;
             }
             throughputController.control(compactionName, len);
-            // check periodically to see if a system stop is requested
-            if (closeCheckSizeLimit > 0) {
-              bytesWrittenProgressForCloseCheck += len;
-              if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
-                bytesWrittenProgressForCloseCheck = 0;
-                if (!store.areWritesEnabled()) {
-                  progress.cancel();
-                  return false;
-                }
-              }
+            if (closeChecker.isSizeLimit(store, len)) {
+              progress.cancel();
+              return false;
             }
           }
           if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 0a57de082b69..bc894ae2920d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -21,6 +21,8 @@
 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
+import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
+import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
@@ -156,12 +158,11 @@ public void tearDown() throws Exception {
    * (used during RS shutdown)
    */
   @Test
-  public void testInterruptCompaction() throws Exception {
+  public void testInterruptCompactionBySize() throws Exception {
     assertEquals(0, count());
 
     // lower the polling interval for this test
-    int origWI = HStore.closeCheckInterval;
-    HStore.closeCheckInterval = 10*1000; // 10 KB
+    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);
 
     try {
       // Create a couple store files w/ 15KB (over 10KB interval)
@@ -206,7 +207,84 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
     } finally {
       // don't mess up future tests
       r.writestate.writesEnabled = true;
-      HStore.closeCheckInterval = origWI;
+      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
+
+      // Delete all Store information once done using
+      for (int i = 0; i < compactionThreshold; i++) {
+        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        byte [][] famAndQf = {COLUMN_FAMILY, null};
+        delete.addFamily(famAndQf[0]);
+        r.delete(delete);
+      }
+      r.flush(true);
+
+      // Multiple versions allowed for an entry, so the delete isn't enough
+      // Lower TTL and expire to ensure that all our entries have been wiped
+      final int ttl = 1000;
+      for (HStore store : this.r.stores.values()) {
+        ScanInfo old = store.getScanInfo();
+        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
+        store.setScanInfo(si);
+      }
+      Thread.sleep(ttl);
+
+      r.compact(true);
+      assertEquals(0, count());
+    }
+  }
+
+  @Test
+  public void testInterruptCompactionByTime() throws Exception {
+    assertEquals(0, count());
+
+    // lower the polling interval for this test
+    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);
+
+    try {
+      // Create a couple store files w/ 15KB (over 10KB interval)
+      int jmax = (int) Math.ceil(15.0/compactionThreshold);
+      byte [] pad = new byte[1000]; // 1 KB chunk
+      for (int i = 0; i < compactionThreshold; i++) {
+        Table loader = new RegionAsTable(r);
+        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        p.setDurability(Durability.SKIP_WAL);
+        for (int j = 0; j < jmax; j++) {
+          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+        }
+        HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
+        loader.put(p);
+        r.flush(true);
+      }
+
+      HRegion spyR = spy(r);
+      doAnswer(new Answer() {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          r.writestate.writesEnabled = false;
+          return invocation.callRealMethod();
+        }
+      }).when(spyR).doRegionCompactionPrep();
+
+      // force a minor compaction, but not before requesting a stop
+      spyR.compactStores();
+
+      // ensure that the compaction stopped, all old files are intact,
+      HStore s = r.getStore(COLUMN_FAMILY);
+      assertEquals(compactionThreshold, s.getStorefilesCount());
+      assertTrue(s.getStorefilesSize() > 15*1000);
+      // and no new store files persisted past compactStores()
+      // only one empty dir exists in temp dir
+      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+      assertEquals(1, ls.length);
+      Path storeTempDir =
+        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
+      assertTrue(r.getFilesystem().exists(storeTempDir));
+      ls = r.getFilesystem().listStatus(storeTempDir);
+      assertEquals(0, ls.length);
+    } finally {
+      // don't mess up future tests
+      r.writestate.writesEnabled = true;
+      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
 
       // Delete all Store information once done using
       for (int i = 0; i < compactionThreshold; i++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCloseChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCloseChecker.java
new file mode 100644
index 000000000000..ef42c19a2a69
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCloseChecker.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
+import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestCloseChecker {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCloseChecker.class);
+
+  @Test
+  public void testIsClosed() {
+    Store enableWrite = mock(Store.class);
+    when(enableWrite.areWritesEnabled()).thenReturn(true);
+
+    Store disableWrite = mock(Store.class);
+    when(disableWrite.areWritesEnabled()).thenReturn(false);
+
+    Configuration conf = new Configuration();
+
+    long currentTime = System.currentTimeMillis();
+
+    conf.setInt(SIZE_LIMIT_KEY, 10);
+    conf.setLong(TIME_LIMIT_KEY, 10);
+
+    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
+    assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime));
+    assertFalse(closeChecker.isSizeLimit(enableWrite, 10L));
+
+    closeChecker = new CloseChecker(conf, currentTime);
+    assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime + 11));
+    assertFalse(closeChecker.isSizeLimit(enableWrite, 11L));
+
+    closeChecker = new CloseChecker(conf, currentTime);
+    assertTrue(closeChecker.isTimeLimit(disableWrite, currentTime + 11));
+    assertTrue(closeChecker.isSizeLimit(disableWrite, 11L));
+
+    for (int i = 0; i < 10; i++) {
+      int plusTime = 5 * i;
+      assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime + plusTime));
+      assertFalse(closeChecker.isSizeLimit(enableWrite, 5L));
+    }
+
+    closeChecker = new CloseChecker(conf, currentTime);
+    assertFalse(closeChecker.isTimeLimit(disableWrite, currentTime + 6));
+    assertFalse(closeChecker.isSizeLimit(disableWrite, 6));
+    assertTrue(closeChecker.isTimeLimit(disableWrite, currentTime + 12));
+    assertTrue(closeChecker.isSizeLimit(disableWrite, 6));
+  }
+}
\ No newline at end of file
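Note (not part of the patch): a minimal sketch of how the new CloseChecker behaves, using only the APIs added above plus a Mockito Store mock as in TestCloseChecker; the demo class name and the tiny limit values are illustrative. A compaction that sees either method return true calls progress.cancel() and bails out, and setting either key to a value <= 0 disables that check entirely.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;

public class CloseCheckerDemo { // hypothetical demo class, not in the patch
  public static void main(String[] args) {
    // A store that is shutting down: areWritesEnabled() returns false.
    Store closingStore = mock(Store.class);
    when(closingStore.areWritesEnabled()).thenReturn(false);

    Configuration conf = new Configuration();
    conf.setInt(CloseChecker.SIZE_LIMIT_KEY, 1000);  // re-check after >1000 bytes written
    conf.setLong(CloseChecker.TIME_LIMIT_KEY, 100L); // re-check after >100 ms elapsed

    long start = System.currentTimeMillis();
    CloseChecker checker = new CloseChecker(conf, start);

    // Below both limits: the store is never consulted, so no abort is requested yet.
    System.out.println(checker.isSizeLimit(closingStore, 500L));       // false
    System.out.println(checker.isTimeLimit(closingStore, start + 50)); // false

    // Limits crossed: areWritesEnabled() is consulted and an abort is requested.
    System.out.println(checker.isSizeLimit(closingStore, 600L));        // true (500 + 600 > 1000)
    System.out.println(checker.isTimeLimit(closingStore, start + 200)); // true (200 > 100)
  }
}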