Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# Heterogeneous Storage for Date Tiered Compaction

## Objective

Support DateTiredCompaction([HBASE-15181](https://issues.apache.org/jira/browse/HBASE-15181))
for cold and hot data separation, support different storage policies for different time periods
of data to get better performance, for example, we can configure the data of last 1 month in SSD,
and 1 month ago data was in HDD.

+ Date Tiered Compaction (DTCP) is based on date tiering (date-aware), we hope to support
the separation of cold and hot data, heterogeneous storage. Set different storage
policies (in HDFS) for data in different time windows.
+ DTCP designs different windows, and we can classify the windows according to
the timestamps of the windows. For example: HOT window, WARM window, COLD window.
+ DTCP divides storefiles into different windows, and performs minor Compaction within
a time window. The storefile generated by Compaction will use the storage strategy of
this window. For example, if a window is a HOT window, the storefile generated by compaction
can be stored on the SSD. There are already WAL and the entire CF support storage policy
(HBASE-12848, HBASE-14061), our goal is to achieve cold and hot separation in one CF or
a region, using different storage policies.

## Definition of hot and cold data

Usually the data of the last 3 days can be defined as `HOT data`, hot age = 3 days.
If the written timestamp of the data(Cell) is > (timestamp now - hot age), we think the data is hot data.
Warm age can be defined in the same way. Only one type of data is allowed.
If data timestamp < (now - warm age), we consider it is COLD.
```
if timestamp >= (now - hot age) , HOT data
else if timestamp >= (now - warm age), WARM data
else COLD data
```

## Time window
When given a time now, it is the time when the compaction occurs. Each window and the size of
the window are automatically calculated by DTCP, and the window boundary is rounded according
to the base size.
Assuming that the base window size is 1 hour, and each tier has 3 windows, the current time is
between 12:00 and 13:00. We have defined three types of winow (`HOT, WARM, COLD`). The type of
winodw is determined by the timestamp at the beginning of the window and the timestamp now.
As shown in the figure 1 below, the type of each window can be determined by the age range
(hot / warm / cold) where (now - window.startTimestamp) falls. Cold age can not need to be set,
the default Long.MAX, meaning that the window with a very early time stamp belongs to the
cold window.
![figure 1](https://raw.githubusercontent.com/pengmq1/images/master/F1-HDTCP.png "figure 1")

## Example configuration

| Configuration Key | value | Note |
|:---|:---:|:---|
|hbase.hstore.compaction.date.tiered.storage.policy.enable|true|if or not use storage policy for window. Default is false|
|hbase.hstore.compaction.date.tiered.hot.window.age.millis|3600000|hot data age
|hbase.hstore.compaction.date.tiered.hot.window.storage.policy|ALL_SSD|hot data storage policy, Corresponding HDFS storage policy
|hbase.hstore.compaction.date.tiered.warm.window.age.millis|20600000||
|hbase.hstore.compaction.date.tiered.warm.window.storage.policy|ONE_SSD||
|hbase.hstore.compaction.date.tiered.cold.window.storage.policy|HOT||

The original date tiered compaction related configuration has the same meaning and maintains
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if the CF config storage policy and enable this feature too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If hbase.hstore.compaction.date.tiered.storage.policy.enable is true, this will override CF config storage policy, and hbase.hstore.block.storage.policy does not work. Because storefile must belong to one window and will use window storage policy

compatibility.
If `hbase.hstore.compaction.date.tiered.storage.policy.enable = false`. DTCP still follows the
original logic and has not changed.

## Storage strategy
HDFS provides the following storage policies, you can refer to
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html

|Policy ID | Policy Name | Block Placement (3 replicas)|
|:---|:---|:---|
|15|Lasy_Persist|RAM_DISK: 1, DISK: 2|
|12|All_SSD|SSD: 3|
|10|One_SSD|SSD: 1, DISK: 2|
|7|Hot (default)|DISK: 3|
|5|Warm|DISK: 1, ARCHIVE: 2|
|2|Cold|ARCHIVE: 3|

Date Tiered Compaction (DTCP) supports the output of multiple storefiles. We hope that these
storefiles can be set with different storage policies (in HDFS).
Therefore, through DateTieredMultiFileWriter to generate different StoreFileWriters with
storage policy to achieve the purpose.

## Why use different child tmp dir
Before StoreFileWriter writes a storefile, we can create different dirs in the tmp directory
of the region and set the corresponding storage policy for these dirs. This way
StoreFileWriter can write files to different dirs.

Since **HDFS** does not support the create file with the storage policy parameter
(See https://issues.apache.org/jira/browse/HDFS-13209 and now not support on hadoop 2.x),
and HDFS cannot set a storage policy for a file / dir path that does not yet exist.
When the compaction ends, the storefile path must exist at this time, and I set the
storage policy to Storefile.

But, in HDFS, when the file is written first, and then the storage policy is set.
The actual storage location of the data does not match the storage policy. For example,
write three copies of a file (1 block) in the HDD, then set storage policy is ALL_SSD,
but the data block will not be moved to the SSD immediately.
“HDFS wont move the file content across different block volumes on rename”. Data movement
requires the HDFS mover tool, or use HDFS SPS
(for details, see https://issues.apache.org/jira/browse/HDFS-10285), so in order to
avoid moving data blocks at the HDFS level, we can set the file parent directory to
the storage policy we need before writing data. The new file automatically inherits the
storage policy of the parent directory, and is written according to the correct disk
type when writing. So as to avoid later data movement.

Over time, the original HOT data will become WARM / COLD and no longer belong to the
HOT window. When the compaction occurs again, the data will be automatically downgraded,
such as from SSD to HDD. The compaction mechanism will generate a new file (write into HDD)
and delete it Old file (SSD).
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public final class HConstants {
*/
public static final String RECOVERED_HFILES_DIR = "recovered.hfiles";

/**
* Date Tiered Compaction tmp dir prefix name if use storage policy
*/
public static final String STORAGE_POLICY_PREFIX = "storage_policy_";

/**
* The first four bytes of Hadoop RPC connections
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen

public interface WriterFactory {
public StoreFileWriter createWriter() throws IOException;
default StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
return createWriter();
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,33 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {

private final boolean needEmptyFile;

private final Map<Long, String> lowerBoundariesPolicies;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableMap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lowerBoundariesPolicies is HashMap


/**
* @param lowerBoundariesPolicies each window to storage policy map.
* @param needEmptyFile whether need to create an empty store file if we haven't written out
* anything.
*/
public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean needEmptyFile) {
public DateTieredMultiFileWriter(List<Long> lowerBoundaries,
Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile) {
for (Long lowerBoundary : lowerBoundaries) {
lowerBoundary2Writer.put(lowerBoundary, null);
}
this.needEmptyFile = needEmptyFile;
this.lowerBoundariesPolicies = lowerBoundariesPolicies;
}

@Override
public void append(Cell cell) throws IOException {
Map.Entry<Long, StoreFileWriter> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
StoreFileWriter writer = entry.getValue();
if (writer == null) {
writer = writerFactory.createWriter();
String lowerBoundaryStoragePolicy = lowerBoundariesPolicies.get(entry.getKey());
if (lowerBoundaryStoragePolicy != null) {
writer = writerFactory.createWriterWithStoragePolicy(lowerBoundaryStoragePolicy);
} else {
writer = writerFactory.createWriter();
}
lowerBoundary2Writer.put(entry.getKey(), writer);
}
writer.append(cell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public void forceSelect(CompactionRequestImpl request) {
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
if (request instanceof DateTieredCompactionRequest) {
return compactor.compact(request, ((DateTieredCompactionRequest) request).getBoundaries(),
DateTieredCompactionRequest compactionRequest = (DateTieredCompactionRequest) request;
return compactor.compact(request, compactionRequest.getBoundaries(),
compactionRequest.getBoundariesPolicies(),
throughputController, user);
} else {
throw new IllegalArgumentException("DateTieredCompactionRequest is expected. Actual: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand Down Expand Up @@ -1142,7 +1143,7 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind) throws IOException {
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
includesTag, shouldDropBehind, -1);
includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
}

/**
Expand All @@ -1156,7 +1157,8 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
// compaction
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
boolean shouldDropBehind, long totalCompactedFilesSize) throws IOException {
boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
throws IOException {
// creating new cache config for each new writer
final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
if (isCompaction) {
Expand Down Expand Up @@ -1214,7 +1216,8 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
.withShouldDropCacheBehind(shouldDropBehind)
.withCompactedFilesSupplier(this::getCompactedFiles);
.withCompactedFilesSupplier(this::getCompactedFiles)
.withFileStoragePolicy(fileStoragePolicy);
return builder.build();
}

Expand Down Expand Up @@ -1535,6 +1538,7 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
List<Path> newFiles) throws IOException {
// Do the steps necessary to complete the compaction.
setStoragePolicyFromFileName(newFiles);
List<HStoreFile> sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
Expand Down Expand Up @@ -1564,6 +1568,18 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
return sfs;
}

// Set correct storage policy from the file name of DTCP.
// Rename file will not change the storage policy.
private void setStoragePolicyFromFileName(List<Path> newFiles) throws IOException {
String prefix = HConstants.STORAGE_POLICY_PREFIX;
for (Path newFile : newFiles) {
if (newFile.getParent().getName().startsWith(prefix)) {
CommonFSUtils.setStoragePolicy(fs.getFileSystem(), newFile,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set storage policy for a file? This is not work before hdfs 3.3.0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newFiles是已经compact结束将要被rename到region目录下的文件,但它们的storage policy还未指定,文件storage policy属性保存在INode中。rename之后,未指定storage policy的文件自动继承新的父目录。在rename之前调用setStoragePolicy()是为了使数据和storage policy保持一致,避免storage policy因为继承发生变化。HDFS允许对已经close的文件设置storage policy, 并不会发生数据移动。而且newFiles的数据和临时父目录的storage policy已经是一致的了(因为在compact之前tmp下已创建了不同存储策略的临时目录)

newFiles are files that have been compacted and will be renamed to the region directory, but their storage policy has not been specified. The file storage policy attribute is saved in INode. After renaming, files that do not specify a storage policy automatically inherit the new parent directory. Before renaming call to setStoragePolicy() here is to keep the data consistent with the storage policy and avoid the storage policy changing due to inheritance. HDFS allows storage policy to be set on files that have been closed, and no data movement will occur. Moreover, the data of newFiles and the storage policy of tmp parent dir are already consistent (because tmp directories of different storage strategies have been created under tmp dir before compact).

newFile.getParent().getName().substring(prefix.length()));
}
}
}

private List<HStoreFile> moveCompactedFilesIntoPlace(CompactionRequestImpl cr,
List<Path> newFiles, User user) throws IOException {
List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
Expand Down Expand Up @@ -437,6 +438,7 @@ public static class Builder {
private HFileContext fileContext;
private boolean shouldDropCacheBehind;
private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();
private String fileStoragePolicy;

public Builder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
Expand Down Expand Up @@ -518,6 +520,11 @@ public Builder withCompactedFilesSupplier(
return this;
}

public Builder withFileStoragePolicy(String fileStoragePolicy) {
this.fileStoragePolicy = fileStoragePolicy;
return this;
}

/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
Expand Down Expand Up @@ -547,6 +554,20 @@ public StoreFileWriter build() throws IOException {
CommonFSUtils.setStoragePolicy(this.fs, dir, policyName);

if (filePath == null) {
// The stored file and related blocks will used the directory based StoragePolicy.
// Because HDFS DistributedFileSystem does not support create files with storage policy
// before version 3.3.0 (See HDFS-13209). Use child dir here is to make stored files
// satisfy the specific storage policy when writing. So as to avoid later data movement.
// We don't want to change whole temp dir to 'fileStoragePolicy'.
if (!Strings.isNullOrEmpty(fileStoragePolicy)) {
dir = new Path(dir, HConstants.STORAGE_POLICY_PREFIX + fileStoragePolicy);
if (!fs.exists(dir)) {
HRegionFileSystem.mkdirs(fs, conf, dir);
LOG.info(
"Create tmp dir " + dir.toString() + " with storage policy: " + fileStoragePolicy);
}
CommonFSUtils.setStoragePolicy(this.fs, dir, fileStoragePolicy);
}
filePath = getUniqueFile(fs, dir);
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
bloomType = BloomType.NONE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ protected void initMultiWriter(AbstractMultiFileWriter writer, InternalScanner s
public StoreFileWriter createWriter() throws IOException {
return createTmpWriter(fd, shouldDropBehind);
}

@Override
public StoreFileWriter createWriterWithStoragePolicy(String fileStoragePolicy)
throws IOException {
return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ public class CompactionConfiguration {
private static final Class<? extends CompactionWindowFactory>
DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS = ExponentialCompactionWindowFactory.class;

public static final String DATE_TIERED_STORAGE_POLICY_ENABLE_KEY =
"hbase.hstore.compaction.date.tiered.storage.policy.enable";
public static final String DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY =
"hbase.hstore.compaction.date.tiered.hot.window.age.millis";
public static final String DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.hot.window.storage.policy";
public static final String DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY =
"hbase.hstore.compaction.date.tiered.warm.window.age.millis";
public static final String DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.warm.window.storage.policy";
/** Windows older than warm age belong to COLD_WINDOW **/
public static final String DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY =
"hbase.hstore.compaction.date.tiered.cold.window.storage.policy";

Configuration conf;
StoreConfigInformation storeConfigInfo;

Expand All @@ -113,6 +127,12 @@ public class CompactionConfiguration {
private final String compactionPolicyForDateTieredWindow;
private final boolean dateTieredSingleOutputForMinorCompaction;
private final String dateTieredCompactionWindowFactory;
private final boolean dateTieredStoragePolicyEnable;
private long hotWindowAgeMillis;
private long warmWindowAgeMillis;
private String hotWindowStoragePolicy;
private String warmWindowStoragePolicy;
private String coldWindowStoragePolicy;

CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf;
Expand Down Expand Up @@ -147,6 +167,13 @@ public class CompactionConfiguration {
this.dateTieredCompactionWindowFactory = conf.get(
DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS_KEY,
DEFAULT_DATE_TIERED_COMPACTION_WINDOW_FACTORY_CLASS.getName());
// for Heterogeneous Storage
dateTieredStoragePolicyEnable = conf.getBoolean(DATE_TIERED_STORAGE_POLICY_ENABLE_KEY, false);
hotWindowAgeMillis = conf.getLong(DATE_TIERED_HOT_WINDOW_AGE_MILLIS_KEY, 86400000L);
hotWindowStoragePolicy = conf.get(DATE_TIERED_HOT_WINDOW_STORAGE_POLICY_KEY, "ALL_SSD");
warmWindowAgeMillis = conf.getLong(DATE_TIERED_WARM_WINDOW_AGE_MILLIS_KEY, 604800000L);
warmWindowStoragePolicy = conf.get(DATE_TIERED_WARM_WINDOW_STORAGE_POLICY_KEY, "ONE_SSD");
coldWindowStoragePolicy = conf.get(DATE_TIERED_COLD_WINDOW_STORAGE_POLICY_KEY, "HOT");
LOG.info(toString());
}

Expand Down Expand Up @@ -293,4 +320,28 @@ public boolean useDateTieredSingleOutputForMinorCompaction() {
public String getDateTieredCompactionWindowFactory() {
return dateTieredCompactionWindowFactory;
}

public boolean isDateTieredStoragePolicyEnable() {
return dateTieredStoragePolicyEnable;
}

public long getHotWindowAgeMillis() {
return hotWindowAgeMillis;
}

public long getWarmWindowAgeMillis() {
return warmWindowAgeMillis;
}

public String getHotWindowStoragePolicy() {
return hotWindowStoragePolicy.trim().toUpperCase();
}

public String getWarmWindowStoragePolicy() {
return warmWindowStoragePolicy.trim().toUpperCase();
}

public String getColdWindowStoragePolicy() {
return coldWindowStoragePolicy.trim().toUpperCase();
}
}
Loading