HBASE-29427 Merge all commits related to custom tiering into the feature branch #7124
Changes from all commits: 5f20c0f, edddf3e, 2793766, 2398e3f, 9946e36, c967603, 5a0a3a6, adb40a4, 2833205, 63a290a, 7496a26
**HFile.java**

```diff
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -217,6 +218,12 @@ public interface Writer extends Closeable, CellSink, ShipperListener {
    */
   void appendTrackedTimestampsToMetadata() throws IOException;
 
+  /**
+   * Add Custom cell timestamp to Metadata
+   */
+  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
+    throws IOException;
```
> **Contributor:** nit: why do we need the whole `TimeRangeTracker` (both min and max) here?

> **Author:** We need min and max to calculate compaction boundaries properly, as we can have files where the max TS is higher than the threshold but the min is lower. In such cases, the tiered compaction should know it has to split the file.
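To make the condition in that reply concrete, here is a minimal sketch (the helper and its names are illustrative, not code from this PR) of why both ends of the range are needed:

```java
// Illustrative only: a file must be split by the tiered compaction exactly when
// its time range straddles the cut-off threshold, which can only be detected
// when both the min and the max timestamp are known, not just the max.
static boolean straddlesTieringBoundary(long minTs, long maxTs, long threshold) {
  boolean entirelyOld = maxTs < threshold;    // whole file belongs in the "old" tier
  boolean entirelyYoung = minTs >= threshold; // whole file belongs in the "young" tier
  return !entirelyOld && !entirelyYoung;      // spans both tiers, so it needs splitting
}
```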
The diff then continues:

```diff
+
   /** Returns the path to this {@link HFile} */
   Path getPath();
```
**HFileReaderImpl.java**

```diff
@@ -1380,7 +1380,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
       HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
       // Cache the block if necessary
       cacheConf.getBlockCache().ifPresent(cache -> {
-        if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+        if (cacheBlock && cacheOnRead) {
```
> **Contributor:** nit: we change from `cacheConf.shouldCacheBlockOnRead(category)` to `cacheOnRead`. Does that mean we only cache DATA blocks here?

> **Author:** We are not caching only DATA blocks here. We are actually applying the DataTieringManager for DATA blocks only. If the block type isn't DATA, we just apply the regular `shouldCacheBlockOnRead(category)` check.
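As a rough sketch of what the reply describes (the helper `isHotData` and this exact wiring are assumptions, not the PR's code), the flag could be derived along these lines:

```java
// Illustrative only: the plain CacheConfig check still applies to all blocks;
// only DATA blocks are additionally gated by the data-tiering decision.
boolean computeCacheOnRead(CacheConfig cacheConf, BlockType.BlockCategory category,
  BlockCacheKey cacheKey) {
  boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
  if (category == BlockType.BlockCategory.DATA) {
    // hypothetical tiering check, e.g. backed by the DataTieringManager
    cacheOnRead = cacheOnRead && isHotData(cacheKey);
  }
  return cacheOnRead;
}
```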
The diff then continues:

```diff
           // Using the wait on cache during compaction and prefetching.
           cache.cacheBlock(cacheKey,
             cacheCompressed
```
**HFileWriterImpl.java**

```diff
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.MAX_BLOCK_SIZE_UNCOMPRESSED;
+import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
@@ -29,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -127,6 +129,12 @@ public class HFileWriterImpl implements HFile.Writer {
   /** Cache configuration for caching data on write. */
   protected final CacheConfig cacheConf;
 
+  public void setTimeRangeTrackerForTiering(Supplier<TimeRangeTracker> timeRangeTrackerForTiering) {
+    this.timeRangeTrackerForTiering = timeRangeTrackerForTiering;
+  }
+
+  private Supplier<TimeRangeTracker> timeRangeTrackerForTiering;
+
   /**
    * Name for this object used when logging or in toString. Is either the result of a toString on
    * stream or else name of passed file Path.
@@ -186,7 +194,9 @@ public HFileWriterImpl(final Configuration conf, CacheConfig cacheConf, Path pat
     this.path = path;
     this.name = path != null ? path.getName() : outputStream.toString();
     this.hFileContext = fileContext;
+    // TODO: Move this back to upper layer
```
> **Contributor:** nit: should we delete this TODO? Or should we file a new JIRA for it?

> **Author:** Yeah, I added this back then because I wasn't happy with adding time range tracking logic into HFileWriterImpl (originally, it was in StoreFileWriter). This was introduced here by HBASE-28469, IIRC, as a means to track dates also when flushing, rather than at compaction only. I think we should keep the TODO and revisit this code later in a separate JIRA. We still have to work on how to track the custom tiering value on flushes; maybe that would be a good time to solve this TODO as well.
The diff then continues:

```diff
     this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
+    this.timeRangeTrackerForTiering = () -> this.timeRangeTracker;
     DataBlockEncoding encoding = hFileContext.getDataBlockEncoding();
     if (encoding != DataBlockEncoding.NONE) {
       this.blockEncoder = new HFileDataBlockEncoderImpl(encoding);
```
```diff
@@ -588,7 +598,8 @@ private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
   }
 
   private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
-    Optional<Boolean> result = cache.shouldCacheBlock(key, timeRangeTracker, conf);
+    Optional<Boolean> result =
+      cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
     return result.orElse(true);
   }
@@ -899,12 +910,19 @@ public void appendTrackedTimestampsToMetadata() throws IOException {
     appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
   }
 
+  public void appendCustomCellTimestampsToMetadata(TimeRangeTracker timeRangeTracker)
+    throws IOException {
+    // TODO: The StoreFileReader always converts the byte[] to TimeRange
+    // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
```
Comment on lines +915 to +916:

> **Contributor:** nit: is a new JIRA needed?

> **Author:** This whole block (TODO comments included) came over from StoreFileWriter with the changes of HBASE-28469.
The diff then continues:

```diff
+    appendFileInfo(CUSTOM_TIERING_TIME_RANGE, TimeRangeTracker.toByteArray(timeRangeTracker));
+  }
+
   /**
    * Record the earliest Put timestamp. If the timeRangeTracker is not set, update TimeRangeTracker
    * to include the timestamp of this key
    */
   private void trackTimestamps(final ExtendedCell cell) {
-    if (Cell.Type.Put == cell.getType()) {
+    if (KeyValue.Type.Put == KeyValue.Type.codeToType(cell.getTypeByte())) {
       earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
     }
     timeRangeTracker.includeTimestamp(cell);
```
**CellTSTiering.java** (new file, @@ -0,0 +1,57 @@)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.util.OptionalLong;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class CellTSTiering implements DataTiering {
  private static final Logger LOG = LoggerFactory.getLogger(CellTSTiering.class);

  public long getTimestamp(HStoreFile hStoreFile) {
    OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
    if (!maxTimestamp.isPresent()) {
      LOG.debug("Maximum timestamp not present for {}", hStoreFile.getPath());
      return Long.MAX_VALUE;
    }
    return maxTimestamp.getAsLong();
  }

  public long getTimestamp(HFileInfo hFileInfo) {
    try {
      byte[] hFileTimeRange = hFileInfo.get(TIMERANGE_KEY);
      if (hFileTimeRange == null) {
        LOG.debug("Timestamp information not found for file: {}",
          hFileInfo.getHFileContext().getHFileName());
        return Long.MAX_VALUE;
      }
      return TimeRangeTracker.parseFrom(hFileTimeRange).getMax();
    } catch (IOException e) {
      LOG.error("Error occurred while reading the timestamp metadata of file: {}",
        hFileInfo.getHFileContext().getHFileName(), e);
      return Long.MAX_VALUE;
    }
  }
}
```
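A hedged usage sketch (the call site and `hotDataAgeMillis` are assumptions, not code from this PR): callers resolve a per-file tiering timestamp and treat files older than a configured age as cold. Note that both fallbacks above return `Long.MAX_VALUE`, so files with missing or unreadable metadata are conservatively treated as hot.

```java
// Illustrative only: classify a store file using the resolved tiering timestamp.
CellTSTiering tiering = new CellTSTiering();
long fileTs = tiering.getTimestamp(hStoreFile); // Long.MAX_VALUE when unknown
boolean cold = fileTs < System.currentTimeMillis() - hotDataAgeMillis;
```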
**CustomTieredStoreEngine.java** (new file, @@ -0,0 +1,56 @@)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CustomDateTieredCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Extension of {@link DateTieredStoreEngine} that uses a pluggable value provider for extracting
 * the value to be used for comparison in this tiered compaction. Differently from the existing Date
 * Tiered Compaction, this doesn't yield multiple tiers or files, but rather provides two tiers
 * based on a configurable “cut-off” age. All rows with the cell tiering value older than this
 * “cut-off” age would be placed together in an “old” tier, whilst younger rows would go to a
 * separate, “young” tier file.
 */
@InterfaceAudience.Private
public class CustomTieredStoreEngine extends DateTieredStoreEngine {

  @Override
  protected void createComponents(Configuration conf, HStore store, CellComparator kvComparator)
    throws IOException {
    CompoundConfiguration config = new CompoundConfiguration();
    config.add(conf);
    config.add(store.conf);
    config.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      CustomDateTieredCompactionPolicy.class.getName());
    createCompactionPolicy(config, store);
    this.storeFileManager = new DefaultStoreFileManager(kvComparator,
      StoreFileComparators.SEQ_ID_MAX_TIMESTAMP, config, compactionPolicy.getConf());
    this.storeFlusher = new DefaultStoreFlusher(config, store);
    this.compactor = new CustomTieredCompactor(config, store);
  }

}
```
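To illustrate the javadoc above (the boundary handling is inferred from the class description, not taken from this PR's code): a single configurable cut-off age reduces the date-tiered boundary list to one split point, so a compaction emits at most two files.

```java
// Illustrative only: one cut-off point yields two windows, hence two tiers.
static java.util.List<Long> twoTierBoundaries(long now, long cutOffAgeMillis) {
  long cutOffTs = now - cutOffAgeMillis;
  // rows with tiering value < cutOffTs  -> the "old" tier file
  // rows with tiering value >= cutOffTs -> the "young" tier file
  return java.util.Arrays.asList(Long.MIN_VALUE, cutOffTs);
}
```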
> **Reviewer:** Is this branch based on master? I didn't find this function on the master branch.

> **Author:** Yeah, this is based on the HBASE-28463 branch, which was last rebased onto master around 23/06/2025. The reason this method is not in the master branch is that it was added on the HBASE-28463 branch by HBASE-28467.