BlockCache.java
@@ -22,6 +22,7 @@
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@@ -207,6 +208,22 @@ default Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
return Optional.empty();
}

/**
* Checks whether the block represented by the given key should be cached or not. This method may
* not be overridden by all implementing classes. In such cases, the returned Optional will be
* empty. Subclasses that implement this logic return an Optional containing the boolean value
* indicating whether the passed block should indeed be cached.
* @param key The key representing the block to check if it should be cached.
* @param timeRangeTracker the time range tracker containing the timestamps of the data
* @param conf The configuration object to use for determining caching behavior.
* @return An empty Optional if this method is not supported; otherwise, the returned Optional
* contains the boolean value indicating if the block should be cached.
*/
default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
Configuration conf) {
return Optional.empty();
}

/**
* Checks whether the block for the passed key is already cached. This method may not be
* overridden by all implementing classes. In such cases, the returned Optional will be empty. For
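The Optional-based contract above means an implementation that does not support a given check simply stays silent, and the caller falls back to its own default. A minimal sketch of a caller honoring that contract (the helper class is hypothetical; only shouldCacheBlock() and its parameters come from the interface):

import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;

final class CacheDecisionHelper {
  // An empty Optional means "no opinion"; default to caching in that case.
  // This mirrors how HFileWriterImpl consumes the API later in this change.
  static boolean shouldCache(BlockCache cache, BlockCacheKey key,
      TimeRangeTracker tracker, Configuration conf) {
    Optional<Boolean> result = cache.shouldCacheBlock(key, tracker, conf);
    return result.orElse(true);
  }
}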
BlockCacheKey.java
@@ -116,5 +116,4 @@ public void setBlockType(BlockType blockType) {
public Path getFilePath() {
return filePath;
}
-
}
CombinedBlockCache.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -484,11 +485,22 @@ public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {

@Override
public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
- Optional<Boolean> l1Result = l1Cache.shouldCacheFile(hFileInfo, conf);
- Optional<Boolean> l2Result = l2Cache.shouldCacheFile(hFileInfo, conf);
return combineCacheResults(l1Cache.shouldCacheFile(hFileInfo, conf),
l2Cache.shouldCacheFile(hFileInfo, conf));
}

@Override
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
Configuration conf) {
return combineCacheResults(l1Cache.shouldCacheBlock(key, timeRangeTracker, conf),
l2Cache.shouldCacheBlock(key, timeRangeTracker, conf));
}

private Optional<Boolean> combineCacheResults(Optional<Boolean> result1,
Optional<Boolean> result2) {
final Mutable<Boolean> combinedResult = new MutableBoolean(true);
- l1Result.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
- l2Result.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
result1.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
result2.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
return Optional.of(combinedResult.getValue());
}

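combineCacheResults() starts from true and ANDs in every opinion that is present, so a single dissenting tier vetoes caching while two silent tiers still yield true. A self-contained sketch of the same rule using only java.util.Optional:

import java.util.Optional;

public final class CombineResultsDemo {
  // Same combination rule as CombinedBlockCache#combineCacheResults above:
  // start at true, AND in each present opinion, ignore absent ones.
  static Optional<Boolean> combine(Optional<Boolean> r1, Optional<Boolean> r2) {
    boolean combined = true;
    if (r1.isPresent()) {
      combined = combined && r1.get();
    }
    if (r2.isPresent()) {
      combined = combined && r2.get();
    }
    return Optional.of(combined);
  }

  public static void main(String[] args) {
    System.out.println(combine(Optional.empty(), Optional.empty()));    // Optional[true]
    System.out.println(combine(Optional.of(true), Optional.empty()));   // Optional[true]
    System.out.println(combine(Optional.of(true), Optional.of(false))); // Optional[false]
  }
}

Note the combined result is never empty: even when both tiers abstain, the method answers Optional.of(true), so downstream callers never hit their own fallback.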
HFile.java
@@ -212,6 +212,11 @@ public interface Writer extends Closeable, CellSink, ShipperListener {
/** Add an element to the file info map. */
void appendFileInfo(byte[] key, byte[] value) throws IOException;

/**
* Add the tracked timestamp range and earliest put timestamp to the file info metadata.
*/
void appendTrackedTimestampsToMetadata() throws IOException;

/** Returns the path to this {@link HFile} */
Path getPath();

HFileWriterImpl.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -26,6 +28,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +48,7 @@
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -117,6 +121,8 @@ public class HFileWriterImpl implements HFile.Writer {
/** May be null if we were passed a stream. */
protected final Path path;

protected final Configuration conf;

/** Cache configuration for caching data on write. */
protected final CacheConfig cacheConf;

@@ -170,12 +176,16 @@ public class HFileWriterImpl implements HFile.Writer {

protected long maxMemstoreTS = 0;

private final TimeRangeTracker timeRangeTracker;
private long earliestPutTs = HConstants.LATEST_TIMESTAMP;

public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
FSDataOutputStream outputStream, HFileContext fileContext) {
this.outputStream = outputStream;
this.path = path;
this.name = path != null ? path.getName() : outputStream.toString();
this.hFileContext = fileContext;
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
if (encoding != DataBlockEncoding.NONE) {
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -190,6 +200,7 @@ public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
}
closeOutputStream = path != null;
this.cacheConf = cacheConf;
this.conf = conf;
float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);

@@ -555,16 +566,31 @@ private void writeInlineBlocks(boolean closing) throws IOException {
private void doCacheOnWrite(long offset) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
if (!shouldCacheBlock(cache, key)) {
return;
}
try {
- cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
- cacheFormatBlock, cacheConf.isInMemory(), true);
cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
cacheFormatBlock.release();
}
});
}

private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
if (path != null) {
return new BlockCacheKey(path, offset, true, blockType);
}
return new BlockCacheKey(name, offset, true, blockType);
}

private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf);
return result.orElse(true);
}

/**
* Ready a new block for writing.
*/
@@ -767,6 +793,8 @@ public void append(final Cell cell) throws IOException {
if (tagsLength > this.maxTagsLength) {
this.maxTagsLength = tagsLength;
}

trackTimestamps(cell);
}

@Override
@@ -859,4 +887,25 @@ protected void finishClose(FixedFileTrailer trailer) throws IOException {
outputStream = null;
}
}

/**
* Add the tracked timestamp range and earliest put timestamp to the file info metadata.
*/
public void appendTrackedTimestampsToMetadata() throws IOException {
// TODO: The StoreFileReader always converts the byte[] to TimeRange
// via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
}

/**
* Record the earliest Put timestamp and update the TimeRangeTracker to include the timestamp
* of this cell.
*/
private void trackTimestamps(final Cell cell) {
if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
}
timeRangeTracker.includeTimestamp(cell);
}
}
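Putting the writer-side pieces together: every append() call feeds trackTimestamps(), and appendTrackedTimestampsToMetadata() has to run before close() so the TIMERANGE_KEY and EARLIEST_PUT_TS entries land in the file info. A hedged usage sketch built on the standard HFile writer factory (the output path is a placeholder, and cell construction is omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;

public final class TrackedWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
      .withPath(fs, new Path("/tmp/example.hfile")) // placeholder path
      .withFileContext(new HFileContextBuilder().build())
      .create();
    try {
      // writer.append(cell) calls go here; each one updates the internal
      // TimeRangeTracker and, for Put cells, the earliest put timestamp.
      writer.appendTrackedTimestampsToMetadata(); // persists TIMERANGE_KEY + EARLIEST_PUT_TS
    } finally {
      writer.close();
    }
  }
}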
BucketCache.java
@@ -78,6 +78,7 @@
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.DataTieringManager;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -2203,6 +2204,18 @@ public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
return Optional.of(!fullyCachedFiles.containsKey(fileName));
}

@Override
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, TimeRangeTracker timeRangeTracker,
Configuration conf) {
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
if (dataTieringManager != null && !dataTieringManager.isHotData(timeRangeTracker, conf)) {
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data",
key.getHfileName());
return Optional.of(false);
}
return Optional.of(true);
}

Review thread on the key parameter:

jhungund (Contributor, May 17, 2024): nit: Do we need this "key" parameter only for logging the details? If so, we could move this debug statement to the caller and avoid passing the parameter to this API.

Author (Contributor): Currently it is only used for logging. The caller, shouldCacheBlock(), isn't aware of data tiering, so that debug line couldn't move there. Additionally, it makes sense to pass the corresponding BlockCacheKey to shouldCacheBlock() so it can make the decision. However, I'm okay with removing the parameter.

jhungund (Contributor): OK, got it. I am fine with keeping the key parameter, given that the API is named shouldCacheBlock().

@Override
public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
return Optional.of(getBackingMap().containsKey(key));
DataTieringManager.java
@@ -131,6 +131,27 @@ public boolean isHotData(BlockCacheKey key) throws DataTieringException {
return isHotData(hFilePath);
}

/**
* Determines whether the data associated with the given time range tracker is considered hot.
* If the data tiering type is set to {@link DataTieringType#TIME_RANGE} and the tracker holds a
* valid maximum timestamp, that timestamp is checked against the configured hot data age.
* Otherwise, the data is considered hot by default.
* @param timeRangeTracker the time range tracker containing the timestamps of the data
* @param conf The configuration object to use for determining hot data criteria.
* @return {@code true} if the data is hot, {@code false} otherwise
*/
public boolean isHotData(TimeRangeTracker timeRangeTracker, Configuration conf) {
DataTieringType dataTieringType = getDataTieringType(conf);
if (
dataTieringType.equals(DataTieringType.TIME_RANGE)
&& timeRangeTracker.getMax() != TimeRangeTracker.INITIAL_MAX_TIMESTAMP
) {
return hotDataValidator(timeRangeTracker.getMax(), getDataTieringHotDataAge(conf));
}
// DataTieringType.NONE or other types are considered hot by default
return true;
}
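The hotDataValidator() call above is not part of this hunk; conceptually it checks whether the newest timestamp still falls within the configured hot-data age window. A minimal sketch of that comparison (the class and method names here are illustrative, not the patch's, and the exact clock source and boundary condition are assumptions):

final class HotAgeSketch {
  // Data counts as hot while its newest cell is younger than the
  // configured age threshold, both expressed in milliseconds.
  static boolean isWithinHotAge(long maxTimestamp, long hotDataAgeMillis) {
    return System.currentTimeMillis() - maxTimestamp <= hotDataAgeMillis;
  }
}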

/**
* Determines whether the data in the HFile at the given path is considered hot based on the
* configured data tiering type and hot data age. If the data tiering type is set to
@@ -151,6 +172,27 @@ public boolean isHotData(Path hFilePath) throws DataTieringException {
return true;
}

/**
* Determines whether the data in the HFile at the given path is considered hot based on the
* configured data tiering type and hot data age. If the data tiering type is set to
* {@link DataTieringType#TIME_RANGE}, it validates the data against the provided maximum
* timestamp.
* @param hFilePath the path to the HFile
* @param maxTimestamp the maximum timestamp to validate against
* @return {@code true} if the data is hot, {@code false} otherwise
* @throws DataTieringException if there is an error retrieving data tiering information
*/
public boolean isHotData(Path hFilePath, long maxTimestamp) throws DataTieringException {
Configuration configuration = getConfiguration(hFilePath);
DataTieringType dataTieringType = getDataTieringType(configuration);

if (dataTieringType.equals(DataTieringType.TIME_RANGE)) {
return hotDataValidator(maxTimestamp, getDataTieringHotDataAge(configuration));
}
// DataTieringType.NONE or other types are considered hot by default
return true;
}

/**
* Determines whether the data in the HFile being read is considered hot based on the configured
* data tiering type and hot data age. If the data tiering type is set to
@@ -231,10 +273,12 @@ public Set<String> getColdDataFiles(Set<BlockCacheKey> allCachedBlocks)
}

private HRegion getHRegion(Path hFilePath) throws DataTieringException {
- if (hFilePath.getParent() == null || hFilePath.getParent().getParent() == null) {
- throw new DataTieringException("Incorrect HFile Path: " + hFilePath);
String regionId;
try {
regionId = HRegionFileSystem.getRegionId(hFilePath);
} catch (IOException e) {
throw new DataTieringException(e.getMessage());
}
- String regionId = hFilePath.getParent().getParent().getName();
HRegion hRegion = this.onlineRegions.get(regionId);
if (hRegion == null) {
throw new DataTieringException("HRegion corresponding to " + hFilePath + " doesn't exist");
HRegionFileSystem.java
@@ -1067,6 +1067,31 @@ public static void deleteRegionFromFileSystem(final Configuration conf, final FileSystem fs,
}
}

/**
* Retrieves the Region ID from the given HFile path.
* @param hFilePath The path of the HFile.
* @return The Region ID extracted from the HFile path.
* @throws IOException If an I/O error occurs or if the HFile path is incorrect.
*/
public static String getRegionId(Path hFilePath) throws IOException {
if (hFilePath.getParent() == null || hFilePath.getParent().getParent() == null) {
throw new IOException("Incorrect HFile Path: " + hFilePath);
}
Path dir = hFilePath.getParent().getParent();
if (isTemporaryDirectoryName(dir.getName())) {
if (dir.getParent() == null) {
throw new IOException("Incorrect HFile Path: " + hFilePath);
}
return dir.getParent().getName();
}
return dir.getName();
}

private static boolean isTemporaryDirectoryName(String dirName) {
return REGION_MERGES_DIR.equals(dirName) || REGION_SPLITS_DIR.equals(dirName)
|| REGION_TEMP_DIR.equals(dirName);
}
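To make the two layouts concrete, here is a hedged example of what getRegionId() returns for a regular store file versus one under a temporary directory (the table and region names are made up, and ".tmp" is assumed to be the value of REGION_TEMP_DIR):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;

final class RegionIdSketch {
  public static void main(String[] args) throws Exception {
    // Regular layout: .../<table>/<region>/<family>/<hfile>
    System.out.println(HRegionFileSystem.getRegionId(
      new Path("/hbase/data/default/t1/1588230740/cf/hfile1")));      // 1588230740
    // Temporary layout: .../<table>/<region>/.tmp/<family>/<hfile>
    System.out.println(HRegionFileSystem.getRegionId(
      new Path("/hbase/data/default/t1/1588230740/.tmp/cf/hfile1"))); // 1588230740
  }
}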

/**
* Creates a directory. Assumes the user has already checked for this directory existence.
* @return the result of fs.mkdirs(). In case underlying fs throws an IOException, it checks