diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java index eb9a7f3eccc9..b0df4920e4ed 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java @@ -36,4 +36,6 @@ public final class TagType { // String based tag type used in replication public static final byte STRING_VIS_TAG_TYPE = (byte) 7; public static final byte TTL_TAG_TYPE = (byte) 8; + // tag with the custom cell tiering value for the row + public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index cef5a6488fa6..9297e7074a97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -214,13 +213,13 @@ default Optional shouldCacheFile(HFileInfo hFileInfo, Configuration con * not be overridden by all implementing classes. In such cases, the returned Optional will be * empty. For subclasses implementing this logic, the returned Optional would contain the boolean * value reflecting if the passed block should indeed be cached. - * @param key The key representing the block to check if it should be cached. - * @param timeRangeTracker the time range tracker containing the timestamps - * @param conf The configuration object to use for determining caching behavior. + * @param key The key representing the block to check if it should be cached. + * @param maxTimeStamp The maximum timestamp for the block to check if it should be cached. + * @param conf The configuration object to use for determining caching behavior. * @return An empty Optional if this method is not supported; otherwise, the returned Optional * contains the boolean value indicating if the block should be cached. 
*/ - default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp, Configuration conf) { return Optional.empty(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 3fcf75b39709..72ca37c0557c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -282,7 +282,8 @@ public boolean shouldCacheBlockOnRead(BlockCategory category) { public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo hFileInfo, Configuration conf) { Optional<Boolean> cacheFileBlock = Optional.of(true); - if (getBlockCache().isPresent()) { + // For DATA blocks only, if BucketCache is in use, we don't need to cache the block again + if (getBlockCache().isPresent() && category.equals(BlockCategory.DATA)) { Optional<Boolean> result = getBlockCache().get().shouldCacheFile(hFileInfo, conf); if (result.isPresent()) { cacheFileBlock = result; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 672a7bc1e72c..e5d52858ab65 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -494,10 +493,10 @@ public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf } @Override - public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp, Configuration conf) { - return combineCacheResults(l1Cache.shouldCacheBlock(key, timeRangeTracker, conf), - l2Cache.shouldCacheBlock(key, timeRangeTracker, conf)); + return combineCacheResults(l1Cache.shouldCacheBlock(key, maxTimeStamp, conf), + l2Cache.shouldCacheBlock(key, maxTimeStamp, conf)); } private Optional<Boolean> combineCacheResults(Optional<Boolean> result1, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index d8dffce59e85..a99eac4085e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.ShipperListener; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.BloomFilterWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -217,6 +218,12 @@ public interface Writer extends Closeable, CellSink, ShipperListener { */ void appendTrackedTimestampsToMetadata() throws IOException; + /** + * Add custom cell timestamps to metadata + */ + public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) + throws
IOException; + /** Returns the path to this {@link HFile} */ Path getPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 4b60ef662c25..972e8070e1cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1380,7 +1380,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked); // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { - if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + if (cacheBlock && cacheOnRead) { // Using the wait on cache during compaction and prefetching. cache.cacheBlock(cacheKey, cacheCompressed diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java index 96bfe42f1fda..684aee3beaca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED; +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; @@ -29,6 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -127,6 +129,12 @@ public class HFileWriterImpl implements HFile.Writer { /** Cache configuration for caching data on write. */ protected final CacheConfig cacheConf; + public void setTimeRangeTrackerForTiering(Supplier timeRangeTrackerForTiering) { + this.timeRangeTrackerForTiering = timeRangeTrackerForTiering; + } + + private Supplier timeRangeTrackerForTiering; + /** * Name for this object used when logging or in toString. Is either the result of a toString on * stream or else name of passed file Path. @@ -186,7 +194,9 @@ public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path pat this.path = path; this.name = path != null ? 
path.getName() : outputStream.toString(); this.hFileContext = fileContext; + // TODO: Move this back to upper layer this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + this.timeRangeTrackerForTiering = () -> this.timeRangeTracker; DataBlockEncoding encoding = hFileContext.getDataBlockEncoding(); if (encoding != DataBlockEncoding.NONE) { this.blockEncoder = new HFileDataBlockEncoderImpl(encoding); @@ -588,7 +598,8 @@ private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) { } private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) { - Optional result = cache.shouldCacheBlock(key, timeRangeTracker, conf); + Optional result = + cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf); return result.orElse(true); } @@ -899,12 +910,19 @@ public void appendTrackedTimestampsToMetadata() throws IOException { appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); } + public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) + throws IOException { + // TODO: The StoreFileReader always converts the byte[] to TimeRange + // via TimeRangeTracker, so we should write the serialization data of TimeRange directly. + appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker)); + } + /** * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker * to include the timestamp of this key */ private void trackTimestamps(final ExtendedCell cell) { - if (Cell.Type.Put == cell.getType()) { + if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) { earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp()); } timeRangeTracker.includeTimestamp(cell); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 2af46a49d89a..5867fff0861d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.regionserver.DataTieringManager; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -1195,8 +1194,9 @@ void freeSpace(final String why) { } } - if (bytesFreed < bytesToFreeWithExtra && - coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName()) + if ( + bytesFreed < bytesToFreeWithExtra && coldFiles != null + && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName()) ) { int freedBlockSize = bucketEntryWithKey.getValue().getLength(); if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) { @@ -2458,10 +2458,10 @@ public Optional shouldCacheFile(HFileInfo hFileInfo, Configuration conf } @Override - public Optional shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker, + public Optional shouldCacheBlock(BlockCacheKey key, long maxTimestamp, Configuration conf) { DataTieringManager dataTieringManager = DataTieringManager.getInstance(); - if (dataTieringManager != null && !dataTieringManager.isHotData(timeRangeTracker, conf)) { + if (dataTieringManager != null && 
!dataTieringManager.isHotData(maxTimestamp, conf)) { LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", key.getHfileName()); return Optional.of(false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index 23ad3b42aef0..423297f667d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils; import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; @@ -314,6 +315,8 @@ private boolean prepareCreate(final MasterProcedureEnv env) throws IOException { StoreFileTrackerValidationUtils.checkForCreateTable(env.getMasterConfiguration(), tableDescriptor); + CustomCellTieredUtils.checkForModifyTable(tableDescriptor); + return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java index 03ad19799cd3..95896838dc2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieredUtils; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils; import org.apache.hadoop.hbase.rsgroup.RSGroupInfo; import org.apache.hadoop.hbase.util.Bytes; @@ -420,6 +421,7 @@ private void prepareModify(final MasterProcedureEnv env) throws IOException { // check for store file tracker configurations StoreFileTrackerValidationUtils.checkForModifyTable(env.getMasterConfiguration(), unmodifiedTableDescriptor, modifiedTableDescriptor, !isTableEnabled(env)); + CustomCellTieredUtils.checkForModifyTable(modifiedTableDescriptor); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java new file mode 100644 index 000000000000..ed7dc01ba8d2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellTSTiering.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; + +import java.io.IOException; +import java.util.OptionalLong; +import org.apache.hadoop.hbase.io.hfile.HFileInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class CellTSTiering implements DataTiering { + private static final Logger LOG = LoggerFactory.getLogger(CellTSTiering.class); + + public long getTimestamp(HStoreFile hStoreFile) { + OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp(); + if (!maxTimestamp.isPresent()) { + LOG.debug("Maximum timestamp not present for {}", hStoreFile.getPath()); + return Long.MAX_VALUE; + } + return maxTimestamp.getAsLong(); + } + + public long getTimestamp(HFileInfo hFileInfo) { + try { + byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY); + if (hFileTimeRange == null) { + LOG.debug("Timestamp information not found for file: {}", + hFileInfo.getHFileContext().getHFileName()); + return Long.MAX_VALUE; + } + return TimeRangeTracker.parseFrom(hFileTimeRange).getMax(); + } catch (IOException e) { + LOG.error("Error occurred while reading the timestamp metadata of file: {}", + hFileInfo.getHFileContext().getHFileName(), e); + return Long.MAX_VALUE; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java new file mode 100644 index 000000000000..518b31fb5be4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieredStoreEngine.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Extension of {@link DateTieredStoreEngine} that uses a pluggable value provider for extracting + * the value to be used for comparison in this tiered compaction. Unlike the existing Date Tiered + * Compaction, this does not yield multiple tiers of files, but rather provides two tiers based on + * a configurable “cut-off” age. All rows with the cell tiering value older than this “cut-off” age + * would be placed together in an “old” tier, whilst younger rows would go to a separate, “young” + * tier file. + */ +@InterfaceAudience.Private +public class CustomTieredStoreEngine extends DateTieredStoreEngine { + + @Override + protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator) + throws IOException { + CompoundConfiguration config = new CompoundConfiguration(); + config.add(conf); + config.add(store.conf); + config.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, + CustomDateTieredCompactionPolicy.class.getName()); + createCompactionPolicy(config, store); + this.storeFileManager = new DefaultStoreFileManager(kvComparator, + StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, config, compactionPolicy.getConf()); + this.storeFlusher = new DefaultStoreFlusher(config, store); + this.compactor = new CustomTieredCompactor(config, store); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java new file mode 100644 index 000000000000..7a9914c87d34 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTiering.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; + +import java.io.IOException; +import java.util.Date; +import org.apache.hadoop.hbase.io.hfile.HFileInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class CustomTiering implements DataTiering { + private static final Logger LOG = LoggerFactory.getLogger(CustomTiering.class); + + private long getMaxTSFromTimeRange(byte[] hFileTimeRange, String hFileName) { + try { + if (hFileTimeRange == null) { + LOG.debug("Custom cell-based timestamp information not found for file: {}", hFileName); + return Long.MAX_VALUE; + } + long parsedValue = TimeRangeTracker.parseFrom(hFileTimeRange).getMax(); + LOG.debug("Max TS for file {} is {}", hFileName, new Date(parsedValue)); + return parsedValue; + } catch (IOException e) { + LOG.error("Error occurred while reading the Custom cell-based timestamp metadata of file: {}", + hFileName, e); + return Long.MAX_VALUE; + } + } + + public long getTimestamp(HStoreFile hStoreFile) { + return getMaxTSFromTimeRange(hStoreFile.getMetadataValue(CUSTOM_TIERING_TIME_RANGE), + hStoreFile.getPath().getName()); + } + + public long getTimestamp(HFileInfo hFileInfo) { + return getMaxTSFromTimeRange(hFileInfo.get(CUSTOM_TIERING_TIME_RANGE), + hFileInfo.getHFileContext().getHFileName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java new file mode 100644 index 000000000000..d2b88a501ec5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomTieringMultiFileWriter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.function.Function; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class CustomTieringMultiFileWriter extends DateTieredMultiFileWriter { + + public static final byte[] CUSTOM_TIERING_TIME_RANGE = Bytes.toBytes("CUSTOM_TIERING_TIME_RANGE"); + + private NavigableMap lowerBoundary2TimeRanger = new TreeMap<>(); + + public CustomTieringMultiFileWriter(List lowerBoundaries, + Map lowerBoundariesPolicies, boolean needEmptyFile, + Function tieringFunction) { + super(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, tieringFunction); + for (Long lowerBoundary : lowerBoundaries) { + lowerBoundary2TimeRanger.put(lowerBoundary, null); + } + } + + @Override + public void append(ExtendedCell cell) throws IOException { + super.append(cell); + long tieringValue = tieringFunction.apply(cell); + Map.Entry entry = lowerBoundary2TimeRanger.floorEntry(tieringValue); + if (entry.getValue() == null) { + TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + timeRangeTracker.setMin(tieringValue); + timeRangeTracker.setMax(tieringValue); + lowerBoundary2TimeRanger.put(entry.getKey(), timeRangeTracker); + ((HFileWriterImpl) lowerBoundary2Writer.get(entry.getKey()).getLiveFileWriter()) + .setTimeRangeTrackerForTiering(() -> timeRangeTracker); + } else { + TimeRangeTracker timeRangeTracker = entry.getValue(); + if (timeRangeTracker.getMin() > tieringValue) { + timeRangeTracker.setMin(tieringValue); + } + if (timeRangeTracker.getMax() < tieringValue) { + timeRangeTracker.setMax(tieringValue); + } + } + } + + @Override + public List commitWriters(long maxSeqId, boolean majorCompaction, + Collection storeFiles) throws IOException { + for (Map.Entry entry : this.lowerBoundary2Writer.entrySet()) { + StoreFileWriter writer = entry.getValue(); + if (writer != null) { + writer.appendFileInfo(CUSTOM_TIERING_TIME_RANGE, + TimeRangeTracker.toByteArray(lowerBoundary2TimeRanger.get(entry.getKey()))); + } + } + return super.commitWriters(maxSeqId, majorCompaction, storeFiles); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java new file mode 100644 index 000000000000..51e89b0b79d0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTiering.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.io.hfile.HFileInfo; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public interface DataTiering { + long getTimestamp(HStoreFile hFile); + + long getTimestamp(HFileInfo hFileInfo); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java index aa56e3f64445..2a5e2a5aa39d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringManager.java @@ -17,13 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; - import java.io.IOException; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.OptionalLong; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -136,17 +134,18 @@ public boolean isHotData(BlockCacheKey key) throws DataTieringException { * the data tiering type is set to {@link DataTieringType#TIME_RANGE}, it uses the maximum * timestamp from the time range tracker to determine if the data is hot. Otherwise, it considers * the data as hot by default. - * @param timeRangeTracker the time range tracker containing the timestamps - * @param conf The configuration object to use for determining hot data criteria. + * @param maxTimestamp the maximum timestamp associated with the data. + * @param conf The configuration object to use for determining hot data criteria. * @return {@code true} if the data is hot, {@code false} otherwise */ - public boolean isHotData(TimeRangeTracker timeRangeTracker, Configuration conf) { + public boolean isHotData(long maxTimestamp, Configuration conf) { DataTieringType dataTieringType = getDataTieringType(conf); + if ( - dataTieringType.equals(DataTieringType.TIME_RANGE) - && timeRangeTracker.getMax() != TimeRangeTracker.INITIAL_MAX_TIMESTAMP + !dataTieringType.equals(DataTieringType.NONE) + && maxTimestamp != TimeRangeTracker.INITIAL_MAX_TIMESTAMP ) { - return hotDataValidator(timeRangeTracker.getMax(), getDataTieringHotDataAge(conf)); + return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(conf)); } // DataTieringType.NONE or other types are considered hot by default return true; @@ -165,29 +164,14 @@ public boolean isHotData(Path hFilePath) throws DataTieringException { Configuration configuration = getConfiguration(hFilePath); DataTieringType dataTieringType = getDataTieringType(configuration); - if (dataTieringType.equals(DataTieringType.TIME_RANGE)) { - return hotDataValidator(getMaxTimestamp(hFilePath), getDataTieringHotDataAge(configuration)); - } - // DataTieringType.NONE or other types are considered hot by default - return true; - } - - /** - * Determines whether the data in the HFile at the given path is considered hot based on the - * configured data tiering type and hot data age. If the data tiering type is set to - * {@link DataTieringType#TIME_RANGE}, it validates the data against the provided maximum - * timestamp. 
- * @param hFilePath the path to the HFile - * @param maxTimestamp the maximum timestamp to validate against - * @return {@code true} if the data is hot, {@code false} otherwise - * @throws DataTieringException if there is an error retrieving data tiering information - */ - public boolean isHotData(Path hFilePath, long maxTimestamp) throws DataTieringException { - Configuration configuration = getConfiguration(hFilePath); - DataTieringType dataTieringType = getDataTieringType(configuration); - - if (dataTieringType.equals(DataTieringType.TIME_RANGE)) { - return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(configuration)); + if (!dataTieringType.equals(DataTieringType.NONE)) { + HStoreFile hStoreFile = getHStoreFile(hFilePath); + if (hStoreFile == null) { + throw new DataTieringException( + "Store file corresponding to " + hFilePath + " doesn't exist"); + } + return hotDataValidator(dataTieringType.getInstance().getTimestamp(getHStoreFile(hFilePath)), + getDataTieringHotDataAge(configuration)); } // DataTieringType.NONE or other types are considered hot by default return true; @@ -204,8 +188,9 @@ public boolean isHotData(Path hFilePath, long maxTimestamp) throws DataTieringEx */ public boolean isHotData(HFileInfo hFileInfo, Configuration configuration) { DataTieringType dataTieringType = getDataTieringType(configuration); - if (dataTieringType.equals(DataTieringType.TIME_RANGE)) { - return hotDataValidator(getMaxTimestamp(hFileInfo), getDataTieringHotDataAge(configuration)); + if (hFileInfo != null && !dataTieringType.equals(DataTieringType.NONE)) { + return hotDataValidator(dataTieringType.getInstance().getTimestamp(hFileInfo), + getDataTieringHotDataAge(configuration)); } // DataTieringType.NONE or other types are considered hot by default return true; @@ -217,36 +202,6 @@ private boolean hotDataValidator(long maxTimestamp, long hotDataAge) { return diff <= hotDataAge; } - private long getMaxTimestamp(Path hFilePath) throws DataTieringException { - HStoreFile hStoreFile = getHStoreFile(hFilePath); - if (hStoreFile == null) { - LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath); - return Long.MAX_VALUE; - } - OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp(); - if (!maxTimestamp.isPresent()) { - LOG.error("Maximum timestamp not present for {}", hFilePath); - return Long.MAX_VALUE; - } - return maxTimestamp.getAsLong(); - } - - private long getMaxTimestamp(HFileInfo hFileInfo) { - try { - byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY); - if (hFileTimeRange == null) { - LOG.error("Timestamp information not found for file: {}", - hFileInfo.getHFileContext().getHFileName()); - return Long.MAX_VALUE; - } - return TimeRangeTracker.parseFrom(hFileTimeRange).getMax(); - } catch (IOException e) { - LOG.error("Error occurred while reading the timestamp metadata of file: {}", - hFileInfo.getHFileContext().getHFileName(), e); - return Long.MAX_VALUE; - } - } - private long getCurrentTimestamp() { return EnvironmentEdgeManager.getDelegate().currentTime(); } @@ -299,7 +254,7 @@ private HStore getHStore(Path hFilePath) throws DataTieringException { private HStoreFile getHStoreFile(Path hFilePath) throws DataTieringException { HStore hStore = getHStore(hFilePath); for (HStoreFile file : hStore.getStorefiles()) { - if (file.getPath().equals(hFilePath)) { + if (file.getPath().toUri().getPath().toString().equals(hFilePath.toString())) { return file; } } @@ -330,7 +285,8 @@ public Map getColdFilesList() { for (HRegion r : this.onlineRegions.values()) { for (HStore 
hStore : r.getStores()) { Configuration conf = hStore.getReadOnlyConfiguration(); - if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) { + DataTieringType dataTieringType = getDataTieringType(conf); + if (dataTieringType == DataTieringType.NONE) { // Data-Tiering not enabled for the store. Just skip it. continue; } @@ -339,14 +295,10 @@ public Map getColdFilesList() { for (HStoreFile hStoreFile : hStore.getStorefiles()) { String hFileName = hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName(); - OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp(); - if (!maxTimestamp.isPresent()) { - LOG.warn("maxTimestamp missing for file: {}", - hStoreFile.getFileInfo().getActiveFileName()); - continue; - } + long maxTimeStamp = dataTieringType.getInstance().getTimestamp(hStoreFile); + LOG.debug("Max TS for file {} is {}", hFileName, new Date(maxTimeStamp)); long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime(); - long fileAge = currentTimestamp - maxTimestamp.getAsLong(); + long fileAge = currentTimestamp - maxTimeStamp; if (fileAge > hotDataAge) { // Values do not matter. coldFiles.put(hFileName, null); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java index ee54576a6487..83da5f54e43f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DataTieringType.java @@ -21,6 +21,17 @@ @InterfaceAudience.Public public enum DataTieringType { - NONE, - TIME_RANGE + NONE(null), + TIME_RANGE(new CellTSTiering()), + CUSTOM(new CustomTiering()); + + private final DataTiering instance; + + DataTieringType(DataTiering instance) { + this.instance = instance; + } + + public DataTiering getInstance() { + return instance; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java index b800178e8a28..e01f062a0191 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; +import java.util.function.Function; import org.apache.hadoop.hbase.ExtendedCell; import org.apache.yetus.audience.InterfaceAudience; @@ -33,12 +34,14 @@ @InterfaceAudience.Private public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { - private final NavigableMap lowerBoundary2Writer = new TreeMap<>(); + protected final NavigableMap lowerBoundary2Writer = new TreeMap<>(); private final boolean needEmptyFile; private final Map lowerBoundariesPolicies; + protected Function tieringFunction; + /** * @param lowerBoundariesPolicies each window to storage policy map. * @param needEmptyFile whether need to create an empty store file if we haven't written @@ -46,16 +49,29 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter { */ public DateTieredMultiFileWriter(List lowerBoundaries, Map lowerBoundariesPolicies, boolean needEmptyFile) { + this(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, c -> c.getTimestamp()); + } + + /** + * @param lowerBoundariesPolicies each window to storage policy map. 
+ * @param needEmptyFile whether need to create an empty store file if we haven't written + * out anything. + */ + public DateTieredMultiFileWriter(List lowerBoundaries, + Map lowerBoundariesPolicies, boolean needEmptyFile, + Function tieringFunction) { for (Long lowerBoundary : lowerBoundaries) { lowerBoundary2Writer.put(lowerBoundary, null); } this.needEmptyFile = needEmptyFile; this.lowerBoundariesPolicies = lowerBoundariesPolicies; + this.tieringFunction = tieringFunction; } @Override public void append(ExtendedCell cell) throws IOException { - Map.Entry entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp()); + Map.Entry entry = + lowerBoundary2Writer.floorEntry(tieringFunction.apply(cell)); StoreFileWriter writer = entry.getValue(); if (writer == null) { String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java index 26437ab11242..dc13f190afaa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY; + import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -29,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; /** @@ -44,6 +47,18 @@ public class DateTieredStoreEngine extends StoreEngine filesCompacting) { return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), filesCompacting); @@ -57,7 +72,7 @@ public CompactionContext createCompaction() throws IOException { @Override protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator) throws IOException { - this.compactionPolicy = new DateTieredCompactionPolicy(conf, store); + createCompactionPolicy(conf, store); this.storeFileManager = new DefaultStoreFileManager(kvComparator, StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, conf, compactionPolicy.getConf()); this.storeFlusher = new DefaultStoreFlusher(conf, store); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index 569b9d3faa67..b5732d3b23ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -255,6 +255,14 @@ public void appendTrackedTimestampsToMetadata() throws IOException { } } + public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) + throws IOException { + liveFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker); + if (historicalFileWriter != null) { + historicalFileWriter.appendCustomCellTimestampsToMetadata(timeRangeTracker); + } + } + @Override public void beforeShipped() throws IOException { liveFileWriter.beforeShipped(); @@ -663,6 +671,11 @@ private void 
appendTrackedTimestampsToMetadata() throws IOException { writer.appendTrackedTimestampsToMetadata(); } + public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker) + throws IOException { + writer.appendCustomCellTimestampsToMetadata(timeRangeTracker); + } + private void appendGeneralBloomfilter(final ExtendedCell cell) throws IOException { if (this.generalBloomFilterWriter != null) { /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 055ad85e5a39..069968294b84 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -403,6 +403,11 @@ protected abstract List commitWriter(T writer, FileDetails fd, protected abstract void abortWriter(T writer) throws IOException; + protected List decorateCells(List cells) { + // no op + return cells; + } + /** * Performs the compaction. * @param fd FileDetails of cell sink writer @@ -459,6 +464,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel // output to writer: Cell lastCleanCell = null; long lastCleanCellSeqId = 0; + cells = decorateCells(cells); for (ExtendedCell c : cells) { if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) { lastCleanCell = c; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java new file mode 100644 index 000000000000..f908b31e4ae5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieredUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.StoreEngine.STORE_ENGINE_CLASS_KEY; +import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER; + +import java.io.IOException; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class CustomCellTieredUtils { + private CustomCellTieredUtils() { + // Utility class, no instantiation + } + + public static void checkForModifyTable(TableDescriptor newTable) throws IOException { + for (ColumnFamilyDescriptor descriptor : newTable.getColumnFamilies()) { + String storeEngineClassName = descriptor.getConfigurationValue(STORE_ENGINE_CLASS_KEY); + if ( + storeEngineClassName != null && storeEngineClassName.contains("CustomCellTieredStoreEngine") + ) { + if (descriptor.getConfigurationValue(TIERING_CELL_QUALIFIER) == null) { + throw new DoNotRetryIOException("StoreEngine " + storeEngineClassName + + " is missing required " + TIERING_CELL_QUALIFIER + " parameter."); + } + } + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java new file mode 100644 index 000000000000..fca76bae8f83 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomCellTieringValueProvider.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ArrayBackedTag; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A {@link CustomTieredCompactor.TieringValueProvider} implementation that reads the tiering value + * for each row from a configured cell qualifier and tags all cells of the row with it, so that + * custom values can be used for the different file tiers during compaction.
+ */ +@InterfaceAudience.Private +public class CustomCellTieringValueProvider implements CustomTieredCompactor.TieringValueProvider { + public static final String TIERING_CELL_QUALIFIER = "TIERING_CELL_QUALIFIER"; + private byte[] tieringQualifier; + + @Override + public void init(Configuration conf) throws Exception { + tieringQualifier = Bytes.toBytes(conf.get(TIERING_CELL_QUALIFIER)); + } + + @Override + public List decorateCells(List cells) { + // if no tiering qualifier properly set, skips the whole flow + if (tieringQualifier != null) { + byte[] tieringValue = null; + // first iterates through the cells within a row, to find the tiering value for the row + for (Cell cell : cells) { + if (CellUtil.matchingQualifier(cell, tieringQualifier)) { + tieringValue = CellUtil.cloneValue(cell); + break; + } + } + if (tieringValue == null) { + tieringValue = Bytes.toBytes(Long.MAX_VALUE); + } + // now apply the tiering value as a tag to all cells within the row + Tag tieringValueTag = new ArrayBackedTag(TagType.CELL_VALUE_TIERING_TAG_TYPE, tieringValue); + List newCells = new ArrayList<>(cells.size()); + for (ExtendedCell cell : cells) { + List tags = PrivateCellUtil.getTags(cell); + tags.add(tieringValueTag); + newCells.add(PrivateCellUtil.createCell(cell, tags)); + } + return newCells; + } else { + return cells; + } + } + + @Override + public long getTieringValue(ExtendedCell cell) { + Optional tagOptional = PrivateCellUtil.getTag(cell, TagType.CELL_VALUE_TIERING_TAG_TYPE); + if (tagOptional.isPresent()) { + Tag tag = tagOptional.get(); + return Bytes.toLong(tag.getValueByteBuffer().array(), tag.getValueOffset(), + tag.getValueLength()); + } + return Long.MAX_VALUE; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java new file mode 100644 index 000000000000..dcc97c63d024 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomDateTieredCompactionPolicy.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Custom implementation of DateTieredCompactionPolicy that calculates compaction boundaries based + * on the hbase.hstore.compaction.date.tiered.custom.age.limit.millis configuration property + * and the custom tiering time range (CUSTOM_TIERING_TIME_RANGE) stored in the metadata of each + * store file. This policy produces either one or two tiers: - One tier if all files' data is + * older than the configured age limit, or all files' data is younger than the configured age + * limit. - Two tiers if the files contain data both younger and older than the configured age + * limit. + */ +@InterfaceAudience.Private +public class CustomDateTieredCompactionPolicy extends DateTieredCompactionPolicy { + + public static final String AGE_LIMIT_MILLIS = + "hbase.hstore.compaction.date.tiered.custom.age.limit.millis"; + + // Defaults to 10 years + public static final long DEFAULT_AGE_LIMIT_MILLIS = + (long) (10L * 365.25 * 24L * 60L * 60L * 1000L); + + private static final Logger LOG = LoggerFactory.getLogger(CustomDateTieredCompactionPolicy.class); + + private long cutOffTimestamp; + + public CustomDateTieredCompactionPolicy(Configuration conf, + StoreConfigInformation storeConfigInfo) throws IOException { + super(conf, storeConfigInfo); + cutOffTimestamp = EnvironmentEdgeManager.currentTime() + - conf.getLong(AGE_LIMIT_MILLIS, DEFAULT_AGE_LIMIT_MILLIS); + + } + + @Override + protected List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, + long now) { + MutableLong min = new MutableLong(Long.MAX_VALUE); + MutableLong max = new MutableLong(0); + filesToCompact.forEach(f -> { + byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE); + long minCurrent = Long.MAX_VALUE; + long maxCurrent = 0; + if (timeRangeBytes != null) { + try { + TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes); + timeRangeTracker.getMin(); + minCurrent = timeRangeTracker.getMin(); + maxCurrent = timeRangeTracker.getMax(); + } catch (IOException e) { + LOG.warn("Got TIERING_CELL_TIME_RANGE info from file, but failed to parse it:", e); + } + } + if (minCurrent < min.getValue()) { + min.setValue(minCurrent); + } + if (maxCurrent > max.getValue()) { + max.setValue(maxCurrent); + } + }); + + List<Long> boundaries = new ArrayList<>(); + boundaries.add(Long.MIN_VALUE); + if (min.getValue() < cutOffTimestamp) { + boundaries.add(min.getValue()); + if (max.getValue() > cutOffTimestamp) { + boundaries.add(cutOffTimestamp); + } + } + return boundaries; + } + + @Override + public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection, + boolean mayUseOffPeak, boolean mayBeStuck) throws IOException { + ArrayList<HStoreFile> filteredByPolicy =
this.compactionPolicyPerWindow + .applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck); + return selectMajorCompaction(filteredByPolicy); + } + + @Override + public boolean shouldPerformMajorCompaction(Collection filesToCompact) + throws IOException { + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = EnvironmentEdgeManager.currentTime(); + if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) { + long cfTTL = this.storeConfigInfo.getStoreFileTtl(); + int countLower = 0; + int countHigher = 0; + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); + for (HStoreFile f : filesToCompact) { + if (checkForTtl(cfTTL, f)) { + return true; + } + if (isMajorOrBulkloadResult(f, now - lowTimestamp)) { + return true; + } + byte[] timeRangeBytes = f.getMetadataValue(CUSTOM_TIERING_TIME_RANGE); + TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(timeRangeBytes); + if (timeRangeTracker.getMin() < cutOffTimestamp) { + if (timeRangeTracker.getMax() > cutOffTimestamp) { + // Found at least one file crossing the cutOffTimestamp + return true; + } else { + countLower++; + } + } else { + countHigher++; + } + hdfsBlocksDistribution.add(f.getHDFSBlockDistribution()); + } + // If we haven't found any files crossing the cutOffTimestamp, we have to check + // if there are at least more than one file on each tier and if so, perform compaction + if (countLower > 1 || countHigher > 1) { + return true; + } + return checkBlockLocality(hdfsBlocksDistribution); + } + return false; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java new file mode 100644 index 000000000000..47e4e142bdad --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CustomTieredCompactor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class CustomTieredCompactor extends DateTieredCompactor { + + public static final String TIERING_VALUE_PROVIDER = + "hbase.hstore.custom-tiering-value.provider.class"; + private TieringValueProvider tieringValueProvider; + + public CustomTieredCompactor(Configuration conf, HStore store) throws IOException { + super(conf, store); + String className = + conf.get(TIERING_VALUE_PROVIDER, CustomCellTieringValueProvider.class.getName()); + try { + tieringValueProvider = + (TieringValueProvider) Class.forName(className).getConstructor().newInstance(); + tieringValueProvider.init(conf); + } catch (Exception e) { + throw new IOException("Unable to load configured tiering value provider '" + className + "'", + e); + } + } + + @Override + protected List decorateCells(List cells) { + return tieringValueProvider.decorateCells(cells); + } + + @Override + protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request, + final List lowerBoundaries, final Map lowerBoundariesPolicies) { + return new CustomTieringMultiFileWriter(lowerBoundaries, lowerBoundariesPolicies, + needEmptyFile(request), CustomTieredCompactor.this.tieringValueProvider::getTieringValue); + } + + public interface TieringValueProvider { + + void init(Configuration configuration) throws Exception; + + default List decorateCells(List cells) { + return cells; + } + + long getTieringValue(ExtendedCell cell); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java index 9dbe9aae9cf2..2cce0d67d772 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactionPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -66,7 +67,7 @@ public class DateTieredCompactionPolicy extends SortedCompactionPolicy { private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class); - private final RatioBasedCompactionPolicy compactionPolicyPerWindow; + protected final RatioBasedCompactionPolicy compactionPolicyPerWindow; private final CompactionWindowFactory windowFactory; @@ -108,9 +109,8 @@ public boolean needsCompaction(Collection storeFiles, } } - @Override - public boolean shouldPerformMajorCompaction(Collection filesToCompact) - throws IOException { + protected boolean isMajorCompactionTime(Collection filesToCompact, long now, + long lowestModificationTime) throws IOException { long mcTime = getNextMajorCompactTime(filesToCompact); if (filesToCompact == null || mcTime == 0) { if (LOG.isDebugEnabled()) { @@ -118,58 +118,40 @@ public boolean 
shouldPerformMajorCompaction(Collection filesToCompac } return false; } - // TODO: Use better method for determining stamp of last major (HBASE-2990) - long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); - long now = EnvironmentEdgeManager.currentTime(); - if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) { + if (lowestModificationTime <= 0L || lowestModificationTime >= (now - mcTime)) { if (LOG.isDebugEnabled()) { - LOG.debug("lowTimestamp: " + lowTimestamp + " lowTimestamp: " + lowTimestamp + " now: " - + now + " mcTime: " + mcTime); + LOG.debug("lowTimestamp: " + lowestModificationTime + " lowTimestamp: " + + lowestModificationTime + " now: " + now + " mcTime: " + mcTime); } return false; } + return true; + } - long cfTTL = this.storeConfigInfo.getStoreFileTtl(); - HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); - List boundaries = getCompactBoundariesForMajor(filesToCompact, now); - boolean[] filesInWindow = new boolean[boundaries.size()]; - - for (HStoreFile file : filesToCompact) { - OptionalLong minTimestamp = file.getMinimumTimestamp(); - long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE; - if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) { - LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance"); - return true; - } - if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) { - LOG.debug("Major compaction triggered on store " + this - + ", because there are new files and time since last major compaction " - + (now - lowTimestamp) + "ms"); - return true; - } + protected boolean checkForTtl(long ttl, HStoreFile file) { + OptionalLong minTimestamp = file.getMinimumTimestamp(); + long oldest = minTimestamp.isPresent() + ? EnvironmentEdgeManager.currentTime() - minTimestamp.getAsLong() + : Long.MIN_VALUE; + if (ttl != Long.MAX_VALUE && oldest >= ttl) { + LOG.debug("Major compaction triggered on store " + this + "; for TTL maintenance"); + return true; + } + return false; + } - int lowerWindowIndex = - Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE)); - int upperWindowIndex = - Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE)); - // Handle boundary conditions and negative values of binarySearch - lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex; - upperWindowIndex = (upperWindowIndex < 0) ? 
Math.abs(upperWindowIndex + 2) : upperWindowIndex; - if (lowerWindowIndex != upperWindowIndex) { - LOG.debug("Major compaction triggered on store " + this + "; because file " + file.getPath() - + " has data with timestamps cross window boundaries"); - return true; - } else if (filesInWindow[upperWindowIndex]) { - LOG.debug("Major compaction triggered on store " + this - + "; because there are more than one file in some windows"); - return true; - } else { - filesInWindow[upperWindowIndex] = true; - } - hdfsBlocksDistribution.add(file.getHDFSBlockDistribution()); + protected boolean isMajorOrBulkloadResult(HStoreFile file, long timeDiff) { + if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) { + LOG.debug("Major compaction triggered on store " + this + + ", because there are new files and time since last major compaction " + timeDiff + "ms"); + return true; } + return false; + } + protected boolean checkBlockLocality(HDFSBlocksDistribution hdfsBlocksDistribution) + throws UnknownHostException { float blockLocalityIndex = hdfsBlocksDistribution .getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER)); if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { @@ -178,9 +160,55 @@ public boolean shouldPerformMajorCompaction(Collection filesToCompac + " (min " + comConf.getMinLocalityToForceCompact() + ")"); return true; } + return false; + } - LOG.debug( - "Skipping major compaction of " + this + ", because the files are already major compacted"); + @Override + public boolean shouldPerformMajorCompaction(Collection filesToCompact) + throws IOException { + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = EnvironmentEdgeManager.currentTime(); + if (isMajorCompactionTime(filesToCompact, now, lowTimestamp)) { + long cfTTL = this.storeConfigInfo.getStoreFileTtl(); + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); + List boundaries = getCompactBoundariesForMajor(filesToCompact, now); + boolean[] filesInWindow = new boolean[boundaries.size()]; + for (HStoreFile file : filesToCompact) { + OptionalLong minTimestamp = file.getMinimumTimestamp(); + if (checkForTtl(cfTTL, file)) { + return true; + } + if (isMajorOrBulkloadResult(file, now - lowTimestamp)) { + return true; + } + int lowerWindowIndex = + Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE)); + int upperWindowIndex = + Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE)); + // Handle boundary conditions and negative values of binarySearch + lowerWindowIndex = + (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex; + upperWindowIndex = + (upperWindowIndex < 0) ? 
Math.abs(upperWindowIndex + 2) : upperWindowIndex; + if (lowerWindowIndex != upperWindowIndex) { + LOG.debug("Major compaction triggered on store " + this + "; because file " + + file.getPath() + " has data with timestamps cross window boundaries"); + return true; + } else if (filesInWindow[upperWindowIndex]) { + LOG.debug("Major compaction triggered on store " + this + + "; because there are more than one file in some windows"); + return true; + } else { + filesInWindow[upperWindowIndex] = true; + } + hdfsBlocksDistribution.add(file.getHDFSBlockDistribution()); + } + if (checkBlockLocality(hdfsBlocksDistribution)) { + return true; + } + LOG.debug( + "Skipping major compaction of " + this + ", because the files are already major compacted"); + } return false; } @@ -296,7 +324,8 @@ private DateTieredCompactionRequest generateCompactionRequest(ArrayList getCompactBoundariesForMajor(Collection filesToCompact, long now) { + protected List getCompactBoundariesForMajor(Collection filesToCompact, + long now) { long minTimestamp = filesToCompact.stream() .mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min().orElse(Long.MAX_VALUE); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java index b5911b0cec46..9cef2ebc3144 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java @@ -46,7 +46,7 @@ public DateTieredCompactor(Configuration conf, HStore store) { super(conf, store); } - private boolean needEmptyFile(CompactionRequestImpl request) { + protected boolean needEmptyFile(CompactionRequestImpl request) { // if we are going to compact the last N files, then we need to emit an empty file to retain the // maxSeqId if we haven't written out anything. 
OptionalLong maxSeqId = StoreUtils.getMaxSequenceIdInList(request.getFiles()); @@ -70,14 +70,20 @@ public List compact(final CompactionRequestImpl request, final List public DateTieredMultiFileWriter createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major, Consumer writerCreationTracker) throws IOException { - DateTieredMultiFileWriter writer = new DateTieredMultiFileWriter(lowerBoundaries, - lowerBoundariesPolicies, needEmptyFile(request)); + DateTieredMultiFileWriter writer = + createMultiWriter(request, lowerBoundaries, lowerBoundariesPolicies); initMultiWriter(writer, scanner, fd, shouldDropBehind, major, writerCreationTracker); return writer; } }, throughputController, user); } + protected DateTieredMultiFileWriter createMultiWriter(final CompactionRequestImpl request, + final List lowerBoundaries, final Map lowerBoundariesPolicies) { + return new DateTieredMultiFileWriter(lowerBoundaries, lowerBoundariesPolicies, + needEmptyFile(request), c -> c.getTimestamp()); + } + @Override protected List commitWriter(DateTieredMultiFileWriter writer, FileDetails fd, CompactionRequestImpl request) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java index f031a96d15f4..153ad50419b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileInlineToRootChunkConversion.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -85,7 +86,7 @@ public void testWriteHFile() throws Exception { hfw.append(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(k) .setFamily(HConstants.EMPTY_BYTE_ARRAY).setQualifier(HConstants.EMPTY_BYTE_ARRAY) .setTimestamp(HConstants.LATEST_TIMESTAMP).setType(KeyValue.Type.Maximum.getCode()) - .setValue(v).build()); + .setValue(v).setType(Cell.Type.Put).build()); } hfw.close(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java new file mode 100644 index 000000000000..b01717dfa1f8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellDataTieringManager.java @@ -0,0 +1,865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to test the functionality of the DataTieringManager. + * + * The mock online regions are stored in {@link TestCustomCellDataTieringManager#testOnlineRegions}. + * For all tests, the setup of + * {@link TestCustomCellDataTieringManager#testOnlineRegions} occurs only once. + * Please refer to {@link TestCustomCellDataTieringManager#setupOnlineRegions()} for the structure. + * Additionally, a list of all store files is + * maintained in {@link TestCustomCellDataTieringManager#hStoreFiles}. 
+ * The characteristics of these store files are listed below: + * @formatter:off + * ## HStoreFile Information + * | HStoreFile | Region | Store | DataTiering | isHot | + * |------------------|--------------------|---------------------|-----------------------|-------| + * | hStoreFile0 | region1 | hStore11 | CUSTOM_CELL_VALUE | true | + * | hStoreFile1 | region1 | hStore12 | NONE | true | + * | hStoreFile2 | region2 | hStore21 | CUSTOM_CELL_VALUE | true | + * | hStoreFile3 | region2 | hStore22 | CUSTOM_CELL_VALUE | false | + * @formatter:on + */ + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestCustomCellDataTieringManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCustomCellDataTieringManager.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestCustomCellDataTieringManager.class); + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final long DAY = 24 * 60 * 60 * 1000; + private static Configuration defaultConf; + private static FileSystem fs; + private BlockCache blockCache; + private static CacheConfig cacheConf; + private static Path testDir; + private static final Map testOnlineRegions = new HashMap<>(); + + private static DataTieringManager dataTieringManager; + private static final List hStoreFiles = new ArrayList<>(); + + /** + * Represents the current lexicographically increasing string used as a row key when writing + * HFiles. It is incremented each time {@link #nextString()} is called to generate unique row + * keys. + */ + private static String rowKeyString; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + testDir = TEST_UTIL.getDataTestDir(TestCustomCellDataTieringManager.class.getSimpleName()); + defaultConf = TEST_UTIL.getConfiguration(); + updateCommonConfigurations(); + DataTieringManager.instantiate(defaultConf, testOnlineRegions); + dataTieringManager = DataTieringManager.getInstance(); + rowKeyString = ""; + } + + private static void updateCommonConfigurations() { + defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true); + defaultConf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); + defaultConf.setLong(BUCKET_CACHE_SIZE_KEY, 32); + } + + @FunctionalInterface + interface DataTieringMethodCallerWithPath { + boolean call(DataTieringManager manager, Path path) throws DataTieringException; + } + + @FunctionalInterface + interface DataTieringMethodCallerWithKey { + boolean call(DataTieringManager manager, BlockCacheKey key) throws DataTieringException; + } + + @Test + public void testDataTieringEnabledWithKey() throws IOException { + initializeTestEnvironment(); + DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isDataTieringEnabled; + + // Test with valid key + BlockCacheKey key = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA); + testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, true); + + // Test with another valid key + key = new BlockCacheKey(hStoreFiles.get(1).getPath(), 0, true, BlockType.DATA); + testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false); + + // Test with valid key with no HFile Path + key = new BlockCacheKey(hStoreFiles.get(0).getPath().getName(), 0); + testDataTieringMethodWithKeyExpectingException(methodCallerWithKey, key, + new DataTieringException("BlockCacheKey Doesn't Contain HFile Path")); + } + + @Test + public void 
testDataTieringEnabledWithPath() throws IOException { + initializeTestEnvironment(); + DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isDataTieringEnabled; + + // Test with valid path + Path hFilePath = hStoreFiles.get(1).getPath(); + testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false); + + // Test with another valid path + hFilePath = hStoreFiles.get(3).getPath(); + testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true); + + // Test with an incorrect path + hFilePath = new Path("incorrectPath"); + testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath, + new DataTieringException("Incorrect HFile Path: " + hFilePath)); + + // Test with a non-existing HRegion path + Path basePath = hStoreFiles.get(0).getPath().getParent().getParent().getParent(); + hFilePath = new Path(basePath, "incorrectRegion/cf1/filename"); + testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath, + new DataTieringException("HRegion corresponding to " + hFilePath + " doesn't exist")); + + // Test with a non-existing HStore path + basePath = hStoreFiles.get(0).getPath().getParent().getParent(); + hFilePath = new Path(basePath, "incorrectCf/filename"); + testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath, + new DataTieringException("HStore corresponding to " + hFilePath + " doesn't exist")); + } + + @Test + public void testHotDataWithKey() throws IOException { + initializeTestEnvironment(); + DataTieringMethodCallerWithKey methodCallerWithKey = DataTieringManager::isHotData; + + // Test with valid key + BlockCacheKey key = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA); + testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, true); + + // Test with another valid key + key = new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA); + testDataTieringMethodWithKeyNoException(methodCallerWithKey, key, false); + } + + @Test + public void testHotDataWithPath() throws IOException { + initializeTestEnvironment(); + DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData; + + // Test with valid path + Path hFilePath = hStoreFiles.get(2).getPath(); + testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true); + + // Test with another valid path + hFilePath = hStoreFiles.get(3).getPath(); + testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false); + + // Test with a filename where the corresponding HStoreFile is not present + hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), "incorrectFileName"); + testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath, + new DataTieringException("Store file corresponding to " + hFilePath + " doesn't exist")); + } + + @Test + public void testPrefetchWhenDataTieringEnabled() throws IOException { + setPrefetchBlocksOnOpen(); + this.blockCache = initializeTestEnvironment(); + // Evict blocks from cache by closing the files and passing evict on close. + // Then initialize the reader again. Since Prefetch on open is set to true, it should prefetch + // those blocks. + for (HStoreFile file : hStoreFiles) { + file.closeStoreFile(true); + file.initReader(); + } + + // Since we have one cold file among four files, only three should get prefetched. 
+ Optional>> fullyCachedFiles = blockCache.getFullyCachedFiles(); + assertTrue("We should get the fully cached files from the cache", fullyCachedFiles.isPresent()); + Waiter.waitFor(defaultConf, 10000, () -> fullyCachedFiles.get().size() == 3); + assertEquals("Number of fully cached files are incorrect", 3, fullyCachedFiles.get().size()); + } + + private void setPrefetchBlocksOnOpen() { + defaultConf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true); + } + + @Test + public void testColdDataFiles() throws IOException { + initializeTestEnvironment(); + Set allCachedBlocks = new HashSet<>(); + for (HStoreFile file : hStoreFiles) { + allCachedBlocks.add(new BlockCacheKey(file.getPath(), 0, true, BlockType.DATA)); + } + + // Verify hStoreFile3 is identified as cold data + DataTieringMethodCallerWithPath methodCallerWithPath = DataTieringManager::isHotData; + Path hFilePath = hStoreFiles.get(3).getPath(); + testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, false); + + // Verify all the other files in hStoreFiles are hot data + for (int i = 0; i < hStoreFiles.size() - 1; i++) { + hFilePath = hStoreFiles.get(i).getPath(); + testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true); + } + + try { + Set coldFilePaths = dataTieringManager.getColdDataFiles(allCachedBlocks); + assertEquals(1, coldFilePaths.size()); + } catch (DataTieringException e) { + fail("Unexpected DataTieringException: " + e.getMessage()); + } + } + + @Test + public void testCacheCompactedBlocksOnWriteDataTieringDisabled() throws IOException { + setCacheCompactBlocksOnWrite(); + this.blockCache = initializeTestEnvironment(); + HRegion region = createHRegion("table3", this.blockCache); + testCacheCompactedBlocksOnWrite(region, true); + } + + @Test + public void testCacheCompactedBlocksOnWriteWithHotData() throws IOException { + setCacheCompactBlocksOnWrite(); + this.blockCache = initializeTestEnvironment(); + HRegion region = + createHRegion("table3", getConfWithCustomCellDataTieringEnabled(5 * DAY), this.blockCache); + testCacheCompactedBlocksOnWrite(region, true); + } + + @Test + public void testCacheCompactedBlocksOnWriteWithColdData() throws IOException { + setCacheCompactBlocksOnWrite(); + this.blockCache = initializeTestEnvironment(); + HRegion region = + createHRegion("table3", getConfWithCustomCellDataTieringEnabled(DAY), this.blockCache); + testCacheCompactedBlocksOnWrite(region, false); + } + + private void setCacheCompactBlocksOnWrite() { + defaultConf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, true); + } + + private void testCacheCompactedBlocksOnWrite(HRegion region, boolean expectDataBlocksCached) + throws IOException { + HStore hStore = createHStore(region, "cf1"); + createTestFilesForCompaction(hStore); + hStore.refreshStoreFiles(); + + region.stores.put(Bytes.toBytes("cf1"), hStore); + testOnlineRegions.put(region.getRegionInfo().getEncodedName(), region); + + long initialStoreFilesCount = hStore.getStorefilesCount(); + long initialCacheDataBlockCount = blockCache.getDataBlockCount(); + assertEquals(3, initialStoreFilesCount); + assertEquals(0, initialCacheDataBlockCount); + + region.compact(true); + + long compactedStoreFilesCount = hStore.getStorefilesCount(); + long compactedCacheDataBlockCount = blockCache.getDataBlockCount(); + assertEquals(1, compactedStoreFilesCount); + assertEquals(expectDataBlocksCached, compactedCacheDataBlockCount > 0); + } + + private void createTestFilesForCompaction(HStore hStore) throws IOException { + long 
currentTime = System.currentTimeMillis(); + Path storeDir = hStore.getStoreContext().getFamilyStoreDirectoryPath(); + Configuration configuration = hStore.getReadOnlyConfiguration(); + + HRegionFileSystem regionFS = hStore.getHRegion().getRegionFileSystem(); + + createHStoreFile(storeDir, configuration, currentTime - 2 * DAY, regionFS); + createHStoreFile(storeDir, configuration, currentTime - 3 * DAY, regionFS); + createHStoreFile(storeDir, configuration, currentTime - 4 * DAY, regionFS); + } + + @Test + public void testPickColdDataFiles() throws IOException { + initializeTestEnvironment(); + Map coldDataFiles = dataTieringManager.getColdFilesList(); + assertEquals(1, coldDataFiles.size()); + // hStoreFiles[3] is the cold file. + assert (coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName())); + } + + /* + * Verify that both cold blocks are evicted when the bucket reaches its capacity. The hot file + * remains in the cache. + */ + @Test + public void testBlockEvictions() throws Exception { + initializeTestEnvironment(); + long capacitySize = 40 * 1024; + int writeThreads = 3; + int writerQLen = 64; + int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; + + // Setup: Create a bucket cache with lower capacity + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, + writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf); + + // Create three cache keys: two blocks from the cold data file and one block with hot data. + // hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file. + Set cacheKeys = new HashSet<>(); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA)); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA)); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA)); + + // Create dummy data to be cached and fill the cache completely. + CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3); + + int blocksIter = 0; + for (BlockCacheKey key : cacheKeys) { + bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock()); + // Ensure that the block is persisted to the file. + Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key))); + } + + // Verify that the bucket cache contains 3 blocks. + assertEquals(3, bucketCache.getBackingMap().keySet().size()); + + // Add an additional block into cache with hot data which should trigger the eviction + BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA); + CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1); + + bucketCache.cacheBlock(newKey, newBlock[0].getBlock()); + Waiter.waitFor(defaultConf, 10000, 100, + () -> (bucketCache.getBackingMap().containsKey(newKey))); + + // Verify that the bucket cache now contains 2 hot blocks only. + // Both cold blocks of 8KB will be evicted to make room for 1 block of 8KB + an additional + // space. + validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0); + } + + /* + * Verify that two of the cold blocks are evicted when the bucket reaches its capacity, but one + * cold block remains in the cache since the required space is freed. 
+ */ + @Test + public void testBlockEvictionsAllColdBlocks() throws Exception { + initializeTestEnvironment(); + long capacitySize = 40 * 1024; + int writeThreads = 3; + int writerQLen = 64; + int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; + + // Setup: Create a bucket cache with lower capacity + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, + writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf); + + // Create three cache keys for three cold data blocks. + // hStoreFiles.get(3) is a cold data file. + Set cacheKeys = new HashSet<>(); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA)); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA)); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA)); + + // Create dummy data to be cached and fill the cache completely. + CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3); + + int blocksIter = 0; + for (BlockCacheKey key : cacheKeys) { + bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock()); + // Ensure that the block is persisted to the file. + Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key))); + } + + // Verify that the bucket cache contains 3 blocks. + assertEquals(3, bucketCache.getBackingMap().keySet().size()); + + // Add an additional block into cache with hot data which should trigger the eviction + BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA); + CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1); + + bucketCache.cacheBlock(newKey, newBlock[0].getBlock()); + Waiter.waitFor(defaultConf, 10000, 100, + () -> (bucketCache.getBackingMap().containsKey(newKey))); + + // Verify that the bucket cache now contains 1 cold block and a newly added hot block. + validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1); + } + + /* + * Verify that a hot block is evicted along with a cold block when the bucket reaches its + * capacity. + */ + @Test + public void testBlockEvictionsHotBlocks() throws Exception { + initializeTestEnvironment(); + long capacitySize = 40 * 1024; + int writeThreads = 3; + int writerQLen = 64; + int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; + + // Setup: Create a bucket cache with lower capacity + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, + writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf); + + // Create three cache keys with two hot data blocks and one cold data block + // hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file. + Set cacheKeys = new HashSet<>(); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA)); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA)); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA)); + + // Create dummy data to be cached and fill the cache completely. + CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3); + + int blocksIter = 0; + for (BlockCacheKey key : cacheKeys) { + bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock()); + // Ensure that the block is persisted to the file. 
+ Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key))); + } + + // Verify that the bucket cache contains 3 blocks. + assertEquals(3, bucketCache.getBackingMap().keySet().size()); + + // Add an additional hot block, which should evict the only cold block along with one of the + // older hot blocks. + BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA); + CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1); + + bucketCache.cacheBlock(newKey, newBlock[0].getBlock()); + Waiter.waitFor(defaultConf, 10000, 100, + () -> (bucketCache.getBackingMap().containsKey(newKey))); + + // Verify that the bucket cache now contains 2 hot blocks. + // Only one of the older hot blocks is retained and the other one is the newly added hot block. + validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0); + } + + @Test + public void testFeatureKeyDisabled() throws Exception { + DataTieringManager.resetForTestingOnly(); + defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, false); + initializeTestEnvironment(); + + try { + assertFalse(DataTieringManager.instantiate(defaultConf, testOnlineRegions)); + // Verify that the DataTieringManager instance is not instantiated in the + // instantiate call above. + assertNull(DataTieringManager.getInstance()); + + // Also validate that data temperature is not honoured. + long capacitySize = 40 * 1024; + int writeThreads = 3; + int writerQLen = 64; + int[] bucketSizes = new int[] { 8 * 1024 + 1024 }; + + // Setup: Create a bucket cache with lower capacity + BucketCache bucketCache = + new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes, + writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf); + + // Create three cache keys with two hot data blocks and one cold data block + // hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file. + List cacheKeys = new ArrayList<>(); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA)); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA)); + cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA)); + + // Create dummy data to be cached and fill the cache completely. + CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3); + + int blocksIter = 0; + for (BlockCacheKey key : cacheKeys) { + LOG.info("Adding {}", key); + bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock()); + // Ensure that the block is persisted to the file. + Waiter.waitFor(defaultConf, 10000, 100, + () -> (bucketCache.getBackingMap().containsKey(key))); + } + + // Verify that the bucket cache contains 3 blocks. + assertEquals(3, bucketCache.getBackingMap().keySet().size()); + + // Add an additional hot block, which triggers eviction. + BlockCacheKey newKey = + new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA); + CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1); + + bucketCache.cacheBlock(newKey, newBlock[0].getBlock()); + Waiter.waitFor(defaultConf, 10000, 100, + () -> (bucketCache.getBackingMap().containsKey(newKey))); + + // Verify that the bucket still contains the only cold block and one newly added hot block. + // The older hot blocks are evicted and the data-tiering mechanism does not kick in to evict + // the cold block. 
+ validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1); + } finally { + DataTieringManager.resetForTestingOnly(); + defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true); + assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions)); + } + } + + @Test + public void testCacheConfigShouldCacheFile() throws Exception { + initializeTestEnvironment(); + // Verify that the shouldCacheBlockOnRead API returns the result correctly. + // hStoreFiles[0], hStoreFiles[1], hStoreFiles[2] are hot files. + // hStoreFiles[3] is a cold file. + assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, + hStoreFiles.get(0).getFileInfo().getHFileInfo(), hStoreFiles.get(0).getFileInfo().getConf())); + assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, + hStoreFiles.get(1).getFileInfo().getHFileInfo(), hStoreFiles.get(1).getFileInfo().getConf())); + assertTrue(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, + hStoreFiles.get(2).getFileInfo().getHFileInfo(), hStoreFiles.get(2).getFileInfo().getConf())); + assertFalse(cacheConf.shouldCacheBlockOnRead(BlockCategory.DATA, + hStoreFiles.get(3).getFileInfo().getHFileInfo(), hStoreFiles.get(3).getFileInfo().getConf())); + } + + @Test + public void testCacheOnReadColdFile() throws Exception { + this.blockCache = initializeTestEnvironment(); + // hStoreFiles[3] is a cold file. The blocks should not get loaded after a readBlock call. + HStoreFile hStoreFile = hStoreFiles.get(3); + BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA); + testCacheOnRead(hStoreFile, cacheKey, -1, false); + } + + @Test + public void testCacheOnReadHotFile() throws Exception { + this.blockCache = initializeTestEnvironment(); + // hStoreFiles[0] is a hot file. The blocks should get loaded after a readBlock call. + HStoreFile hStoreFile = hStoreFiles.get(0); + BlockCacheKey cacheKey = + new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA); + testCacheOnRead(hStoreFile, cacheKey, -1, true); + } + + private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long onDiskBlockSize, + boolean expectedCached) throws Exception { + // Execute the read block API which will try to cache the block if the block is a hot block. + hStoreFile.getReader().getHFileReader().readBlock(key.getOffset(), onDiskBlockSize, true, false, + false, false, key.getBlockType(), DataBlockEncoding.NONE); + // Validate that the hot block gets cached and the cold block is not cached. 
+ HFileBlock block = (HFileBlock) blockCache.getBlock(key, false, false, false); + if (expectedCached) { + assertNotNull(block); + } else { + assertNull(block); + } + } + + private void validateBlocks(Set keys, int expectedTotalKeys, int expectedHotBlocks, + int expectedColdBlocks) { + int numHotBlocks = 0, numColdBlocks = 0; + + Waiter.waitFor(defaultConf, 10000, 100, () -> (expectedTotalKeys == keys.size())); + int iter = 0; + for (BlockCacheKey key : keys) { + try { + if (dataTieringManager.isHotData(key)) { + numHotBlocks++; + } else { + numColdBlocks++; + } + } catch (Exception e) { + LOG.debug("Error validating priority for key {}", key, e); + fail(e.getMessage()); + } + } + assertEquals(expectedHotBlocks, numHotBlocks); + assertEquals(expectedColdBlocks, numColdBlocks); + } + + private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path, + boolean expectedResult, DataTieringException exception) { + try { + boolean value = caller.call(dataTieringManager, path); + if (exception != null) { + fail("Expected DataTieringException to be thrown"); + } + assertEquals(expectedResult, value); + } catch (DataTieringException e) { + if (exception == null) { + fail("Unexpected DataTieringException: " + e.getMessage()); + } + assertEquals(exception.getMessage(), e.getMessage()); + } + } + + private void testDataTieringMethodWithKey(DataTieringMethodCallerWithKey caller, + BlockCacheKey key, boolean expectedResult, DataTieringException exception) { + try { + boolean value = caller.call(dataTieringManager, key); + if (exception != null) { + fail("Expected DataTieringException to be thrown"); + } + assertEquals(expectedResult, value); + } catch (DataTieringException e) { + if (exception == null) { + fail("Unexpected DataTieringException: " + e.getMessage()); + } + assertEquals(exception.getMessage(), e.getMessage()); + } + } + + private void testDataTieringMethodWithPathExpectingException( + DataTieringMethodCallerWithPath caller, Path path, DataTieringException exception) { + testDataTieringMethodWithPath(caller, path, false, exception); + } + + private void testDataTieringMethodWithPathNoException(DataTieringMethodCallerWithPath caller, + Path path, boolean expectedResult) { + testDataTieringMethodWithPath(caller, path, expectedResult, null); + } + + private void testDataTieringMethodWithKeyExpectingException(DataTieringMethodCallerWithKey caller, + BlockCacheKey key, DataTieringException exception) { + testDataTieringMethodWithKey(caller, key, false, exception); + } + + private void testDataTieringMethodWithKeyNoException(DataTieringMethodCallerWithKey caller, + BlockCacheKey key, boolean expectedResult) { + testDataTieringMethodWithKey(caller, key, expectedResult, null); + } + + private static BlockCache initializeTestEnvironment() throws IOException { + BlockCache blockCache = setupFileSystemAndCache(); + setupOnlineRegions(blockCache); + return blockCache; + } + + private static BlockCache setupFileSystemAndCache() throws IOException { + fs = HFileSystem.get(defaultConf); + BlockCache blockCache = BlockCacheFactory.createBlockCache(defaultConf); + cacheConf = new CacheConfig(defaultConf, blockCache); + return blockCache; + } + + private static void setupOnlineRegions(BlockCache blockCache) throws IOException { + testOnlineRegions.clear(); + hStoreFiles.clear(); + long day = 24 * 60 * 60 * 1000; + long currentTime = System.currentTimeMillis(); + + HRegion region1 = createHRegion("table1", blockCache); + + HStore hStore11 = createHStore(region1, "cf1", 
getConfWithCustomCellDataTieringEnabled(day)); + hStoreFiles.add(createHStoreFile(hStore11.getStoreContext().getFamilyStoreDirectoryPath(), + hStore11.getReadOnlyConfiguration(), currentTime, region1.getRegionFileSystem())); + hStore11.refreshStoreFiles(); + HStore hStore12 = createHStore(region1, "cf2"); + hStoreFiles.add(createHStoreFile(hStore12.getStoreContext().getFamilyStoreDirectoryPath(), + hStore12.getReadOnlyConfiguration(), currentTime - day, region1.getRegionFileSystem())); + hStore12.refreshStoreFiles(); + + region1.stores.put(Bytes.toBytes("cf1"), hStore11); + region1.stores.put(Bytes.toBytes("cf2"), hStore12); + + HRegion region2 = createHRegion("table2", + getConfWithCustomCellDataTieringEnabled((long) (2.5 * day)), blockCache); + + HStore hStore21 = createHStore(region2, "cf1"); + hStoreFiles.add(createHStoreFile(hStore21.getStoreContext().getFamilyStoreDirectoryPath(), + hStore21.getReadOnlyConfiguration(), currentTime - 2 * day, region2.getRegionFileSystem())); + hStore21.refreshStoreFiles(); + HStore hStore22 = createHStore(region2, "cf2"); + hStoreFiles.add(createHStoreFile(hStore22.getStoreContext().getFamilyStoreDirectoryPath(), + hStore22.getReadOnlyConfiguration(), currentTime - 3 * day, region2.getRegionFileSystem())); + hStore22.refreshStoreFiles(); + + region2.stores.put(Bytes.toBytes("cf1"), hStore21); + region2.stores.put(Bytes.toBytes("cf2"), hStore22); + + for (HStoreFile file : hStoreFiles) { + file.initReader(); + } + + testOnlineRegions.put(region1.getRegionInfo().getEncodedName(), region1); + testOnlineRegions.put(region2.getRegionInfo().getEncodedName(), region2); + } + + private static HRegion createHRegion(String table, BlockCache blockCache) throws IOException { + return createHRegion(table, defaultConf, blockCache); + } + + private static HRegion createHRegion(String table, Configuration conf, BlockCache blockCache) + throws IOException { + TableName tableName = TableName.valueOf(table); + + TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) + .setValue(DataTieringManager.DATATIERING_KEY, conf.get(DataTieringManager.DATATIERING_KEY)) + .setValue(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, + conf.get(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY)) + .build(); + RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build(); + + Configuration testConf = new Configuration(conf); + CommonFSUtils.setRootDir(testConf, testDir); + HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs, + CommonFSUtils.getTableDir(testDir, hri.getTable()), hri); + + HRegion region = new HRegion(regionFs, null, conf, htd, null); + // Manually sets the BlockCache for the HRegion instance. + // This is necessary because the region server is not started within this method, + // and therefore the BlockCache needs to be explicitly configured. 
+ region.setBlockCache(blockCache); + return region; + } + + private static HStore createHStore(HRegion region, String columnFamily) throws IOException { + return createHStore(region, columnFamily, defaultConf); + } + + private static HStore createHStore(HRegion region, String columnFamily, Configuration conf) + throws IOException { + ColumnFamilyDescriptor columnFamilyDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)) + .setValue(DataTieringManager.DATATIERING_KEY, conf.get(DataTieringManager.DATATIERING_KEY)) + .setValue(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, + conf.get(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY)) + .build(); + + return new HStore(region, columnFamilyDescriptor, conf, false); + } + + private static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long timestamp, + HRegionFileSystem regionFs) throws IOException { + String columnFamily = storeDir.getName(); + + StoreFileWriter storeFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs) + .withOutputDir(storeDir).withFileContext(new HFileContextBuilder().build()).build(); + + writeStoreFileRandomData(storeFileWriter, Bytes.toBytes(columnFamily), timestamp); + + StoreContext storeContext = StoreContext.getBuilder().withRegionFileSystem(regionFs).build(); + StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true, storeContext); + return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf, BloomType.NONE, true, + sft); + } + + private static Configuration getConfWithCustomCellDataTieringEnabled(long hotDataAge) { + Configuration conf = new Configuration(defaultConf); + conf.set(DataTieringManager.DATATIERING_KEY, DataTieringType.CUSTOM.name()); + conf.set(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(hotDataAge)); + return conf; + } + + /** + * Writes random data to a store file with rows arranged in lexicographically increasing order. + * Each row is generated using the {@link #nextString()} method, ensuring that each subsequent row + * is lexicographically larger than the previous one. + */ + private static void writeStoreFileRandomData(final StoreFileWriter writer, byte[] columnFamily, + long timestamp) throws IOException { + int cellsPerFile = 10; + byte[] qualifier = Bytes.toBytes("qualifier"); + byte[] value = generateRandomBytes(4 * 1024); + try { + for (int i = 0; i < cellsPerFile; i++) { + byte[] row = Bytes.toBytes(nextString()); + writer.append(new KeyValue(row, columnFamily, qualifier, timestamp, value)); + } + } finally { + writer.appendTrackedTimestampsToMetadata(); + TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + timeRangeTracker.setMin(timestamp); + timeRangeTracker.setMax(timestamp); + writer.appendCustomCellTimestampsToMetadata(timeRangeTracker); + writer.close(); + } + } + + private static byte[] generateRandomBytes(int sizeInBytes) { + Random random = new Random(); + byte[] randomBytes = new byte[sizeInBytes]; + random.nextBytes(randomBytes); + return randomBytes; + } + + /** + * Returns the lexicographically larger string every time it's called. 
+ */ + private static String nextString() { + if (rowKeyString == null || rowKeyString.isEmpty()) { + rowKeyString = "a"; + } + char lastChar = rowKeyString.charAt(rowKeyString.length() - 1); + if (lastChar < 'z') { + rowKeyString = rowKeyString.substring(0, rowKeyString.length() - 1) + (char) (lastChar + 1); + } else { + rowKeyString = rowKeyString + "a"; + } + return rowKeyString; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java new file mode 100644 index 000000000000..c89e99197179 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCustomCellTieredCompactionPolicy.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerForTest; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestCustomCellTieredCompactionPolicy { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCustomCellTieredCompactionPolicy.class); + + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + public static final byte[] FAMILY = Bytes.toBytes("cf"); + + private HStoreFile createFile(Path file, long 
minValue, long maxValue, long size, int seqId) + throws IOException { + return createFile(mockRegionInfo(), file, minValue, maxValue, size, seqId, 0); + } + + private HStoreFile createFile(RegionInfo regionInfo, Path file, long minValue, long maxValue, + long size, int seqId, long ageInDisk) throws IOException { + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + HRegionFileSystem regionFileSystem = + new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, file, regionInfo); + StoreContext ctx = new StoreContext.Builder() + .withColumnFamilyDescriptor(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()) + .withRegionFileSystem(regionFileSystem).build(); + StoreFileTrackerForTest sftForTest = + new StoreFileTrackerForTest(TEST_UTIL.getConfiguration(), true, ctx); + MockHStoreFile msf = + new MockHStoreFile(TEST_UTIL, file, size, ageInDisk, false, (long) seqId, sftForTest); + TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC); + timeRangeTracker.setMin(minValue); + timeRangeTracker.setMax(maxValue); + msf.setMetadataValue(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker)); + return msf; + } + + private CustomDateTieredCompactionPolicy mockAndCreatePolicy() throws Exception { + RegionInfo mockedRegionInfo = mockRegionInfo(); + return mockAndCreatePolicy(mockedRegionInfo); + } + + private CustomDateTieredCompactionPolicy mockAndCreatePolicy(RegionInfo regionInfo) + throws Exception { + StoreConfigInformation mockedStoreConfig = mock(StoreConfigInformation.class); + when(mockedStoreConfig.getRegionInfo()).thenReturn(regionInfo); + CustomDateTieredCompactionPolicy policy = + new CustomDateTieredCompactionPolicy(TEST_UTIL.getConfiguration(), mockedStoreConfig); + return policy; + } + + private RegionInfo mockRegionInfo() { + RegionInfo mockedRegionInfo = mock(RegionInfo.class); + when(mockedRegionInfo.getEncodedName()).thenReturn("1234567890987654321"); + return mockedRegionInfo; + } + + private Path preparePath() throws Exception { + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + Path file = + new Path(TEST_UTIL.getDataTestDir(), UUID.randomUUID().toString().replaceAll("-", "")); + fs.create(file); + return file; + } + + @Test + public void testGetCompactBoundariesForMajorNoOld() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList files = new ArrayList<>(); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 0)); + files.add(createFile(file, EnvironmentEdgeManager.currentTime(), + EnvironmentEdgeManager.currentTime(), 1024, 1)); + assertEquals(1, + ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size()); + } + + @Test + public void testGetCompactBoundariesForMajorAllOld() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + ArrayList files = new ArrayList<>(); + // The default cut off age is 10 years, so any of the min/max value there should get in the old + // tier + files.add(createFile(file, 0, 1, 1024, 0)); + files.add(createFile(file, 2, 3, 1024, 1)); + assertEquals(2, + ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size()); + } + + @Test + public void testGetCompactBoundariesForMajorOneOnEachSide() throws Exception { + CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy(); + Path file = preparePath(); + 
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    assertEquals(3,
+      ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size());
+  }
+
+  @Test
+  public void testGetCompactBoundariesForMajorOneCrossing() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, EnvironmentEdgeManager.currentTime(), 1024, 0));
+    assertEquals(3,
+      ((DateTieredCompactionRequest) policy.selectMajorCompaction(files)).getBoundaries().size());
+  }
+
+  @FunctionalInterface
+  interface PolicyValidator<T, U> {
+    void accept(T t, U u) throws Exception;
+  }
+
+  private void testShouldPerformMajorCompaction(long min, long max, int numFiles,
+    PolicyValidator<CustomDateTieredCompactionPolicy, ArrayList<HStoreFile>> validation)
+    throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    RegionInfo mockedRegionInfo = mockRegionInfo();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(timeMachine);
+    for (int i = 0; i < numFiles; i++) {
+      MockHStoreFile mockedSFile = (MockHStoreFile) createFile(mockedRegionInfo, file, min, max,
+        1024, 0, HConstants.DEFAULT_MAJOR_COMPACTION_PERIOD);
+      mockedSFile.setIsMajor(true);
+      files.add(mockedSFile);
+    }
+    EnvironmentEdgeManager.reset();
+    validation.accept(policy, files);
+  }
+
+  @Test
+  public void testShouldPerformMajorCompactionOneFileCrossing() throws Exception {
+    long max = EnvironmentEdgeManager.currentTime();
+    testShouldPerformMajorCompaction(0, max, 1,
+      (p, f) -> assertTrue(p.shouldPerformMajorCompaction(f)));
+  }
+
+  @Test
+  public void testShouldPerformMajorCompactionOneFileMinMaxLow() throws Exception {
+    testShouldPerformMajorCompaction(0, 1, 1,
+      (p, f) -> assertFalse(p.shouldPerformMajorCompaction(f)));
+  }
+
+  @Test
+  public void testShouldPerformMajorCompactionOneFileMinMaxHigh() throws Exception {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    testShouldPerformMajorCompaction(currentTime, currentTime, 1,
+      (p, f) -> assertFalse(p.shouldPerformMajorCompaction(f)));
+  }
+
+  @Test
+  public void testShouldPerformMajorCompactionTwoFilesMinMaxHigh() throws Exception {
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    testShouldPerformMajorCompaction(currentTime, currentTime, 2,
+      (p, f) -> assertTrue(p.shouldPerformMajorCompaction(f)));
+  }
+
+  @Test
+  public void testSelectMinorCompactionTwoFilesNoOld() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    // Shouldn't do a minor compaction, as the minimum number of files
+    // for minor compactions is 3
+    assertEquals(0, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+
+  @Test
+  public void testSelectMinorCompactionThreeFilesNoOld() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 2));
+    assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+
+  @Test
+  public void testSelectMinorCompactionThreeFilesAllOld() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, 1, 2, 1024, 1));
+    files.add(createFile(file, 3, 4, 1024, 2));
+    assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+
+  @Test
+  public void testSelectMinorCompactionThreeFilesOneOldTwoNew() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 1));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 2));
+    assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+
+  @Test
+  public void testSelectMinorCompactionThreeFilesTwoOldOneNew() throws Exception {
+    CustomDateTieredCompactionPolicy policy = mockAndCreatePolicy();
+    Path file = preparePath();
+    ArrayList<HStoreFile> files = new ArrayList<>();
+    files.add(createFile(file, 0, 1, 1024, 0));
+    files.add(createFile(file, 2, 3, 1024, 1));
+    files.add(createFile(file, EnvironmentEdgeManager.currentTime(),
+      EnvironmentEdgeManager.currentTime(), 1024, 2));
+    assertEquals(3, policy.selectMinorCompaction(files, true, true).getFiles().size());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
index 37e0fe98e7d0..bf82a531f199 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDataTieringManager.java
@@ -227,7 +227,8 @@ public void testHotDataWithPath() throws IOException {
     // Test with a filename where corresponding HStoreFile in not present
     hFilePath = new Path(hStoreFiles.get(0).getPath().getParent(), "incorrectFileName");
-    testDataTieringMethodWithPathNoException(methodCallerWithPath, hFilePath, true);
+    testDataTieringMethodWithPathExpectingException(methodCallerWithPath, hFilePath,
+      new DataTieringException("Store file corresponding to " + hFilePath + " doesn't exist"));
   }
 
   @Test
@@ -600,19 +601,21 @@ public void testCacheConfigShouldCacheFile() throws Exception {
   @Test
   public void testCacheOnReadColdFile() throws Exception {
+    initializeTestEnvironment();
     // hStoreFiles[3] is a cold file. the blocks should not get loaded after a readBlock call.
HStoreFile hStoreFile = hStoreFiles.get(3); BlockCacheKey cacheKey = new BlockCacheKey(hStoreFile.getPath(), 0, true, BlockType.DATA); - testCacheOnRead(hStoreFile, cacheKey, 23025, false); + testCacheOnRead(hStoreFile, cacheKey, -1, false); } @Test public void testCacheOnReadHotFile() throws Exception { + initializeTestEnvironment(); // hStoreFiles[0] is a hot file. the blocks should get loaded after a readBlock call. HStoreFile hStoreFile = hStoreFiles.get(0); BlockCacheKey cacheKey = new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA); - testCacheOnRead(hStoreFile, cacheKey, 23025, true); + testCacheOnRead(hStoreFile, cacheKey, -1, true); } private void testCacheOnRead(HStoreFile hStoreFile, BlockCacheKey key, long onDiskBlockSize, @@ -806,8 +809,8 @@ private static Configuration getConfWithTimeRangeDataTieringEnabled(long hotData return conf; } - private static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long timestamp, - HRegionFileSystem regionFs) throws IOException { + static HStoreFile createHStoreFile(Path storeDir, Configuration conf, long timestamp, + HRegionFileSystem regionFs) throws IOException { String columnFamily = storeDir.getName(); StoreFileWriter storeFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs) @@ -815,11 +818,11 @@ private static HStoreFile createHStoreFile(Path storeDir, Configuration conf, lo writeStoreFileRandomData(storeFileWriter, Bytes.toBytes(columnFamily), timestamp); - StoreContext storeContext = - StoreContext.getBuilder().withRegionFileSystem(regionFs).build(); + StoreContext storeContext = StoreContext.getBuilder().withRegionFileSystem(regionFs).build(); StoreFileTracker sft = StoreFileTrackerFactory.create(conf, true, storeContext); - return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf, BloomType.NONE, true, sft); + return new HStoreFile(fs, storeFileWriter.getPath(), conf, cacheConf, BloomType.NONE, true, + sft); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java new file mode 100644 index 000000000000..253bdb43567f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
+import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.CustomTieredStoreEngine;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestCustomCellTieredCompactor {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCustomCellTieredCompactor.class);
+
+  public static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  protected HBaseTestingUtil utility;
+
+  protected Admin admin;
+
+  @Before
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtil();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCustomCellTieredCompactor() throws Exception {
+    ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
+    clmBuilder.setValue("hbase.hstore.engine.class", CustomTieredStoreEngine.class.getName());
+    clmBuilder.setValue(TIERING_CELL_QUALIFIER, "date");
+    TableName tableName = TableName.valueOf("testCustomCellTieredCompactor");
+    TableDescriptorBuilder tblBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    tblBuilder.setColumnFamily(clmBuilder.build());
+    utility.getAdmin().createTable(tblBuilder.build());
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    Table table = connection.getTable(tableName);
+    long recordTime = System.currentTimeMillis();
+    // write data and flush multiple store files:
+    for (int i = 0; i < 6; i++) {
+      List<Put> puts = new ArrayList<>(2);
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
+      put.addColumn(FAMILY, Bytes.toBytes("date"),
+        Bytes.toBytes(recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)));
+      puts.add(put);
+      put = new Put(Bytes.toBytes(i + 1000));
+      put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
+      put.addColumn(FAMILY, Bytes.toBytes("date"), Bytes.toBytes(recordTime));
+      puts.add(put);
+      table.put(puts);
+      utility.flush(tableName);
+    }
+    table.close();
+    long firstCompactionTime = System.currentTimeMillis();
+    utility.getAdmin().majorCompact(tableName);
+    Waiter.waitFor(utility.getConfiguration(), 5000,
+      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
+          > firstCompactionTime);
+    long numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // The first major compaction has no way to detect more than one tier: without the min/max
+    // values available in the file info portion of the files selected for compaction,
+    // CustomDateTieredCompactionPolicy cannot calculate the proper boundaries.
+    assertEquals(1, numHFiles);
+    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
+      .forEach(file -> {
+        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+        assertNotNull(rangeBytes);
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
+          assertEquals((recordTime - (11L * 366L * 24L * 60L * 60L * 1000L)),
+            timeRangeTracker.getMin());
+          assertEquals(recordTime, timeRangeTracker.getMax());
+        } catch (IOException e) {
+          fail(e.getMessage());
+        }
+      });
+    // Now do a major compaction again, to make sure we write two separate files
+    long secondCompactionTime = System.currentTimeMillis();
+    utility.getAdmin().majorCompact(tableName);
+    Waiter.waitFor(utility.getConfiguration(), 5000,
+      () -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(tableName)
+          > secondCompactionTime);
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(2, numHFiles);
+    utility.getMiniHBaseCluster().getRegions(tableName).get(0).getStore(FAMILY).getStorefiles()
+      .forEach(file -> {
+        byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
+        assertNotNull(rangeBytes);
+        try {
+          TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
+          assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
+        } catch (IOException e) {
+          fail(e.getMessage());
+        }
+      });
+  }
+}
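
Note (not part of the patch): as a reading aid for the tests above, here is a minimal sketch of how the custom tiering range persisted under CUSTOM_TIERING_TIME_RANGE can be read back from a store file and compared against a cut-off age, mirroring the assertions in TestCustomCellTieredCompactor. The class name TieringRangeInspector, the isOlderThanCutoff method, and the hard-coded ten-year cut-off are illustrative assumptions only (the policy tests mention ten years as the default cut-off); the real boundary calculation belongs to CustomDateTieredCompactionPolicy and is not reproduced here.

import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;

import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

/**
 * Illustrative helper only: shows how the tiering range written by the compactor can be read
 * back from a store file and classified against a cut-off age.
 */
public final class TieringRangeInspector {

  // Assumed cut-off of ten years, matching the default mentioned in the policy tests.
  private static final long CUT_OFF_MILLIS = 10L * 365L * 24L * 60L * 60L * 1000L;

  private TieringRangeInspector() {
  }

  /**
   * Returns true when every custom tiering value tracked for the file is older than the
   * cut-off age, i.e. the whole file would fall into the "old" tier exercised by the tests
   * above. Files without the metadata are treated as "new" here; the real policy may differ.
   */
  public static boolean isOlderThanCutoff(HStoreFile file) throws IOException {
    byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
    if (rangeBytes == null) {
      return false;
    }
    TimeRangeTracker range = TimeRangeTracker.parseFrom(rangeBytes);
    long cutoff = EnvironmentEdgeManager.currentTime() - CUT_OFF_MILLIS;
    return range.getMax() < cutoff;
  }
}

Under these assumptions, a file whose tracked maximum is older than the cut-off corresponds to the "old tier" files the tests create with very small min/max values, while a file tracking current timestamps stays in the new tier.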