diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index bd78cc444c98..2eb697d4526e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -99,7 +99,8 @@ public class CompactionThreadManager implements ThroughputControllerService {
     this.server = server;
     try {
       this.rootDir = CommonFSUtils.getRootDir(this.conf);
-      this.tableDescriptors = new FSTableDescriptors(conf);
+      this.tableDescriptors = new FSTableDescriptors(CommonFSUtils.getCurrentFileSystem(conf),
+        CommonFSUtils.getRootDir(conf), true, false);
       int largeThreads =
         Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
       int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
@@ -213,7 +214,8 @@ Pair> selectCompaction(RegionInfo regionInfo
       tableDescriptors.get(regionInfo.getTable());
     HStore store = getStore(conf, server.getFileSystem(), rootDir,
       tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());
-
+    // Handle the TTL case: drop already-expired store files before selecting a compaction.
+    store.removeUnneededFiles(false);
     // CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
     // opportunity to clean compacted file at that time, we clean compacted files here
     compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5cd005af1a60..0569ad3c7adf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1928,7 +1928,7 @@ public Optional<CompactionContext> requestCompaction(int priority,
       return Optional.empty();
     }
     // Before we do compaction, try to get rid of unneeded files to simplify things.
-    removeUnneededFiles();
+    removeUnneededFiles(true);
 
     if (region.getRegionServerServices() != null
       && region.getRegionServerServices().isCompactionOffloadEnabled()
@@ -2063,7 +2063,7 @@ private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
     Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
   }
 
-  private void removeUnneededFiles() throws IOException {
+  public void removeUnneededFiles(boolean writeWalRecord) throws IOException {
     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
       return;
     }
@@ -2092,7 +2092,9 @@ private void removeUnneededFiles() throws IOException {
     }
 
     Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
-    writeCompactionWalRecord(delSfs, newFiles);
+    if (writeWalRecord) {
+      writeCompactionWalRecord(delSfs, newFiles);
+    }
     replaceStoreFiles(delSfs, newFiles);
     refreshStoreSizeAndTotalBytes();
     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
index 6d0ccccce4d7..18b07d92bf09 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestCompactionServer.java
@@ -304,4 +304,50 @@ public void testCompactionOffloadTableDescriptor() throws Exception {
     TEST_UTIL.compact(TABLENAME, false);
     TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
   }
+
+  @Test
+  public void testCompactionWithTTL() throws Exception {
+    TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
+    ColumnFamilyDescriptor cfd =
+      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setTimeToLive(10).build();
+    TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
+      .setColumnFamily(cfd).setCompactionOffloadEnabled(true).build();
+    TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
+    TEST_UTIL.waitTableAvailable(TABLENAME);
+    // Generate an hfile in which all KVs are already expired
+    doPutRecord(1, 500, true);
+    int kVCount = 0;
+    for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+      for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+        kVCount += hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
+      }
+    }
+    assertEquals(500, kVCount);
+    // Generate hfiles mixed with expired and valid KVs
+    doPutRecord(1, 500, false);
+    Thread.sleep(10000);
+    doPutRecord(501, 1000, true);
+
+    TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
+    TEST_UTIL.compact(TABLENAME, true);
+    TEST_UTIL.waitFor(60000, () -> {
+      int hFileCount = 0;
+      for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+        hFileCount += region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
+      }
+      return hFileCount == 1;
+    });
+    kVCount = 0;
+    for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
+      for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
+        kVCount += hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
+      }
+    }
+    // Ensure the compaction actually ran on the compaction server
+    TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
+
+    assertEquals(500, kVCount);
+    verifyRecord(1, 500, false);
+    verifyRecord(501, 1000, true);
+  }
 }
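
For reviewers, a minimal standalone sketch of the control flow this patch gives removeUnneededFiles(boolean): the expired-file cleanup is shared between the region server and the compaction server, but the compaction WAL record is written only on the region-server path, since the offloaded store on a compaction server has no region WAL to write to. The CompactionWal and String-based file types below are simplified stand-ins for illustration, not the real HBase classes.

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class RemoveUnneededFilesSketch {
  /** Stand-in for the WAL helper; the real code calls HStore.writeCompactionWalRecord. */
  interface CompactionWal {
    void writeCompactionRecord(Collection<String> compactedAway, Collection<String> newFiles);
  }

  private final CompactionWal wal; // null on the compaction server, which has no region WAL

  RemoveUnneededFilesSketch(CompactionWal wal) {
    this.wal = wal;
  }

  /** Mirrors the shape of HStore.removeUnneededFiles(boolean writeWalRecord) in this patch. */
  void removeUnneededFiles(List<String> expiredFiles, boolean writeWalRecord) throws IOException {
    if (expiredFiles.isEmpty()) {
      return; // nothing expired, nothing to do
    }
    Collection<String> newFiles = Collections.emptyList(); // No new files, as in the patch.
    if (writeWalRecord) {
      // Region-server path: removeUnneededFiles(true) keeps the old behaviour and
      // records the removal of expired files in the WAL.
      wal.writeCompactionRecord(expiredFiles, newFiles);
    }
    // Both paths then swap the store-file set and refresh store sizes
    // (replaceStoreFiles / refreshStoreSizeAndTotalBytes in the real code).
    System.out.println("Removed " + expiredFiles.size() + " expired file(s)");
  }

  public static void main(String[] args) throws IOException {
    // Compaction-server path, as in CompactionThreadManager.selectCompaction:
    // clean up expired files without writing a WAL record.
    new RemoveUnneededFilesSketch(null).removeUnneededFiles(List.of("sf1", "sf2"), false);
  }
}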