Skip to content

Commit 049f5e7

Browse files
committed
HBASE-26249 Request split and compact after bulk load files
1 parent f022692 commit 049f5e7

8 files changed

Lines changed: 62 additions & 51 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
4343
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
4444
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
45-
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
45+
import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
4646
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
4747
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
4848
import org.apache.hadoop.hbase.security.Superusers;
@@ -62,7 +62,7 @@
6262
* Compact region on request and then run split if appropriate
6363
*/
6464
@InterfaceAudience.Private
65-
public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
65+
public class CompactSplit implements CompactionSplitRequester, PropagatingConfigurationObserver {
6666
private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);
6767

6868
// Configuration key for the large compaction threads.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6007,7 +6007,7 @@ private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOExcept
60076007
}
60086008
}
60096009
if (this.rsServices != null && store.needsCompaction()) {
6010-
this.rsServices.getCompactionRequestor()
6010+
this.rsServices.getCompactionSplitRequester()
60116011
.requestCompaction(this, store, "load recovered hfiles request compaction",
60126012
Store.PRIORITY_USER + 1, CompactionLifeCycleTracker.DUMMY, null);
60136013
}
@@ -6934,18 +6934,19 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
69346934
// guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
69356935
// a sequence id that we can be sure is beyond the last hfile written).
69366936
if (assignSeqId) {
6937-
FlushResult fs = flushcache(true, false, FlushLifeCycleTracker.DUMMY);
6938-
if (fs.isFlushSucceeded()) {
6939-
seqId = ((FlushResultImpl)fs).flushSequenceId;
6940-
} else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
6941-
seqId = ((FlushResultImpl)fs).flushSequenceId;
6942-
} else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH) {
6937+
FlushResult flushResult = flushcache(true, false,
6938+
FlushLifeCycleTracker.DUMMY);
6939+
if (flushResult.isFlushSucceeded()) {
6940+
seqId = ((FlushResultImpl)flushResult).flushSequenceId;
6941+
} else if (flushResult.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
6942+
seqId = ((FlushResultImpl)flushResult).flushSequenceId;
6943+
} else if (flushResult.getResult() == FlushResult.Result.CANNOT_FLUSH) {
69436944
// CANNOT_FLUSH may mean that a flush is already on-going
69446945
// we need to wait for that flush to complete
69456946
waitForFlushes();
69466947
} else {
69476948
throw new IOException("Could not bulk load with an assigned sequential ID because the "+
6948-
"flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);
6949+
"flush didn't run. Reason for not flushing: " + ((FlushResultImpl)flushResult).failureReason);
69496950
}
69506951
}
69516952

@@ -7038,22 +7039,27 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
70387039
}
70397040

70407041
isSuccessful = true;
7041-
if (conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false)) {
7042-
// request compaction
7043-
familyWithFinalPath.keySet().forEach(family -> {
7044-
HStore store = getStore(family);
7045-
try {
7046-
if (this.rsServices != null && store.needsCompaction()) {
7047-
this.rsServices.getCompactionRequestor().requestCompaction(this, store,
7048-
"bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
7049-
CompactionLifeCycleTracker.DUMMY, null);
7050-
LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
7051-
this.getRegionInfo(), family);
7042+
// check split
7043+
boolean shouldSplit = checkSplit().isPresent();
7044+
if (shouldSplit) {
7045+
this.rsServices.getCompactionSplitRequester().requestSplit(this);
7046+
} else {
7047+
// request compact
7048+
try {
7049+
if (this.rsServices != null && conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false)) {
7050+
for (byte[] familyKey : familyWithFinalPath.keySet()) {
7051+
HStore store = getStore(familyKey);
7052+
if (store != null && store.needsCompaction()) {
7053+
this.rsServices.getCompactionSplitRequester()
7054+
.requestSystemCompaction(this, "bulk load hfiles");
7055+
LOG.debug("Request compact for region {} after bulk load", getRegionInfo());
7056+
break;
7057+
}
70527058
}
7053-
} catch (IOException e) {
7054-
LOG.error("bulkload hfiles request compaction error ", e);
70557059
}
7056-
});
7060+
} catch (IOException e) {
7061+
LOG.error("bulk load hfiles request compaction error ", e);
7062+
}
70577063
}
70587064
} finally {
70597065
if (wal != null && !storeFiles.isEmpty()) {
@@ -8348,7 +8354,7 @@ public void requestCompaction(String why, int priority, boolean major,
83488354
if (major) {
83498355
stores.values().forEach(HStore::triggerMajorCompaction);
83508356
}
8351-
rsServices.getCompactionRequestor().requestCompaction(this, why, priority, tracker,
8357+
rsServices.getCompactionSplitRequester().requestCompaction(this, why, priority, tracker,
83528358
RpcServer.getRequestUser().orElse(null));
83538359
}
83548360

@@ -8363,7 +8369,7 @@ public void requestCompaction(byte[] family, String why, int priority, boolean m
83638369
if (major) {
83648370
store.triggerMajorCompaction();
83658371
}
8366-
rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker,
8372+
rsServices.getCompactionSplitRequester().requestCompaction(this, store, why, priority, tracker,
83678373
RpcServer.getRequestUser().orElse(null));
83688374
}
83698375

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@
141141
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
142142
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
143143
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
144-
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
144+
import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
145145
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
146146
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
147147
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
@@ -3054,7 +3054,7 @@ public FlushRequester getFlushRequester() {
30543054
}
30553055

30563056
@Override
3057-
public CompactionRequester getCompactionRequestor() {
3057+
public CompactionSplitRequester getCompactionSplitRequester() {
30583058
return this.compactSplitThread;
30593059
}
30603060

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
3838
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
3939
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
40-
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
40+
import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
4141
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
4242
import org.apache.hadoop.hbase.security.access.AccessChecker;
4343
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -73,10 +73,10 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
7373
FlushRequester getFlushRequester();
7474

7575
/**
76-
* @return Implementation of {@link CompactionRequester} or null. Usually it will not be null
76+
* @return Implementation of {@link CompactionSplitRequester} or null. Usually it will not be null
7777
* unless during intialization.
7878
*/
79-
CompactionRequester getCompactionRequestor();
79+
CompactionSplitRequester getCompactionSplitRequester();
8080

8181
/**
8282
* @return the RegionServerAccounting for this Region Server

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java renamed to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionSplitRequester.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,17 @@
2121

2222
import org.apache.hadoop.hbase.regionserver.HRegion;
2323
import org.apache.hadoop.hbase.regionserver.HStore;
24+
import org.apache.hadoop.hbase.regionserver.Region;
2425
import org.apache.hadoop.hbase.security.User;
2526
import org.apache.yetus.audience.InterfaceAudience;
2627

2728
import edu.umd.cs.findbugs.annotations.Nullable;
2829

2930
/**
30-
* Request a compaction.
31+
* Request compaction and split.
3132
*/
3233
@InterfaceAudience.Private
33-
public interface CompactionRequester {
34+
public interface CompactionSplitRequester {
3435

3536
/**
3637
* Request compaction on all the stores of the given region.
@@ -44,6 +45,16 @@ void requestCompaction(HRegion region, String why, int priority,
4445
void requestCompaction(HRegion region, HStore store, String why, int priority,
4546
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
4647

48+
/**
49+
* Request compaction on the given region.
50+
*/
51+
void requestSystemCompaction(HRegion region, String why) throws IOException;
52+
53+
/**
54+
* Request split on the given region.
55+
*/
56+
boolean requestSplit(final Region r) throws IOException;
57+
4758
/**
4859
* on/off compaction
4960
*/

hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
5454
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
5555
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
56-
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
56+
import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
5757
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
5858
import org.apache.hadoop.hbase.security.access.AccessChecker;
5959
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -157,7 +157,7 @@ public FlushRequester getFlushRequester() {
157157
}
158158

159159
@Override
160-
public CompactionRequester getCompactionRequestor() {
160+
public CompactionSplitRequester getCompactionSplitRequester() {
161161
return null;
162162
}
163163

hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
6868
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
6969
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
70-
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
70+
import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
7171
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
7272
import org.apache.hadoop.hbase.security.access.AccessChecker;
7373
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -328,7 +328,7 @@ public FlushRequester getFlushRequester() {
328328
return null;
329329
}
330330
@Override
331-
public CompactionRequester getCompactionRequestor() {
331+
public CompactionSplitRequester getCompactionSplitRequester() {
332332
return null;
333333
}
334334
@Override

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,28 +19,24 @@
1919

2020
import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
2121
import static org.mockito.ArgumentMatchers.any;
22-
import static org.mockito.ArgumentMatchers.anyInt;
23-
import static org.mockito.ArgumentMatchers.eq;
22+
import static org.mockito.ArgumentMatchers.anyString;
2423
import static org.mockito.ArgumentMatchers.isA;
2524
import static org.mockito.Mockito.mock;
2625
import static org.mockito.Mockito.times;
2726
import static org.mockito.Mockito.verify;
2827
import static org.mockito.Mockito.when;
2928
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
30-
3129
import java.io.IOException;
3230
import java.util.ArrayList;
3331
import java.util.List;
34-
3532
import org.apache.hadoop.fs.Path;
3633
import org.apache.hadoop.hbase.HBaseClassTestRule;
3734
import org.apache.hadoop.hbase.TableName;
3835
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
3936
import org.apache.hadoop.hbase.client.RegionInfo;
4037
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
4138
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
42-
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
43-
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
39+
import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
4440
import org.apache.hadoop.hbase.testclassification.SmallTests;
4541
import org.apache.hadoop.hbase.util.Pair;
4642
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -55,7 +51,7 @@
5551
@Category(SmallTests.class)
5652
public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
5753
private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
58-
private final CompactionRequester compactionRequester = mock(CompactSplit.class);
54+
private final CompactionSplitRequester compactionSplitRequester = mock(CompactSplit.class);
5955

6056
@ClassRule
6157
public static final HBaseClassTestRule CLASS_RULE =
@@ -88,7 +84,7 @@ public void shouldRequestCompactionAfterBulkLoad() throws IOException {
8884
try {
8985
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
9086
when(regionServerServices.getConfiguration()).thenReturn(conf);
91-
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
87+
when(regionServerServices.getCompactionSplitRequester()).thenReturn(compactionSplitRequester);
9288
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
9389
.thenAnswer(new Answer() {
9490
@Override
@@ -103,12 +99,10 @@ public Object answer(InvocationOnMock invocation) {
10399
}
104100
});
105101

106-
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
107-
any(), any());
108-
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
109-
// invoke three times for 3 families
110-
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
111-
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
102+
Mockito.doNothing().when(compactionSplitRequester).requestSystemCompaction(any(), any());
103+
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, true, null);
104+
// invoke three times for 1 families
105+
verify(compactionSplitRequester, times(1)).requestSystemCompaction(isA(HRegion.class), anyString());
112106
} finally {
113107
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
114108
}

0 commit comments

Comments
 (0)