Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ public enum OperationStatusCode {
public static final String HBASE_MASTER_NORMALIZER_CLASS =
"hbase.master.normalizer.class";

/** Default minimum age (in days) a region must reach before the normalizer considers it for merge */
public static final int DEFAULT_MIN_DAYS_BEFORE_MERGE = 3;

/** Config key for the minimum age (in days) of a region before the normalizer considers it for merge */
public static final String HBASE_MASTER_DAYS_BEFORE_MERGE =
"hbase.master.normalize.daysBeforeMerge";

/** Cluster is standalone or pseudo-distributed */
public static final boolean CLUSTER_IS_LOCAL = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;

import com.google.protobuf.ServiceException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseNormalizer implements RegionNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(BaseNormalizer.class);
protected MasterServices masterServices;
protected MasterRpcServices masterRpcServices;

/**
* Set the master service.
* @param masterServices inject instance of MasterServices
*/
@Override
public void setMasterServices(MasterServices masterServices) {
this.masterServices = masterServices;
}

@Override
public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
this.masterRpcServices = masterRpcServices;
}

protected long getRegionSize(HRegionInfo hri) {
ServerName sn =
masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
RegionLoad regionLoad =
masterServices.getServerManager().getLoad(sn).getRegionsLoad().get(hri.getRegionName());
if (regionLoad == null) {
LOG.debug("Region {} was not found in RegionsLoad", hri.getRegionNameAsString());
return -1;
}
return regionLoad.getStorefileSizeMB();
}

protected boolean isMergeEnabled() {
boolean mergeEnabled = true;
try {
mergeEnabled = masterRpcServices
.isSplitOrMergeEnabled(null,
RequestConverter.buildIsSplitOrMergeEnabledRequest(Admin.MasterSwitchType.MERGE))
.getEnabled();
} catch (ServiceException se) {
LOG.warn("Unable to determine whether merge is enabled", se);
}
return mergeEnabled;
}

protected boolean isSplitEnabled() {
boolean splitEnabled = true;
try {
splitEnabled = masterRpcServices
.isSplitOrMergeEnabled(null,
RequestConverter.buildIsSplitOrMergeEnabledRequest(Admin.MasterSwitchType.SPLIT))
.getEnabled();
} catch (ServiceException se) {
LOG.warn("Unable to determine whether merge is enabled", se);
}
return splitEnabled;
}

protected double getAvgRegionSize(List<HRegionInfo> tableRegions) {
long totalSizeMb = 0;
int acutalRegionCnt = 0;
for (HRegionInfo hri : tableRegions) {
long regionSize = getRegionSize(hri);
// don't consider regions that are in bytes for averaging the size.
if (regionSize > 0) {
acutalRegionCnt++;
totalSizeMb += regionSize;
}
}

double avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
return avgRegionSize;
}

protected List<NormalizationPlan> getMergeNormalizationPlan(TableName table) {
List<NormalizationPlan> plans = new ArrayList<>();
List<HRegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
double avgRegionSize = getAvgRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}", table, avgRegionSize);
LOG.debug("Computing normalization plan for table: {}, number of regions: {}", table,
tableRegions.size());

int candidateIdx = 0;
while (candidateIdx < tableRegions.size()) {
if (candidateIdx == tableRegions.size() - 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can get rid of this if condition with while( candidateIdx < tableRegions.size()-1 )

break;
}
HRegionInfo hri = tableRegions.get(candidateIdx);
long regionSize = getRegionSize(hri);
HRegionInfo hri2 = tableRegions.get(candidateIdx + 1);
long regionSize2 = getRegionSize(hri2);
if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
// atleast one of the two regions should be older than MIN_REGION_DURATION days
plans.add(new MergeNormalizationPlan(hri, hri2));
candidateIdx++;
} else {
LOG.debug("Skipping region {} of table {} with size {}", hri.getRegionId(), table,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of regionId, may be good to print region name with hri.getRegionNameAsString()

regionSize);
}
candidateIdx++;
}
return plans;
}

protected List<NormalizationPlan> getSplitNormalizationPlan(TableName table) {
List<NormalizationPlan> plans = new ArrayList<>();
List<HRegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
double avgRegionSize = getAvgRegionSize(tableRegions);
LOG.debug("Table {}, average region size: {}", table, avgRegionSize);

int candidateIdx = 0;
while (candidateIdx < tableRegions.size()) {
HRegionInfo hri = tableRegions.get(candidateIdx);
long regionSize = getRegionSize(hri);
// if the region is > 2 times larger than average, we split it, split
// is more high priority normalization action than merge.
if (regionSize > 2 * avgRegionSize) {
LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
+ regionSize + ", more than twice avg size, splitting");
plans.add(new SplitNormalizationPlan(hri, null));
}
candidateIdx++;
}
return plans;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of MergeNormalizer Logic in use:
* <ol>
* <li>get all regions of a given table
* <li>get avg size S of each region (by total size of store files reported in RegionLoad)
* <li>two regions R1 and its neighbour R2 are merged, if R1 + R2 &lt; S, and all such regions are
* returned to be merged
* <li>Otherwise, no action is performed
* </ol>
* <p>
* Region sizes are coarse and approximate on the order of megabytes. Also, empty regions (less than
* 1MB) are also merged if the age of region is &gt MIN_DURATION_FOR_MERGE (default 2)
* </p>
*/

@InterfaceAudience.Private
public class MergeNormalizer extends BaseNormalizer {
private static final Logger LOG = LoggerFactory.getLogger(MergeNormalizer.class);
private static final int MIN_REGION_COUNT = 3;

@Override
public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we implement all common parts of computePlanForTable in the base parent class, then only merge or split normalizer specifc logic would need to go on the implementations?

List<NormalizationPlan> plans = new ArrayList<>();
if (!shouldNormalize(table)) {
return null;
}
// atleast one of the two regions should be older than MIN_REGION_DURATION days
List<NormalizationPlan> normalizationPlans = getMergeNormalizationPlan(table);
for (NormalizationPlan plan : normalizationPlans) {
if (plan instanceof MergeNormalizationPlan) {
HRegionInfo hri = ((MergeNormalizationPlan) plan).getFirstRegion();
HRegionInfo hri2 = ((MergeNormalizationPlan) plan).getSecondRegion();
if (isOldEnoughToMerge(hri) || isOldEnoughToMerge(hri2)) {
plans.add(plan);
}
}
}
if (plans.isEmpty()) {
LOG.debug("No normalization needed, regions look good for table: {}", table);
return null;
}
return plans;
}

private boolean isOldEnoughToMerge(HRegionInfo hri) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
Timestamp hriTime = new Timestamp(hri.getRegionId());
boolean isOld =
new Timestamp(hriTime.getTime() + TimeUnit.DAYS.toMillis(getMinimumDurationBeforeMerge()))
.before(currentTime);
return isOld;
}

private boolean shouldNormalize(TableName table) {
boolean normalize = false;
if (table == null || table.isSystemTable()) {
LOG.debug("Normalization of system table {} isn't allowed", table);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think table could be null here, and we can remove null check from this if condition. Even if null was possible, log message would not be appropriate: Normalization of system table null isn't allowed

} else if (!isMergeEnabled()) {
LOG.debug("Merge disabled for table: {}", table);
} else {
List<HRegionInfo> tableRegions =
masterServices.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
if (tableRegions == null || tableRegions.size() < MIN_REGION_COUNT) {
int nrRegions = tableRegions == null ? 0 : tableRegions.size();
LOG.debug(
"Table {} has {} regions, required min number of regions for normalizer to run is {} , "
+ "not running normalizer",
table, nrRegions, MIN_REGION_COUNT);
} else {
normalize = true;
}
}
return normalize;
}

private int getMinimumDurationBeforeMerge() {
int minDuration = masterServices.getConfiguration().getInt(
HConstants.HBASE_MASTER_DAYS_BEFORE_MERGE, HConstants.DEFAULT_MIN_DAYS_BEFORE_MERGE);
return minDuration;
}
}
Loading