@@ -451,25 +451,25 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
        progress.cancel();
        return false;
      }
      if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {

Contributor: nit: This seems like only a format change. Can you avoid it?

Contributor (author): It is not a format change; this code was moved out of the for loop.

        if (lastCleanCell != null) {
          // HBASE-16931, set back the sequence id to avoid affecting scan order unexpectedly.
          // ShipperListener will do a clone of the last cells it refers to, so we need to set
          // back the sequence id before ShipperListener.beforeShipped.
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Clone the cells that are in the writer so that they are freed of references,
        // if they are holding any.
        ((ShipperListener) writer).beforeShipped();
        // The SHARED block references, being read for compaction, will be kept in the
        // prevBlocks list (see HFileScannerImpl#prevBlocks). In the scan flow, after each
        // set of cells is returned to the client, we call shipped(), which can clear this
        // list. Here we are doing a similar thing: in between the compaction (after every N
        // cells written, with a collective size of 'shippedCallSizeLimit') we call shipped(),
        // which may clear the prevBlocks list.
        kvs.shipped();
        bytesWrittenProgressForShippedCall = 0;
      }

Contributor (author): This was moved out of the for loop because kvs.shipped() will release the prev blocks in the HFileReader, but some cells may not yet have been shipped by writer.append(c) at line 441.
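
To make the ordering concrete, here is a minimal, self-contained Java sketch of the hazard this move fixes. The ShipAfterBatchDemo and Writer names are invented for illustration; this is not the HBase code. The point is that a shared backing buffer must not be recycled until the whole batch has been appended and the writer has cloned whatever it still references (the beforeShipped() step):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// A minimal sketch, not HBase code: the Writer and demo class names are invented.
public class ShipAfterBatchDemo {
  static class Writer {
    final List<byte[]> pending = new ArrayList<>();

    void append(byte[] cell) {
      pending.add(cell); // like writer.append(c): may keep a SHARED reference
    }

    void beforeShipped() {
      // like ShipperListener#beforeShipped: clone the cells still referenced,
      // so the backing block can be recycled safely afterwards
      for (int i = 0; i < pending.size(); i++) {
        pending.set(i, pending.get(i).clone());
      }
    }
  }

  public static void main(String[] args) {
    byte[] sharedBlock = "cell-backed-by-a-pooled-block".getBytes();
    Writer writer = new Writer();
    for (int i = 0; i < 3; i++) {
      writer.append(sharedBlock);
      // Old code path: a shipped() call here could recycle sharedBlock while
      // the writer still pointed at it.
    }
    // New code path: only after the whole batch is appended do we clone and ship.
    writer.beforeShipped();
    Arrays.fill(sharedBlock, (byte) 0); // stands in for kvs.shipped() recycling the block
    System.out.println(new String(writer.pending.get(0))); // the data survives
  }
}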

      if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
        if (lastCleanCell != null) {
          // HBASE-16931, set back the sequence id to avoid affecting scan order unexpectedly.
          // ShipperListener will do a clone of the last cells it refers to, so we need to set
          // back the sequence id before ShipperListener.beforeShipped.
          PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Clone the cells that are in the writer so that they are freed of references,
        // if they are holding any.
        ((ShipperListener) writer).beforeShipped();
        // The SHARED block references, being read for compaction, will be kept in the
        // prevBlocks list (see HFileScannerImpl#prevBlocks). In the scan flow, after each
        // set of cells is returned to the client, we call shipped(), which can clear this
        // list. Here we are doing a similar thing: in between the compaction (after every N
        // cells written, with a collective size of 'shippedCallSizeLimit') we call shipped(),
        // which may clear the prevBlocks list.
        kvs.shipped();
        bytesWrittenProgressForShippedCall = 0;
      }
      if (lastCleanCell != null) {
        // HBASE-16931, set back the sequence id to avoid affecting scan order unexpectedly
@@ -0,0 +1,155 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(LargeTests.class)
public class TestCompactionWithByteBuff {

Contributor: Does this test guarantee that the early release of the BB happens, and so could it possibly hit that issue? I don't think so. Maybe I am missing something!

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCompactionWithByteBuff.class);

  @Rule
  public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf = TEST_UTIL.getConfiguration();
  private static Admin admin = null;

  private static final byte[] COLUMN = Bytes.toBytes("A");
  private static final int REGION_COUNT = 5;
  private static final long ROW_COUNT = 200;
  private static final int ROW_LENGTH = 20;
  private static final int VALUE_LENGTH = 5000;

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 1024 * 5);
    conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, REGION_COUNT * 2);
    conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, REGION_COUNT * 2);
    conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");

Contributor: Is the Bucket Cache actually coming into the picture here? I think not. We write some data and flush, so two files are created, and then we compact them. What we wanted is for the compaction to read the file blocks from the BC, but cache-on-write is false by default (hbase.rs.cacheblocksonwrite).

Contributor (author): It is not related to the BC. It is set to offheap to make sure the blocks are read into an OffheapByteBuffer; see HFileReaderImpl#shouldUseHeap.
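
As background for that distinction, here is a standalone sketch using plain java.nio (not HBase's ByteBuffAllocator) showing heap versus direct buffers; the demo class name is invented:

import java.nio.ByteBuffer;

// A minimal sketch, not HBase code: heap buffers are backed by a Java byte[],
// while direct (off-heap) buffers live in native memory. With the ioengine set
// to "offheap", HFile blocks land in pooled direct buffers that are explicitly
// recycled, which is what can expose a premature release during compaction.
public class HeapVsOffheapDemo {
  public static void main(String[] args) {
    ByteBuffer onHeap = ByteBuffer.allocate(4096);
    ByteBuffer offHeap = ByteBuffer.allocateDirect(4096);
    System.out.println("onHeap.isDirect()  = " + onHeap.isDirect());  // false
    System.out.println("offHeap.isDirect() = " + offHeap.isDirect()); // true
  }
}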

    conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 512);
    TEST_UTIL.startMiniCluster();
    admin = TEST_UTIL.getAdmin();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testCompaction() throws Exception {
    TableName table = TableName.valueOf("t1");
    // Disable compactions while loading, so that two store files are flushed per region.
    admin.compactionSwitch(false, new ArrayList<>(0));
    try (Table t = createTable(TEST_UTIL, table)) {
      for (int i = 0; i < 2; i++) {
        put(t);
        admin.flush(table);
      }
      admin.compactionSwitch(true, new ArrayList<>(0));
      admin.majorCompact(table);
      List<JVMClusterUtil.RegionServerThread> regionServerThreads =
          TEST_UTIL.getHBaseCluster().getRegionServerThreads();
      // Wait until every region has been compacted down to a single store file.
      TEST_UTIL.waitFor(2 * 60 * 1000L, () -> {
        boolean result = true;
        for (JVMClusterUtil.RegionServerThread regionServerThread : regionServerThreads) {
          HRegionServer regionServer = regionServerThread.getRegionServer();
          List<HRegion> regions = regionServer.getRegions(table);
          for (HRegion region : regions) {
            List<String> storeFileList = region.getStoreFileList(new byte[][] { COLUMN });
            if (storeFileList.size() > 1) {
              result = false;
            }
          }
        }
        return result;
      });
    }
  }

  private Table createTable(HBaseTestingUtility util, TableName tableName)
      throws IOException {
    TableDescriptor td =
        TableDescriptorBuilder.newBuilder(tableName)
            .setColumnFamily(
                ColumnFamilyDescriptorBuilder.newBuilder(COLUMN).setBlocksize(1024 * 4).build())
            .build();
    byte[][] splits = new byte[REGION_COUNT - 1][];
    for (int i = 1; i < REGION_COUNT; i++) {
      splits[i - 1] = Bytes.toBytes(buildRow((int) (ROW_COUNT / REGION_COUNT * i)));
    }
    return util.createTable(td, splits);
  }

  private void put(Table table) throws IOException {
    for (int i = 0; i < ROW_COUNT; i++) {
      Put put = new Put(Bytes.toBytes(buildRow(i)));
      put.addColumn(COLUMN, Bytes.toBytes("filed01"), buildValue(i, 1));
      put.addColumn(COLUMN, Bytes.toBytes("filed02"), buildValue(i, 2));
      put.addColumn(COLUMN, Bytes.toBytes("filed03"), buildValue(i, 3));
      put.addColumn(COLUMN, Bytes.toBytes("filed04"), buildValue(i, 4));
      put.addColumn(COLUMN, Bytes.toBytes("filed05"), buildValue(i, 5));
      table.put(put);
    }
  }

  private String buildRow(int index) {
    String value = Long.toString(index);
    String prefix = "user";
    for (int i = 0; i < ROW_LENGTH - value.length(); i++) {
      prefix += '0';
    }
    return prefix + value;
  }

  private byte[] buildValue(int index, int qualifierId) {
    String row = buildRow(index) + "/f" + qualifierId + "-";
    StringBuffer result = new StringBuffer();
    while (result.length() < VALUE_LENGTH) {
      result.append(row);
    }
    return Bytes.toBytes(result.toString().substring(0, VALUE_LENGTH));
  }
}