Skip to content

Commit f18ad5d

Browse files
committed
HBASE-27752: Update the list of prefetched files upon region movement
1 parent 143e9b4 commit f18ad5d

3 files changed

Lines changed: 190 additions & 2 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public static void request(Path path, Runnable runnable) {
123123
public static void complete(Path path) {
124124
prefetchFutures.remove(path);
125125
prefetchCompleted.put(path.getName(), true);
126-
LOG.debug("Prefetch completed for {}", path);
126+
LOG.debug("Prefetch completed for {}", path.getName());
127127
}
128128

129129
public static void cancel(Path path) {
@@ -134,7 +134,8 @@ public static void cancel(Path path) {
134134
prefetchFutures.remove(path);
135135
LOG.debug("Prefetch cancelled for {}", path);
136136
}
137-
prefetchCompleted.remove(path.getName());
137+
LOG.debug("Removing filename from the prefetched persistence list: " + path.getName());
138+
removePrefetchedFileWhileEvict(path.getName());
138139
}
139140

140141
public static boolean isCompleted(Path path) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/CloseRegionHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.apache.hadoop.hbase.client.RegionInfo;
2525
import org.apache.hadoop.hbase.executor.EventHandler;
2626
import org.apache.hadoop.hbase.executor.EventType;
27+
import org.apache.hadoop.hbase.io.hfile.BlockCache;
28+
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
29+
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
2730
import org.apache.hadoop.hbase.procedure2.Procedure;
2831
import org.apache.hadoop.hbase.regionserver.HRegion;
2932
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -33,6 +36,7 @@
3336
import org.slf4j.LoggerFactory;
3437

3538
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
39+
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
3640

3741
/**
3842
* Handles closing of a region on a region server.
@@ -101,6 +105,19 @@ public void process() throws IOException {
101105
return;
102106
}
103107

108+
rsServices.getBlockCache().ifPresent(blockCache -> {
109+
if (blockCache instanceof CombinedBlockCache) {
110+
BlockCache l2 = ((CombinedBlockCache)blockCache).getSecondLevelCache();
111+
if (l2 instanceof BucketCache) {
112+
if (region.getReadOnlyConfiguration().get(PREFETCH_PERSISTENCE_PATH_KEY) != null) {
113+
LOG.info("Closing region {} during a graceful stop, and prefetch persistence is on, "
114+
+ "so setting evict on close to false. ", region.getRegionInfo().getEncodedName());
115+
region.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(false));
116+
}
117+
}
118+
}
119+
});
120+
104121
// Close the region
105122
if (region.close(abort) == null) {
106123
// This region has already been closed. Should not happen (A unit test makes this
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.io.hfile;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.Path;
22+
import org.apache.hadoop.hbase.HBaseClassTestRule;
23+
import org.apache.hadoop.hbase.HBaseTestingUtil;
24+
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
25+
import org.apache.hadoop.hbase.StartTestingClusterOption;
26+
import org.apache.hadoop.hbase.TableName;
27+
import org.apache.hadoop.hbase.client.Admin;
28+
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
29+
import org.apache.hadoop.hbase.client.Put;
30+
import org.apache.hadoop.hbase.client.RegionInfo;
31+
import org.apache.hadoop.hbase.client.Table;
32+
import org.apache.hadoop.hbase.client.TableDescriptor;
33+
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
34+
import org.apache.hadoop.hbase.regionserver.HRegionServer;
35+
import org.apache.hadoop.hbase.testclassification.IOTests;
36+
import org.apache.hadoop.hbase.testclassification.MediumTests;
37+
import org.apache.hadoop.hbase.util.Bytes;
38+
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
39+
import org.junit.After;
40+
import org.junit.Before;
41+
import org.junit.ClassRule;
42+
import org.junit.Test;
43+
import org.junit.experimental.categories.Category;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
46+
import java.io.IOException;
47+
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
48+
import static org.junit.Assert.assertEquals;
49+
import static org.junit.Assert.assertNotEquals;
50+
import static org.junit.Assert.assertTrue;
51+
52+
@Category({ IOTests.class, MediumTests.class })
53+
public class TestBlockEvictionOnRegionMovement {
54+
55+
@ClassRule
56+
public static final HBaseClassTestRule CLASS_RULE =
57+
HBaseClassTestRule.forClass(TestBlockEvictionOnRegionMovement.class);
58+
59+
private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionOnRegionMovement.class);
60+
61+
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
62+
63+
private Configuration conf;
64+
Path testDir;
65+
MiniZooKeeperCluster zkCluster;
66+
SingleProcessHBaseCluster cluster;
67+
StartTestingClusterOption option =
68+
StartTestingClusterOption.builder().numRegionServers(2).build();
69+
70+
@Before
71+
public void setup() throws Exception {
72+
conf = TEST_UTIL.getConfiguration();
73+
testDir = TEST_UTIL.getDataTestDir();
74+
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
75+
76+
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
77+
conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
78+
conf.setInt("hbase.bucketcache.size", 400);
79+
conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence");
80+
conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir + "/prefetch.persistence");
81+
conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100);
82+
conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true);
83+
zkCluster = TEST_UTIL.startMiniZKCluster();
84+
cluster = TEST_UTIL.startMiniHBaseCluster(option);
85+
cluster.setConf(conf);
86+
}
87+
88+
@Test
89+
public void testBlockEvictionOnRegionMove() throws Exception {
90+
// Write to table and flush
91+
TableName tableRegionMove = writeDataToTable();
92+
93+
HRegionServer regionServingRS =
94+
cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1 ?
95+
cluster.getRegionServer(1) : cluster.getRegionServer(0);
96+
assertTrue(regionServingRS.getBlockCache().isPresent());
97+
long oldUsedCacheSize = regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
98+
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
99+
100+
Admin admin = TEST_UTIL.getAdmin();
101+
RegionInfo regionToMove = regionServingRS.getRegions(tableRegionMove).get(0).getRegionInfo();
102+
admin.move(regionToMove.getEncodedNameAsBytes(), TEST_UTIL.getOtherRegionServer(regionServingRS).getServerName());
103+
assertEquals(0, regionServingRS.getRegions(tableRegionMove).size());
104+
105+
long newUsedCacheSize = regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
106+
assertTrue(oldUsedCacheSize > newUsedCacheSize);
107+
assertEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
108+
}
109+
110+
@Test
111+
public void testBlockEvictionOnGracefulStop() throws Exception {
112+
// Write to table and flush
113+
TableName tableRegionClose = writeDataToTable();
114+
115+
HRegionServer regionServingRS =
116+
cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1 ?
117+
cluster.getRegionServer(1) : cluster.getRegionServer(0);
118+
119+
assertTrue(regionServingRS.getBlockCache().isPresent());
120+
long oldUsedCacheSize = regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
121+
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
122+
123+
cluster.stopRegionServer(regionServingRS.getServerName());
124+
Thread.sleep(500);
125+
cluster.startRegionServer();
126+
Thread.sleep(500);
127+
128+
long newUsedCacheSize = regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
129+
assertEquals(oldUsedCacheSize, newUsedCacheSize);
130+
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
131+
}
132+
133+
public TableName writeDataToTable() throws IOException, InterruptedException {
134+
TableName tableName = TableName.valueOf("table1");
135+
byte[] row0 = Bytes.toBytes("row1");
136+
byte[] row1 = Bytes.toBytes("row2");
137+
byte[] family = Bytes.toBytes("family");
138+
byte[] qf1 = Bytes.toBytes("qf1");
139+
byte[] qf2 = Bytes.toBytes("qf2");
140+
byte[] value1 = Bytes.toBytes("value1");
141+
byte[] value2 = Bytes.toBytes("value2");
142+
143+
TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
144+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
145+
Table table = TEST_UTIL.createTable(td, null);
146+
try {
147+
// put data
148+
Put put0 = new Put(row0);
149+
put0.addColumn(family, qf1, 1, value1);
150+
table.put(put0);
151+
Put put1 = new Put(row1);
152+
put1.addColumn(family, qf2, 1, value2);
153+
table.put(put1);
154+
TEST_UTIL.flush(tableName);
155+
} finally {
156+
Thread.sleep(1000);
157+
}
158+
assertEquals(1, cluster.getRegions(tableName).size());
159+
return tableName;
160+
}
161+
162+
@After
163+
public void tearDown() throws Exception {
164+
TEST_UTIL.shutdownMiniCluster();
165+
TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
166+
if (zkCluster != null) {
167+
zkCluster.shutdown();
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)