Skip to content

Commit 6c95c36

Browse files
committed
HBASE-26320 Implement a separate thread pool for the LogCleaner
This avoids starvation when the archive directory is large and takes a long time to iterate through.
1 parent 0294c73 commit 6c95c36

File tree

12 files changed

+113
-30
lines changed

12 files changed

+113
-30
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,8 @@ public class HMaster extends HRegionServer implements MasterServices {
352352

353353
private HbckChore hbckChore;
354354
CatalogJanitor catalogJanitorChore;
355-
private DirScanPool cleanerPool;
355+
private DirScanPool archiveCleanerPool;
356+
private DirScanPool logCleanerPool;
356357
private LogCleaner logCleaner;
357358
private HFileCleaner hfileCleaner;
358359
private ReplicationBarrierCleaner replicationBarrierCleaner;
@@ -1068,7 +1069,8 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
10681069
(EnvironmentEdgeManager.currentTime() - masterActiveTime) / 1000.0f));
10691070
this.masterFinishedInitializationTime = EnvironmentEdgeManager.currentTime();
10701071
configurationManager.registerObserver(this.balancer);
1071-
configurationManager.registerObserver(this.cleanerPool);
1072+
configurationManager.registerObserver(this.archiveCleanerPool);
1073+
configurationManager.registerObserver(this.logCleanerPool);
10721074
configurationManager.registerObserver(this.hfileCleaner);
10731075
configurationManager.registerObserver(this.logCleaner);
10741076
configurationManager.registerObserver(this.regionsRecoveryConfigManager);
@@ -1430,21 +1432,23 @@ private void startServiceThreads() throws IOException {
14301432
ExecutorType.MASTER_TABLE_OPERATIONS).setCorePoolSize(1));
14311433
startProcedureExecutor();
14321434

1433-
// Create cleaner thread pool
1434-
cleanerPool = new DirScanPool(conf);
1435+
// Create log cleaner thread pool
1436+
logCleanerPool = DirScanPool.getOldLogsScanner(conf);
14351437
Map<String, Object> params = new HashMap<>();
14361438
params.put(MASTER, this);
14371439
// Start log cleaner thread
14381440
int cleanerInterval =
14391441
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
14401442
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
1441-
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool, params);
1443+
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), logCleanerPool, params);
14421444
getChoreService().scheduleChore(logCleaner);
14431445

14441446
// start the hfile archive cleaner thread
14451447
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1448+
// Create archive cleaner thread pool
1449+
archiveCleanerPool = DirScanPool.getArchiveScanner(conf);
14461450
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
1447-
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
1451+
getMasterFileSystem().getFileSystem(), archiveDir, archiveCleanerPool, params);
14481452
getChoreService().scheduleChore(hfileCleaner);
14491453

14501454
// Regions Reopen based on very high storeFileRefCount is considered enabled
@@ -1496,9 +1500,13 @@ protected void stopServiceThreads() {
14961500
stopChores();
14971501

14981502
super.stopServiceThreads();
1499-
if (cleanerPool != null) {
1500-
cleanerPool.shutdownNow();
1501-
cleanerPool = null;
1503+
if (archiveCleanerPool != null) {
1504+
archiveCleanerPool.shutdownNow();
1505+
archiveCleanerPool = null;
1506+
}
1507+
if (logCleanerPool != null) {
1508+
logCleanerPool.shutdownNow();
1509+
logCleanerPool = null;
15021510
}
15031511

15041512
LOG.debug("Stopping service threads");

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
6464
*/
6565
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
6666
static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
67+
public static final String LOG_CLEANER_CHORE_SIZE = "hbase.log.cleaner.scan.dir.concurrent.size";
68+
static final String DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE = "1";
6769

6870
private final DirScanPool pool;
6971

hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/DirScanPool.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,28 @@ public class DirScanPool implements ConfigurationObserver {
3939
private final ThreadPoolExecutor pool;
4040
private int cleanerLatch;
4141
private boolean reconfigNotification;
42+
private final String cleanerPoolConfig;
43+
private final String cleanerPoolConfigDefault;
44+
private final String name;
4245

43-
public DirScanPool(Configuration conf) {
44-
String poolSize = conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE);
46+
private DirScanPool(Configuration conf, String cleanerPoolConfig, String cleanerPoolConfigDefault,
47+
String name) {
48+
this.cleanerPoolConfig = cleanerPoolConfig;
49+
this.cleanerPoolConfigDefault = cleanerPoolConfigDefault;
50+
this.name = name;
51+
String poolSize = conf.get(cleanerPoolConfig, cleanerPoolConfigDefault);
4552
size = CleanerChore.calculatePoolSize(poolSize);
4653
// poolSize may be 0 or 0.0 from a careless configuration,
4754
// double check to make sure.
48-
size = size == 0 ? CleanerChore.calculatePoolSize(CleanerChore.DEFAULT_CHORE_POOL_SIZE) : size;
49-
pool = initializePool(size);
50-
LOG.info("Cleaner pool size is {}", size);
55+
size = size == 0 ? CleanerChore.calculatePoolSize(cleanerPoolConfigDefault) : size;
56+
pool = initializePool(size, name);
57+
LOG.info("{} Cleaner pool size is {}", name, size);
5158
cleanerLatch = 0;
5259
}
5360

54-
private static ThreadPoolExecutor initializePool(int size) {
61+
private static ThreadPoolExecutor initializePool(int size, String name) {
5562
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
56-
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true)
63+
new ThreadFactoryBuilder().setNameFormat(name + "dir-scan-pool-%d").setDaemon(true)
5764
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
5865
}
5966

@@ -64,9 +71,10 @@ private static ThreadPoolExecutor initializePool(int size) {
6471
@Override
6572
public synchronized void onConfigurationChange(Configuration conf) {
6673
int newSize = CleanerChore.calculatePoolSize(
67-
conf.get(CleanerChore.CHORE_POOL_SIZE, CleanerChore.DEFAULT_CHORE_POOL_SIZE));
74+
conf.get(cleanerPoolConfig, cleanerPoolConfigDefault));
6875
if (newSize == size) {
69-
LOG.trace("Size from configuration is same as previous={}, no need to update.", newSize);
76+
LOG.trace("{} Cleaner Size from configuration is same as previous={}, no need to update.",
77+
name, newSize);
7078
return;
7179
}
7280
size = newSize;
@@ -109,11 +117,21 @@ synchronized void tryUpdatePoolSize(long timeout) {
109117
break;
110118
}
111119
}
112-
LOG.info("Update chore's pool size from {} to {}", pool.getPoolSize(), size);
120+
LOG.info("Update {} chore's pool size from {} to {}", name, pool.getPoolSize(), size);
113121
pool.setCorePoolSize(size);
114122
}
115123

116124
public int getSize() {
117125
return size;
118126
}
127+
128+
public static DirScanPool getArchiveScanner(Configuration conf) {
129+
return new DirScanPool(conf, CleanerChore.CHORE_POOL_SIZE,
130+
CleanerChore.DEFAULT_CHORE_POOL_SIZE, "Archive");
131+
}
132+
133+
public static DirScanPool getOldLogsScanner(Configuration conf) {
134+
return new DirScanPool(conf, CleanerChore.LOG_CLEANER_CHORE_SIZE,
135+
CleanerChore.DEFAULT_LOG_CLEANER_CHORE_POOL_SIZE, "Log");
136+
}
119137
}

hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public static void setupCluster() throws Exception {
111111
// We don't want the cleaner to remove files. The tests do that.
112112
UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().cancel(true);
113113

114-
POOL = new DirScanPool(UTIL.getConfiguration());
114+
POOL = DirScanPool.getArchiveScanner(UTIL.getConfiguration());
115115
}
116116

117117
private static void setupConf(Configuration conf) {

hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public static void setupCluster() throws Exception {
119119
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
120120
ZKUtil.createWithParents(watcher, archivingZNode);
121121
rss = mock(RegionServerServices.class);
122-
POOL = new DirScanPool(UTIL.getConfiguration());
122+
POOL= DirScanPool.getArchiveScanner(UTIL.getConfiguration());
123123
}
124124

125125
private static void setupConf(Configuration conf) {

hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class TestCleanerChore {
6161

6262
@BeforeClass
6363
public static void setup() {
64-
POOL = new DirScanPool(UTIL.getConfiguration());
64+
POOL = DirScanPool.getArchiveScanner(UTIL.getConfiguration());
6565
}
6666

6767
@AfterClass
@@ -469,6 +469,57 @@ public void testOnConfigurationChange() throws Exception {
469469
t.join();
470470
}
471471

472+
@Test
473+
public void testOnConfigurationChangeLogCleaner() throws Exception {
474+
int availableProcessorNum = Runtime.getRuntime().availableProcessors();
475+
if (availableProcessorNum == 1) { // no need to run this test
476+
return;
477+
}
478+
479+
DirScanPool pool = DirScanPool.getOldLogsScanner(UTIL.getConfiguration());
480+
481+
// have at least 2 available processors/cores
482+
int initPoolSize = availableProcessorNum / 2;
483+
int changedPoolSize = availableProcessorNum;
484+
485+
Stoppable stop = new StoppableImplementation();
486+
Configuration conf = UTIL.getConfiguration();
487+
Path testDir = UTIL.getDataTestDir();
488+
FileSystem fs = UTIL.getTestFileSystem();
489+
String confKey = "hbase.test.cleaner.delegates";
490+
conf.set(confKey, AlwaysDelete.class.getName());
491+
conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(initPoolSize));
492+
final AllValidPaths chore =
493+
new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir, confKey, pool);
494+
chore.setEnabled(true);
495+
// Create subdirs under testDir
496+
int dirNums = 6;
497+
Path[] subdirs = new Path[dirNums];
498+
for (int i = 0; i < dirNums; i++) {
499+
subdirs[i] = new Path(testDir, "subdir-" + i);
500+
fs.mkdirs(subdirs[i]);
501+
}
502+
// Under each subdirs create 6 files
503+
for (Path subdir : subdirs) {
504+
createFiles(fs, subdir, 6);
505+
}
506+
// Start chore
507+
Thread t = new Thread(new Runnable() {
508+
@Override
509+
public void run() {
510+
chore.chore();
511+
}
512+
});
513+
t.setDaemon(true);
514+
t.start();
515+
// Change size of chore's pool
516+
conf.set(CleanerChore.LOG_CLEANER_CHORE_SIZE, String.valueOf(changedPoolSize));
517+
pool.onConfigurationChange(conf);
518+
assertEquals(changedPoolSize, chore.getChorePoolSize());
519+
// Stop chore
520+
t.join();
521+
}
522+
472523
@Test
473524
public void testMinimumNumberOfThreads() throws Exception {
474525
Configuration conf = UTIL.getConfiguration();

hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class TestHFileCleaner {
7171
public static void setupCluster() throws Exception {
7272
// have to use a minidfs cluster because the localfs doesn't modify file times correctly
7373
UTIL.startMiniDFSCluster(1);
74-
POOL = new DirScanPool(UTIL.getConfiguration());
74+
POOL = DirScanPool.getArchiveScanner(UTIL.getConfiguration());
7575
}
7676

7777
@AfterClass

hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class TestHFileLinkCleaner {
6767

6868
@BeforeClass
6969
public static void setUp() {
70-
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
70+
POOL = DirScanPool.getArchiveScanner(TEST_UTIL.getConfiguration());
7171
}
7272

7373
@AfterClass

hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public class TestLogsCleaner {
9494
public static void setUpBeforeClass() throws Exception {
9595
TEST_UTIL.startMiniZKCluster();
9696
TEST_UTIL.startMiniDFSCluster(1);
97-
POOL = new DirScanPool(TEST_UTIL.getConfiguration());
97+
POOL = DirScanPool.getOldLogsScanner(TEST_UTIL.getConfiguration());
9898
}
9999

100100
@AfterClass

hbase-server/src/test/java/org/apache/hadoop/hbase/master/region/MasterRegionTestBase.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public class MasterRegionTestBase {
4848

4949
protected ChoreService choreService;
5050

51-
protected DirScanPool cleanerPool;
51+
protected DirScanPool archiveCleanerPool;
52+
53+
protected DirScanPool logCleanerPool;
5254

5355
protected static byte[] CF1 = Bytes.toBytes("f1");
5456

@@ -80,7 +82,8 @@ public void setUp() throws IOException {
8082
htu.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
8183
configure(htu.getConfiguration());
8284
choreService = new ChoreService(getClass().getSimpleName());
83-
cleanerPool = new DirScanPool(htu.getConfiguration());
85+
archiveCleanerPool = DirScanPool.getArchiveScanner(htu.getConfiguration());
86+
logCleanerPool = DirScanPool.getOldLogsScanner(htu.getConfiguration());
8487
Server server = mock(Server.class);
8588
when(server.getConfiguration()).thenReturn(htu.getConfiguration());
8689
when(server.getServerName())
@@ -103,7 +106,8 @@ public void setUp() throws IOException {
103106
@After
104107
public void tearDown() throws IOException {
105108
region.close(true);
106-
cleanerPool.shutdownNow();
109+
archiveCleanerPool.shutdownNow();
110+
logCleanerPool.shutdownNow();
107111
choreService.shutdown();
108112
htu.cleanupTestDir();
109113
}

0 commit comments

Comments
 (0)