Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ public List<NormalizationTarget> getNormalizationTargets() {
return normalizationTargets;
}

@Override
public long getPlanSizeMb() {
long total = 0;
for (NormalizationTarget target : normalizationTargets) {
total += target.getRegionSizeMb();
}
return total;
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ enum PlanType {

/** Returns the type of this plan */
PlanType getType();

long getPlanSizeMb();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -53,6 +54,9 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
"hbase.normalizer.throughput.max_bytes_per_sec";
private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec

static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;

private final MasterServices masterServices;
private final RegionNormalizer regionNormalizer;
private final RegionNormalizerWorkQueue<TableName> workQueue;
Expand All @@ -62,6 +66,7 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
private final boolean defaultNormalizerTableLevel;
private long splitPlanCount;
private long mergePlanCount;
private long cumulativePlansSizeLimitMb;

RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
Expand All @@ -73,6 +78,8 @@ class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnab
this.mergePlanCount = 0;
this.rateLimiter = loadRateLimiter(configuration);
this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
this.cumulativePlansSizeLimitMb =
configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
}

private boolean extractDefaultNormalizerValue(final Configuration configuration) {
Expand Down Expand Up @@ -207,14 +214,34 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
return Collections.emptyList();
}

final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);

plans = truncateForSize(plans);

if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
}
return plans;
}

private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
if (cumulativePlansSizeLimitMb != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
long cumulativeSizeMb = 0;
for (NormalizationPlan plan : plans) {
cumulativeSizeMb += plan.getPlanSizeMb();
if (cumulativeSizeMb > cumulativePlansSizeLimitMb) {
break;
}
maybeTruncatedPlans.add(plan);
}
return maybeTruncatedPlans;
} else {
return plans;
}
}

private void submitPlans(final List<NormalizationPlan> plans) {
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
// task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver
static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 0;
static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;

private MasterServices masterServices;
private NormalizerConfiguration normalizerConfiguration;
Expand Down Expand Up @@ -229,6 +231,14 @@ public List<NormalizationPlan> computePlansForTable(final TableDescriptor tableD
plans.addAll(mergePlans);
}

if (
normalizerConfiguration.getCumulativePlansSizeLimitMb() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB
) {
// If we are going to truncate our list of plans, shuffle the split and merge plans together
// so that the merge plans, which are listed last, are not starved out.
shuffleNormalizationPlans(plans);
}

LOG.debug("Computed normalization plans for table {}. Total plans: {}, split plans: {}, "
+ "merge plans: {}", table, plans.size(), splitPlansCount, mergePlansCount);
return plans;
Expand Down Expand Up @@ -464,6 +474,14 @@ private boolean isLargeEnoughForMerge(final NormalizerConfiguration normalizerCo
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
}

/**
* This very simple method exists so we can verify it was called in a unit test. Visible for
* testing.
*/
void shuffleNormalizationPlans(List<NormalizationPlan> plans) {
Collections.shuffle(plans);
}

private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
final Object... args) {
final boolean value = predicate.getAsBoolean();
Expand All @@ -484,6 +502,7 @@ private static final class NormalizerConfiguration {
private final int mergeMinRegionCount;
private final Period mergeMinRegionAge;
private final long mergeMinRegionSizeMb;
private final long cumulativePlansSizeLimitMb;

private NormalizerConfiguration() {
conf = null;
Expand All @@ -492,6 +511,7 @@ private NormalizerConfiguration() {
mergeMinRegionCount = DEFAULT_MERGE_MIN_REGION_COUNT;
mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
cumulativePlansSizeLimitMb = DEFAULT_CUMULATIVE_SIZE_LIMIT_MB;
}

private NormalizerConfiguration(final Configuration conf,
Expand All @@ -502,6 +522,8 @@ private NormalizerConfiguration(final Configuration conf,
mergeMinRegionCount = parseMergeMinRegionCount(conf);
mergeMinRegionAge = parseMergeMinRegionAge(conf);
mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
cumulativePlansSizeLimitMb =
conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB);
logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
splitEnabled);
logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
Expand All @@ -512,6 +534,8 @@ private NormalizerConfiguration(final Configuration conf,
currentConfiguration.getMergeMinRegionAge(), mergeMinRegionAge);
logConfigurationUpdated(MERGE_MIN_REGION_SIZE_MB_KEY,
currentConfiguration.getMergeMinRegionSizeMb(), mergeMinRegionSizeMb);
logConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY,
currentConfiguration.getCumulativePlansSizeLimitMb(), cumulativePlansSizeLimitMb);
}

public Configuration getConf() {
Expand Down Expand Up @@ -574,6 +598,10 @@ public long getMergeMinRegionSizeMb(NormalizeContext context) {
}
return mergeMinRegionSizeMb;
}

public long getCumulativePlansSizeLimitMb() {
return cumulativePlansSizeLimitMb;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public NormalizationTarget getSplitTarget() {
return splitTarget;
}

@Override
public long getPlanSizeMb() {
return splitTarget.getRegionSizeMb();
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,36 @@ public void testRateLimit() throws Exception {
Duration.ofNanos(endTime - startTime), greaterThanOrEqualTo(Duration.ofSeconds(5)));
}

@Test
public void testPlansSizeLimit() throws Exception {
final TableName tn = tableName.getTableName();
final TableDescriptor tnDescriptor =
TableDescriptorBuilder.newBuilder(tn).setNormalizationEnabled(true).build();
final RegionInfo splitRegionInfo = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo1 = RegionInfoBuilder.newBuilder(tn).build();
final RegionInfo mergeRegionInfo2 = RegionInfoBuilder.newBuilder(tn).build();
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong())).thenReturn(1L);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong())).thenReturn(1L);
when(regionNormalizer.computePlansForTable(tnDescriptor)).thenReturn(Arrays.asList(
new SplitNormalizationPlan(splitRegionInfo, 2), new MergeNormalizationPlan.Builder()
.addTarget(mergeRegionInfo1, 1).addTarget(mergeRegionInfo2, 2).build(),
new SplitNormalizationPlan(splitRegionInfo, 1)));

final Configuration conf = testingUtility.getConfiguration();
conf.setLong(RegionNormalizerWorker.CUMULATIVE_SIZE_LIMIT_MB_KEY, 5);

final RegionNormalizerWorker worker = new RegionNormalizerWorker(
testingUtility.getConfiguration(), masterServices, regionNormalizer, queue);
workerPool.submit(worker);
queue.put(tn);

assertThatEventually("worker should process first split plan, but not second",
worker::getSplitPlanCount, comparesEqualTo(1L));
assertThatEventually("worker should process merge plan", worker::getMergePlanCount,
comparesEqualTo(1L));
}

/**
* Repeatedly evaluates {@code matcher} against the result of calling {@code actualSupplier} until
* the matcher succeeds or the timeout period of 30 seconds is exhausted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.normalizer;

import static java.lang.String.format;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.CUMULATIVE_SIZE_LIMIT_MB_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.DEFAULT_MERGE_MIN_REGION_AGE_DAYS;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_ENABLED_KEY;
import static org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer.MERGE_MIN_REGION_AGE_DAYS_KEY;
Expand All @@ -30,13 +31,18 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Instant;
Expand Down Expand Up @@ -607,6 +613,23 @@ public void testNormalizerCannotMergeNonAdjacentRegions() {
assertThat(plans, empty());
}

@Test
public void testSizeLimitShufflesPlans() {
conf.setLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, 10);
final TableName tableName = name.getTableName();
final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 3, 3, 3, 3);
setupMocksForNormalizer(regionSizes, regionInfos);
when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(1L);
normalizer = spy(normalizer);

assertTrue(normalizer.isSplitEnabled());
assertTrue(normalizer.isMergeEnabled());
List<NormalizationPlan> computedPlans = normalizer.computePlansForTable(tableDescriptor);
assertThat(computedPlans, hasSize(4));
verify(normalizer, times(1)).shuffleNormalizationPlans(anyList());
}

@SuppressWarnings("MockitoCast")
private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
List<RegionInfo> regionInfoList) {
Expand Down