Otherwise, R1 is evaluated
+ * Simple implementation of region normalizer. Logic in use:
+ * <ol>
+ * <li>Get all regions of a given table
+ * <li>Get avg size S of each region (by total size of store files reported in RegionMetrics)
+ * <li>Seek every single region one by one. If a region R0 is bigger than S * 2, it is kindly
+ * requested to split. Thereon evaluate the next region R1
+ * <li>Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge. Thereon
+ * evaluate the next region R2
+ * <li>Otherwise, R1 is evaluated
 * </ol>
 * <p>
- * Region sizes are coarse and approximate on the order of megabytes. Additionally,
- * "empty" regions (less than 1MB, with the previous note) are not merged away. This
- * is by design to prevent normalization from undoing the pre-splitting of a table.
+ * Region sizes are coarse and approximate on the order of megabytes. Additionally, "empty" regions
+ * (less than 1MB, with the previous note) are not merged away. This is by design to prevent
+ * normalization from undoing the pre-splitting of a table.
*/
@InterfaceAudience.Private
-public class SimpleRegionNormalizer implements RegionNormalizer {
+public class SimpleRegionNormalizer extends AbstractRegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
private int minRegionCount;
- private MasterServices masterServices;
- private MasterRpcServices masterRpcServices;
private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
public SimpleRegionNormalizer() {
minRegionCount = HBaseConfiguration.create().getInt("hbase.normalizer.min.region.count", 3);
}
- /**
- * Set the master service.
- * @param masterServices inject instance of MasterServices
- */
- @Override
- public void setMasterServices(MasterServices masterServices) {
- this.masterServices = masterServices;
- }
-
- @Override
- public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
- this.masterRpcServices = masterRpcServices;
- }
@Override
public void planSkipped(RegionInfo hri, PlanType type) {
@@ -119,138 +91,56 @@ public int compare(NormalizationPlan plan1, NormalizationPlan plan2) {
private Comparator planComparator = new PlanComparator();
/**
- * Computes next most "urgent" normalization action on the table.
- * Action may be either a split, or a merge, or no action.
- *
+ * Computes next most "urgent" normalization action on the table. Action may be either a split, or
+ * a merge, or no action.
* @param table table to normalize
* @return normalization plan to execute
*/
@Override
public List computePlanForTable(TableName table) throws HBaseIOException {
if (table == null || table.isSystemTable()) {
- LOG.debug("Normalization of system table " + table + " isn't allowed");
+ LOG.debug("Normalization of system table {} isn't allowed", table);
return null;
}
- boolean splitEnabled = true, mergeEnabled = true;
- try {
- splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
- RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
- } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
- LOG.debug("Unable to determine whether split is enabled", e);
- }
- try {
- mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
- RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
- } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
- LOG.debug("Unable to determine whether merge is enabled", e);
- }
+ boolean splitEnabled = isSplitEnabled();
+ boolean mergeEnabled = isMergeEnabled();
if (!mergeEnabled && !splitEnabled) {
- LOG.debug("Both split and merge are disabled for table: " + table);
+ LOG.debug("Both split and merge are disabled for table: {}", table);
return null;
}
List plans = new ArrayList<>();
- List tableRegions = masterServices.getAssignmentManager().getRegionStates().
- getRegionsOfTable(table);
+ List tableRegions =
+ masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
- //TODO: should we make min number of regions a config param?
if (tableRegions == null || tableRegions.size() < minRegionCount) {
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
- LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
- + " of regions for normalizer to run is " + minRegionCount + ", not running normalizer");
+ LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run is "
+ + "{}, not running normalizer",
+ table, nrRegions, minRegionCount);
return null;
}
- LOG.debug("Computing normalization plan for table: " + table +
- ", number of regions: " + tableRegions.size());
-
- long totalSizeMb = 0;
- int acutalRegionCnt = 0;
+ LOG.debug("Computing normalization plan for table: {}, number of regions: {}", table,
+ tableRegions.size());
- for (int i = 0; i < tableRegions.size(); i++) {
- RegionInfo hri = tableRegions.get(i);
- long regionSize = getRegionSize(hri);
- if (regionSize > 0) {
- acutalRegionCnt++;
- totalSizeMb += regionSize;
- }
- }
- int targetRegionCount = -1;
- long targetRegionSize = -1;
- try {
- TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
- if(tableDescriptor != null) {
- targetRegionCount =
- tableDescriptor.getNormalizerTargetRegionCount();
- targetRegionSize =
- tableDescriptor.getNormalizerTargetRegionSize();
- LOG.debug("Table {}: target region count is {}, target region size is {}", table,
- targetRegionCount, targetRegionSize);
+ if (splitEnabled) {
+ List splitPlans = getSplitNormalizationPlan(table);
+ if (splitPlans != null) {
+ plans.addAll(splitPlans);
}
- } catch (IOException e) {
- LOG.warn(
- "cannot get the target number and target size of table {}, they will be default value -1.",
- table);
- }
-
- double avgRegionSize;
- if (targetRegionSize > 0) {
- avgRegionSize = targetRegionSize;
- } else if (targetRegionCount > 0) {
- avgRegionSize = totalSizeMb / (double) targetRegionCount;
- } else {
- avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
}
- LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
- LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
-
- int candidateIdx = 0;
- while (candidateIdx < tableRegions.size()) {
- RegionInfo hri = tableRegions.get(candidateIdx);
- long regionSize = getRegionSize(hri);
- // if the region is > 2 times larger than average, we split it, split
- // is more high priority normalization action than merge.
- if (regionSize > 2 * avgRegionSize) {
- if (splitEnabled) {
- LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
- + regionSize + ", more than twice avg size, splitting");
- plans.add(new SplitNormalizationPlan(hri, null));
- }
- } else {
- if (candidateIdx == tableRegions.size()-1) {
- break;
- }
- if (mergeEnabled) {
- RegionInfo hri2 = tableRegions.get(candidateIdx+1);
- long regionSize2 = getRegionSize(hri2);
- if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
- LOG.info("Table " + table + ", small region size: " + regionSize
- + " plus its neighbor size: " + regionSize2
- + ", less than the avg size " + avgRegionSize + ", merging them");
- plans.add(new MergeNormalizationPlan(hri, hri2));
- candidateIdx++;
- }
- }
+ if (mergeEnabled) {
+ List mergePlans = getMergeNormalizationPlan(table);
+ if (mergePlans != null) {
+ plans.addAll(mergePlans);
}
- candidateIdx++;
}
if (plans.isEmpty()) {
- LOG.debug("No normalization needed, regions look good for table: " + table);
+ LOG.debug("No normalization needed, regions look good for table: {}", table);
return null;
}
Collections.sort(plans, planComparator);
return plans;
}
-
- private long getRegionSize(RegionInfo hri) {
- ServerName sn = masterServices.getAssignmentManager().getRegionStates().
- getRegionServerOfRegion(hri);
- RegionMetrics regionLoad = masterServices.getServerManager().getLoad(sn).
- getRegionMetrics().get(hri.getRegionName());
- if (regionLoad == null) {
- LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad");
- return -1;
- }
- return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java
new file mode 100644
index 000000000000..0d74255d7fdd
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestMergeNormalizer.java
@@ -0,0 +1,270 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.when;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
+
+/**
+ * Unit tests for {@link MergeNormalizer}. The master side is replaced by Mockito deep-stub mocks
+ * that report a fixed region list and a fixed store file size for every region.
+ */
+@Category({ MasterTests.class, SmallTests.class })
+public class TestMergeNormalizer {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMergeNormalizer.class);
+
+  // Bound to the test class itself so that log output is attributed to the test.
+  private static final Logger LOG = LoggerFactory.getLogger(TestMergeNormalizer.class);
+
+  private static RegionNormalizer normalizer;
+
+  // mocks, rebuilt by setupMocksForNormalizer for every test
+  private static MasterServices masterServices;
+  private static MasterRpcServices masterRpcServices;
+
+  /** Creates the normalizer instance shared by all tests. */
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    normalizer = new MergeNormalizer();
+  }
+
+  /** No plan must be computed for the meta table. */
+  @Test
+  public void testNoNormalizationForMetaTable() throws HBaseIOException {
+    TableName testTable = TableName.META_TABLE_NAME;
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+    assertNull(plans);
+  }
+
+  /** No plan must be computed for a table that has only two regions. */
+  @Test
+  public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
+    TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+      .setEndKey(Bytes.toBytes("bbb")).build();
+    // Register the region as well as its size, so the table really reports two regions.
+    hris.add(hri1);
+    regionSizes.put(hri1.getRegionName(), 10);
+
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+      .setEndKey(Bytes.toBytes("ccc")).build();
+    hris.add(hri2);
+    regionSizes.put(hri2.getRegionName(), 15);
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+    assertNull(plans);
+  }
+
+  /** No plan must be computed when the four regions have similar sizes (10, 15, 8, 10). */
+  @Test
+  public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
+    TableName testTable = TableName.valueOf("testSplitOfSmallRegion");
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+      .setEndKey(Bytes.toBytes("bbb")).build();
+    hris.add(hri1);
+    regionSizes.put(hri1.getRegionName(), 10);
+
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+      .setEndKey(Bytes.toBytes("ccc")).build();
+    hris.add(hri2);
+    regionSizes.put(hri2.getRegionName(), 15);
+
+    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+      .setEndKey(Bytes.toBytes("ddd")).build();
+    hris.add(hri3);
+    regionSizes.put(hri3.getRegionName(), 8);
+
+    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+      .setEndKey(Bytes.toBytes("eee")).build();
+    hris.add(hri4);
+    regionSizes.put(hri4.getRegionName(), 10);
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+    assertNull(plans);
+  }
+
+  /**
+   * Seven adjacent regions, most of them carrying a region id three days in the past. The first
+   * plan must merge the two 5 MB neighbours and the second plan the two trailing empty regions.
+   */
+  @Test
+  public void testMergeOfSmallRegions() throws HBaseIOException {
+    TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    Timestamp currentTime = new Timestamp(System.currentTimeMillis());
+    Timestamp threedaysBefore = new Timestamp(currentTime.getTime() - TimeUnit.DAYS.toMillis(3));
+
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+      .setEndKey(Bytes.toBytes("bbb")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri1);
+    regionSizes.put(hri1.getRegionName(), 15);
+
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+      .setEndKey(Bytes.toBytes("ccc")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri2);
+    regionSizes.put(hri2.getRegionName(), 5);
+
+    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+      .setEndKey(Bytes.toBytes("ddd")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri3);
+    regionSizes.put(hri3.getRegionName(), 5);
+
+    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+      .setEndKey(Bytes.toBytes("eee")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri4);
+    regionSizes.put(hri4.getRegionName(), 15);
+
+    RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
+      .setEndKey(Bytes.toBytes("fff")).build();
+    hris.add(hri5);
+    regionSizes.put(hri5.getRegionName(), 16);
+
+    RegionInfo hri6 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("fff"))
+      .setEndKey(Bytes.toBytes("ggg")).setRegionId(threedaysBefore.getTime()).build();
+    hris.add(hri6);
+    regionSizes.put(hri6.getRegionName(), 0);
+
+    RegionInfo hri7 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ggg"))
+      .setEndKey(Bytes.toBytes("hhh")).build();
+    hris.add(hri7);
+    regionSizes.put(hri7.getRegionName(), 0);
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+
+    NormalizationPlan plan = plans.get(0);
+    assertTrue(plan instanceof MergeNormalizationPlan);
+    assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
+    assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
+
+    // to check last 0 sized regions are merged
+    plan = plans.get(1);
+    assertTrue(plan instanceof MergeNormalizationPlan);
+    assertEquals(hri6, ((MergeNormalizationPlan) plan).getFirstRegion());
+    assertEquals(hri7, ((MergeNormalizationPlan) plan).getSecondRegion());
+  }
+
+  /**
+   * Five adjacent regions built without an explicit region id; no plan is expected.
+   * NOTE(review): presumably these count as newly created regions for MergeNormalizer - confirm
+   * against its region age check.
+   */
+  @Test
+  public void testMergeOfNewSmallRegions() throws HBaseIOException {
+    TableName testTable = TableName.valueOf("testMergeOfNewSmallRegions");
+    List<RegionInfo> hris = new ArrayList<>();
+    Map<byte[], Integer> regionSizes = new HashMap<>();
+
+    RegionInfo hri1 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("aaa"))
+      .setEndKey(Bytes.toBytes("bbb")).build();
+    hris.add(hri1);
+    regionSizes.put(hri1.getRegionName(), 15);
+
+    RegionInfo hri2 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("bbb"))
+      .setEndKey(Bytes.toBytes("ccc")).build();
+    hris.add(hri2);
+    regionSizes.put(hri2.getRegionName(), 5);
+
+    RegionInfo hri3 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ccc"))
+      .setEndKey(Bytes.toBytes("ddd")).build();
+    hris.add(hri3);
+    regionSizes.put(hri3.getRegionName(), 16);
+
+    RegionInfo hri4 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("ddd"))
+      .setEndKey(Bytes.toBytes("eee")).build();
+    hris.add(hri4);
+    regionSizes.put(hri4.getRegionName(), 15);
+
+    RegionInfo hri5 = RegionInfoBuilder.newBuilder(testTable).setStartKey(Bytes.toBytes("eee"))
+      .setEndKey(Bytes.toBytes("fff")).build();
+    // Register hri5 itself; adding hri4 a second time left the last region out of the table.
+    hris.add(hri5);
+    regionSizes.put(hri5.getRegionName(), 5);
+
+    setupMocksForNormalizer(regionSizes, hris);
+    List<NormalizationPlan> plans = normalizer.computePlanForTable(testTable);
+
+    assertNull(plans);
+  }
+
+  /**
+   * Rebuilds the master mocks and injects them into the normalizer under test.
+   * @param regionSizes store file size in megabytes to report per region, keyed by region name
+   * @param regionInfos regions to report for any table lookup
+   */
+  @SuppressWarnings("MockitoCast")
+  protected void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
+    List<RegionInfo> regionInfos) {
+    masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
+    masterRpcServices = Mockito.mock(MasterRpcServices.class, RETURNS_DEEP_STUBS);
+
+    // for simplicity all regions are assumed to be on one server; doesn't matter to us
+    ServerName sn = ServerName.valueOf("localhost", 0, 1L);
+    when(masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(any()))
+      .thenReturn(regionInfos);
+    when(masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(any()))
+      .thenReturn(sn);
+
+    for (Map.Entry<byte[], Integer> region : regionSizes.entrySet()) {
+      RegionMetrics regionLoad = Mockito.mock(RegionMetrics.class);
+      when(regionLoad.getRegionName()).thenReturn(region.getKey());
+      when(regionLoad.getStoreFileSize())
+        .thenReturn(new Size(region.getValue(), Size.Unit.MEGABYTE));
+
+      // this is possibly broken with jdk9, unclear if false positive or not
+      // suppress it for now, fix it when we get to running tests on 9
+      // see: http://errorprone.info/bugpattern/MockitoCast
+      when((Object) masterServices.getServerManager().getLoad(sn).getRegionMetrics()
+        .get(region.getKey())).thenReturn(regionLoad);
+    }
+    try {
+      // Report both the split and the merge switch as enabled.
+      when(masterRpcServices.isSplitOrMergeEnabled(any(), any())).thenReturn(
+        MasterProtos.IsSplitOrMergeEnabledResponse.newBuilder().setEnabled(true).build());
+    } catch (ServiceException se) {
+      LOG.debug("error setting isSplitOrMergeEnabled switch", se);
+    }
+
+    normalizer.setMasterServices(masterServices);
+    normalizer.setMasterRpcServices(masterRpcServices);
+  }
+}