Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,15 @@ Future<Void> modifyColumnFamilyAsync(TableName tableName, ColumnFamilyDescriptor
*/
void flushRegion(byte[] regionName) throws IOException;

/**
* Flush a column family within a region. Synchronous operation.
*
* @param regionName region to flush
* @param columnFamily column family within a region
* @throws IOException if a remote or network exception occurs
*/
void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException;

/**
* Flush all regions on the region server. Synchronous operation.
* @param serverName the region server name to flush
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ public void flushRegion(byte[] regionName) throws IOException {
get(admin.flushRegion(regionName));
}

@Override
public void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException {
get(admin.flushRegion(regionName, columnFamily));
}

@Override
public void flushRegionServer(ServerName serverName) throws IOException {
get(admin.flushRegionServer(serverName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ CompletableFuture<Void> modifyColumnFamily(TableName tableName,
*/
CompletableFuture<Void> flushRegion(byte[] regionName);

/**
* Flush a column family within a region.
* @param regionName region to flush
* @param columnFamily column family within a region. If not present, flush the region's all
* column families.
*/
CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily);

/**
* Flush all region on the region server.
* @param serverName server to flush
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ public CompletableFuture<Void> flushRegion(byte[] regionName) {
return wrap(rawAdmin.flushRegion(regionName));
}

@Override
public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
return wrap(rawAdmin.flushRegion(regionName, columnFamily));
}

@Override
public CompletableFuture<Void> flushRegionServer(ServerName sn) {
return wrap(rawAdmin.flushRegionServer(sn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,18 +926,26 @@ public CompletableFuture<Void> flush(TableName tableName) {

@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
return flushRegionInternal(regionName, false).thenAccept(r -> {
return flushRegionInternal(regionName, null, false).thenAccept(r -> {
});
}

@Override
public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
Preconditions.checkNotNull(columnFamily, "columnFamily is null."
+ "If you don't specify a columnFamily, use flushRegion(regionName) instead");
return flushRegionInternal(regionName, columnFamily, false)
.thenAccept(r -> {});
}

/**
* This method is for internal use only, where we need the response of the flush.
* <p/>
* As it exposes the protobuf message, please do <strong>NOT</strong> try to expose it as a public
* API.
*/
CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
boolean writeFlushWALMarker) {
byte[] columnFamily, boolean writeFlushWALMarker) {
CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
addListener(getRegionLocation(regionName), (location, err) -> {
if (err != null) {
Expand All @@ -950,23 +958,25 @@ CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
.completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
return;
}
addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
addListener(
flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
(ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}});
});
return future;
}

private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
boolean writeFlushWALMarker) {
byte[] columnFamily, boolean writeFlushWALMarker) {
return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
.action((controller, stub) -> this
.<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(),
columnFamily, writeFlushWALMarker),
(s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
.call();
}
Expand All @@ -981,8 +991,11 @@ public CompletableFuture<Void> flushRegionServer(ServerName sn) {
}
List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
})));
hRegionInfos.forEach(
region -> compactFutures.add(
flush(sn, region, null, false).thenAccept(r -> {})
)
);
}
addListener(CompletableFuture.allOf(
compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,20 +779,24 @@ public static GetOnlineRegionRequest buildGetOnlineRegionRequest() {
* @return a protocol buffer FlushRegionRequest
*/
public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName) {
return buildFlushRegionRequest(regionName, false);
return buildFlushRegionRequest(regionName, null, false);
}

/**
* Create a protocol buffer FlushRegionRequest for a given region name
* @param regionName the name of the region to get info
* @param columnFamily column family within a region
* @return a protocol buffer FlushRegionRequest
*/
public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName,
boolean writeFlushWALMarker) {
byte[] columnFamily, boolean writeFlushWALMarker) {
FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setWriteFlushWalMarker(writeFlushWALMarker);
if (columnFamily != null) {
builder.setFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ message FlushRegionRequest {
required RegionSpecifier region = 1;
optional uint64 if_older_than_ts = 2;
optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed
optional bytes family = 4;
}

message FlushRegionResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
return admin.flushRegionInternal(regionName, writeFlushWALMarker);
return admin.flushRegionInternal(regionName, null, writeFlushWALMarker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1788,8 +1788,15 @@ public FlushRegionResponse flushRegion(final RpcController controller,
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false;
// Go behind the curtain so we can manage writing of the flush WAL marker
HRegion.FlushResultImpl flushResult =
region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
HRegion.FlushResultImpl flushResult = null;
if (request.hasFamily()) {
List families = new ArrayList();
families.add(request.getFamily().toByteArray());
flushResult =
region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
} else {
flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
}
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ public class TestFlushFromClient {
Bytes.toBytes("1"),
Bytes.toBytes("4"),
Bytes.toBytes("8"));
private static final byte[] FAMILY = Bytes.toBytes("f1");

private static final byte[] FAMILY_1 = Bytes.toBytes("f1");
private static final byte[] FAMILY_2 = Bytes.toBytes("f2");
public static final byte[][] FAMILIES = {FAMILY_1, FAMILY_2};
@Rule
public TestName name = new TestName();

Expand All @@ -85,11 +86,14 @@ public static void tearDownAfterClass() throws Exception {
@Before
public void setUp() throws Exception {
tableName = TableName.valueOf(name.getMethodName());
try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) {
try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) {
List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
for (int i = 0; i != 20; ++i) {
byte[] value = Bytes.toBytes(i);
puts.forEach(p -> p.addColumn(FAMILY, value, value));
puts.forEach(p -> {
p.addColumn(FAMILY_1, value, value);
p.addColumn(FAMILY_2, value, value);
});
}
t.put(puts);
}
Expand Down Expand Up @@ -131,6 +135,18 @@ public void testFlushRegion() throws Exception {
}
}

@Test
public void testFlushRegionFamily() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
for (HRegion r : getRegionInfo()) {
long sizeBeforeFlush = r.getMemStoreDataSize();
admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1);
TimeUnit.SECONDS.sleep(1);
assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
}
}
}

@Test
public void testAsyncFlushRegion() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
Expand All @@ -141,6 +157,17 @@ public void testAsyncFlushRegion() throws Exception {
}
}

@Test
public void testAsyncFlushRegionFamily() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
for (HRegion r : getRegionInfo()) {
long sizeBeforeFlush = r.getMemStoreDataSize();
admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get();
TimeUnit.SECONDS.sleep(1);
assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
}
}

@Test
public void testFlushRegionServer() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ public void flushRegion(byte[] regionName) throws IOException {
admin.flushRegion(regionName);
}

public void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException {
admin.flushRegion(regionName, columnFamily);
}

public void flushRegionServer(ServerName serverName) throws IOException {
admin.flushRegionServer(serverName);
}
Expand Down
10 changes: 8 additions & 2 deletions hbase-shell/src/main/ruby/hbase/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,14 @@ def list(regex = '.*')

#----------------------------------------------------------------------------------------------
# Requests a table or region or region server flush
def flush(name)
@admin.flushRegion(name.to_java_bytes)
def flush(name, family = nil)
family_bytes = nil
family_bytes = family.to_java_bytes unless family.nil?
if family_bytes.nil?
@admin.flushRegion(name.to_java_bytes)
else
@admin.flushRegion(name.to_java_bytes, family_bytes)
end
rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
# Unknown region. Try table.
begin
Expand Down
7 changes: 5 additions & 2 deletions hbase-shell/src/main/ruby/shell/commands/flush.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ def help
Flush all regions in passed table or pass a region row to
flush an individual region or a region server name whose format
is 'host,port,startcode', to flush all its regions.
You can also flush a single column family within a region.
For example:

hbase> flush 'TABLENAME'
hbase> flush 'REGIONNAME'
hbase> flush 'REGIONNAME','FAMILYNAME'
hbase> flush 'ENCODED_REGIONNAME'
hbase> flush 'ENCODED_REGIONNAME','FAMILYNAME'
hbase> flush 'REGION_SERVER_NAME'
EOF
end

def command(table_or_region_name)
admin.flush(table_or_region_name)
def command(table_or_region_name, family = nil)
admin.flush(table_or_region_name, family)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ public void flushRegion(byte[] regionName) {

}

@Override
public void flushRegion(byte[] regionName, byte[] columnFamily) {
throw new NotImplementedException("flushRegion not supported in ThriftAdmin");
}

@Override
public void flushRegionServer(ServerName serverName) {
throw new NotImplementedException("flushRegionServer not supported in ThriftAdmin");
Expand Down