Merged
@@ -36,4 +36,6 @@ public final class TagType {
// String based tag type used in replication
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
public static final byte TTL_TAG_TYPE = (byte) 8;
// tag with the custom cell tiering value for the row
public static final byte CELL_VALUE_TIERING_TAG_TYPE = (byte) 9;
}
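
For illustration, a tag of this new type could be attached to cell data along these lines (a sketch: the timestamp value and how it is derived are up to the caller, and the cell-building code around it is assumed):

import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;

// Wrap a caller-provided tiering timestamp (an example value here) in a cell tag.
long tieringTs = 1700000000000L;
Tag tieringTag = new ArrayBackedTag(TagType.CELL_VALUE_TIERING_TAG_TYPE, Bytes.toBytes(tieringTs));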
@@ -199,11 +199,28 @@ default Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
* overridden by all implementing classes. In such cases, the returned Optional will be empty. For
* subclasses implementing this logic, the returned Optional would contain the boolean value
* reflecting if the passed file should indeed be cached.
* @param fileName to check if it should be cached.
* @param hFileInfo Information about the file to check if it should be cached.
* @param conf The configuration object to use for determining caching behavior.
* @return empty optional if this method is not supported, otherwise the returned optional
* contains the boolean value informing if the file should be cached.
*/
default Optional<Boolean> shouldCacheFile(String fileName) {
default Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
return Optional.empty();
}

/**
* Checks whether the block represented by the given key should be cached or not. This method may
* not be overridden by all implementing classes. In such cases, the returned Optional will be
* empty. For subclasses implementing this logic, the returned Optional would contain the boolean
* value reflecting if the passed block should indeed be cached.
* @param key The key representing the block to check if it should be cached.
* @param maxTimeStamp The maximum timestamp for the block to check if it should be cached.
* @param conf The configuration object to use for determining caching behavior.
* @return An empty Optional if this method is not supported; otherwise, the returned Optional
* contains the boolean value indicating if the block should be cached.
*/
default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
Configuration conf) {
return Optional.empty();
}
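
As a minimal sketch of how a cache implementation might override these hooks for time-based tiering (the configuration property name and the age check are assumptions for illustration, not part of this change; only the overridden hook is shown, and the class is presumed to sit in the same package as BlockCacheKey):

import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

// Sketch: decline caching for blocks whose newest timestamp is older than a
// configured "cold" age. A real implementation would be a BlockCache subclass.
public class TieringHookSketch {
  // Hypothetical property for the age beyond which data counts as cold.
  private static final String COLD_AGE_MS_KEY = "hbase.blockcache.tiering.cold.age.ms";

  public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
      Configuration conf) {
    long coldAgeMs = conf.getLong(COLD_AGE_MS_KEY, Long.MAX_VALUE);
    boolean stillHot = EnvironmentEdgeManager.currentTime() - maxTimeStamp <= coldAgeMs;
    return Optional.of(stillHot); // returning Optional.of(false) vetoes caching
  }
}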

@@ -116,5 +116,4 @@ public void setBlockType(BlockType blockType) {
public Path getFilePath() {
return filePath;
}

}
@@ -279,6 +279,19 @@ public boolean shouldCacheBlockOnRead(BlockCategory category) {
|| (prefetchOnOpen && (category != BlockCategory.META && category != BlockCategory.UNKNOWN));
}

public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo hFileInfo,
Configuration conf) {
Optional<Boolean> cacheFileBlock = Optional.of(true);
// For DATA blocks only, if BucketCache is in use, we don't need to cache the block again
if (getBlockCache().isPresent() && category.equals(BlockCategory.DATA)) {
Optional<Boolean> result = getBlockCache().get().shouldCacheFile(hFileInfo, conf);
if (result.isPresent()) {
cacheFileBlock = result;
}
}
return shouldCacheBlockOnRead(category) && cacheFileBlock.get();
}

/** Returns true if blocks in this file should be flagged as in-memory */
public boolean isInMemory() {
return this.inMemory;
@@ -487,12 +487,23 @@ public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
}

@Override
public Optional<Boolean> shouldCacheFile(String fileName) {
Optional<Boolean> l1Result = l1Cache.shouldCacheFile(fileName);
Optional<Boolean> l2Result = l2Cache.shouldCacheFile(fileName);
public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
return combineCacheResults(l1Cache.shouldCacheFile(hFileInfo, conf),
l2Cache.shouldCacheFile(hFileInfo, conf));
}

@Override
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimeStamp,
Configuration conf) {
return combineCacheResults(l1Cache.shouldCacheBlock(key, maxTimeStamp, conf),
l2Cache.shouldCacheBlock(key, maxTimeStamp, conf));
}

private Optional<Boolean> combineCacheResults(Optional<Boolean> result1,
Optional<Boolean> result2) {
final Mutable<Boolean> combinedResult = new MutableBoolean(true);
l1Result.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
l2Result.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
result1.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
result2.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
return Optional.of(combinedResult.getValue());
}
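
The combination above is a logical AND that treats an empty Optional as "no opinion": the combined result starts at true and flips to false only when a layer explicitly declines. Illustratively:

// combineCacheResults(empty, empty)        -> Optional.of(true)   neither layer objects
// combineCacheResults(of(true), empty)     -> Optional.of(true)
// combineCacheResults(of(true), of(false)) -> Optional.of(false)  either layer can veto
// combineCacheResults(of(false), any)      -> Optional.of(false)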

@@ -43,6 +43,7 @@
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -212,6 +213,17 @@ public interface Writer extends Closeable, CellSink, ShipperListener {
/** Add an element to the file info map. */
void appendFileInfo(byte[] key, byte[] value) throws IOException;

/**
* Add TimestampRange and earliest put timestamp to Metadata
*/
void appendTrackedTimestampsToMetadata() throws IOException;

/**
* Add the custom cell timestamp range to metadata
*/
void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
throws IOException;

/** Returns the path to this {@link HFile} */
Path getPath();

@@ -123,6 +123,7 @@ public class HFileInfo implements SortedMap<byte[], byte[]> {

private FixedFileTrailer trailer;
private HFileContext hfileContext;
private boolean initialized = false;

public HFileInfo() {
super();
@@ -364,6 +365,10 @@ public void initTrailerAndContext(ReaderContext context, Configuration conf) thr
* should be called after initTrailerAndContext
*/
public void initMetaAndIndex(HFile.Reader reader) throws IOException {
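// Idempotence guard: HFilePreadReader now calls this eagerly at construction (see the
// HFilePreadReader hunk below), so other open paths may reach here a second time.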
if (initialized) {
return;
}

ReaderContext context = reader.getContext();
try {
HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();
@@ -401,6 +406,7 @@ public void initMetaAndIndex(HFile.Reader reader) throws IOException {
throw new CorruptHFileException(
"Problem reading data index and meta index from file " + context.getFilePath(), t);
}
initialized = true;
}

private HFileContext createHFileContext(Path path, FixedFileTrailer trailer, Configuration conf)
@@ -37,6 +37,8 @@ public class HFilePreadReader extends HFileReaderImpl {
public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
// Initialize HFileInfo object with metadata for caching decisions
fileInfo.initMetaAndIndex(this);
// master hosted regions, like the master procedures store wouldn't have a block cache
// Prefetch file blocks upon open if requested
if (cacheConf.getBlockCache().isPresent() && cacheConf.shouldPrefetchOnOpen()) {
@@ -1223,7 +1223,8 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws
BlockCacheKey cacheKey =
new BlockCacheKey(name, metaBlockOffset, this.isPrimaryReplicaReader(), BlockType.META);

cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
cacheBlock &=
cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory(), getHFileInfo(), conf);
HFileBlock cachedBlock =
getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META, null);
if (cachedBlock != null) {
@@ -1378,15 +1379,16 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
}
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
final boolean cacheOnRead =
cacheConf.shouldCacheBlockOnRead(category, getHFileInfo(), conf);

Contributor:
This part is different from #5905, where cacheOnRead has been defined but is not used at line #1391 and line #1405; is that expected? In other words, are the two APIs cacheConf.shouldCacheBlockOnRead(category) and cacheConf.shouldCacheBlockOnRead(category, getHFileInfo(), conf) the same?

Contributor Author:
> This part is different from #5905 ... is that expected?

Sorry, I'm not following you. This seems to be the same code we added in PR #5905, line #1351 of HFileReaderImpl.

Contributor Author:
> In other words, are the two APIs ... the same?

Those are not the same. The former just checks the block type, whilst the latter performs additional checks to figure out whether the data block is already cached, in order to avoid redundant caching from the file system.

Contributor:
Yeah, sorry I didn't explain my comment with links. About cacheOnRead: in the old PR #5905, cacheOnRead was used within the if block if (cacheOnly && cacheCompressed && cacheOnRead) { (https://github.com/apache/hbase/pull/5905/files#diff-c910a49e7d962e49b199d22f38e1cd78d867351208b86bffdd56ffb9aa1aa596R1358), but in this PR we don't see that usage of cacheOnRead. Did I miss anything?

Contributor Author:
Ok, I think I got it now. This is probably due to the rebase on top of master. Master already had HBASE-28596, which was missing on the feature branch when PR #5905 was merged, so when resolving this conflict in the rebase I simply picked the master version. Reading it now, this seems wrong; we should do as PR #5905 was doing. I'm going to update it.

Contributor:
Thanks for fixing it. LGTM now.

// Don't need the unpacked block back and we're storing the block in the cache compressed
if (cacheOnly && cacheCompressed && cacheOnRead) {
HFileBlock blockNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock);
cacheConf.getBlockCache().ifPresent(cache -> {
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
// Cache the block if necessary
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
if (cacheBlock && cacheOnRead) {
cache.cacheBlock(cacheKey, blockNoChecksum, cacheConf.isInMemory(), cacheOnly);
}
});
@@ -1400,7 +1402,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
if (cacheBlock && cacheOnRead) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey,
cacheCompressed
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -26,6 +29,8 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +51,7 @@
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -118,9 +124,17 @@ public class HFileWriterImpl implements HFile.Writer {
/** May be null if we were passed a stream. */
protected final Path path;

protected final Configuration conf;

/** Cache configuration for caching data on write. */
protected final CacheConfig cacheConf;

private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;

public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
}

/**
* Name for this object used when logging or in toString. Is either the result of a toString on
* stream or else name of passed file Path.
@@ -171,12 +185,18 @@ public class HFileWriterImpl implements HFile.Writer {

protected long maxMemstoreTS = 0;

private final TimeRangeTracker timeRangeTracker;
private long earliestPutTs = HConstants.LATEST_TIMESTAMP;

public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path path,
FSDataOutputStream outputStream, HFileContext fileContext) {
this.outputStream = outputStream;
this.path = path;
this.name = path != null ? path.getName() : outputStream.toString();
this.hFileContext = fileContext;
// TODO: Move this back to upper layer
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
if (encoding != DataBlockEncoding.NONE) {
this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
@@ -191,6 +211,7 @@ public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path pat
}
closeOutputStream = path != null;
this.cacheConf = cacheConf;
this.conf = conf;
float encodeBlockSizeRatio = conf.getFloat(UNIFIED_ENCODED_BLOCKSIZE_RATIO, 0f);
this.encodedBlockSizeLimit = (int) (hFileContext.getBlocksize() * encodeBlockSizeRatio);

@@ -557,6 +578,9 @@ private void doCacheOnWrite(long offset) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
if (!shouldCacheBlock(cache, key)) {
return;
}
try {
cache.cacheBlock(key, cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
@@ -573,6 +597,12 @@ private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
return new BlockCacheKey(name, offset, true, blockType);
}

private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
Optional<Boolean> result =
cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
return result.orElse(true);
}

/**
* Ready a new block for writing.
*/
@@ -775,6 +805,8 @@ public void append(final ExtendedCell cell) throws IOException {
if (tagsLength > this.maxTagsLength) {
this.maxTagsLength = tagsLength;
}

trackTimestamps(cell);
}

@Override
@@ -867,4 +899,32 @@ protected void finishClose(FixedFileTrailer trailer) throws IOException {
outputStream = null;
}
}

/**
* Add TimestampRange and earliest put timestamp to Metadata
*/
public void appendTrackedTimestampsToMetadata() throws IOException {
// TODO: The StoreFileReader always converts the byte[] to TimeRange
// via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
appendFileInfo(TIMERANGE_KEY, TimeRangeTracker.toByteArray(timeRangeTracker));
appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
}

public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
throws IOException {
// TODO: The StoreFileReader always converts the byte[] to TimeRange
// via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
}

/**
* Record the earliest Put timestamp and update the TimeRangeTracker to include the timestamp of
* this cell
*/
private void trackTimestamps(final ExtendedCell cell) {
if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
}
timeRangeTracker.includeTimestamp(cell);
}
}
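
To tie the writer-side pieces together, a caller such as a tiering-aware multi-file writer could be wired roughly as follows (a sketch under assumptions: writer is an HFileWriterImpl, cells is the caller's collection of ExtendedCell, and extracting the custom tiering timestamps, e.g. from CELL_VALUE_TIERING_TAG_TYPE tags, is left to the caller):

// Accumulate a separate time range for tiering decisions and persist it at close.
TimeRangeTracker tieringTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
writer.setTimeRangeTrackerForTiering(() -> tieringTracker);
for (ExtendedCell cell : cells) {
  tieringTracker.includeTimestamp(cell); // a real caller would use the custom tiering value
  writer.append(cell);
}
writer.appendTrackedTimestampsToMetadata(); // writes TIMERANGE_KEY and EARLIEST_PUT_TS
writer.appendCustomCellTimestampsToMetadata(tieringTracker); // writes CUSTOM_TIERING_TIME_RANGE
writer.close();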