Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,23 @@ String[] linkCount(File file) throws IOException {
* ****************************************************
*/

/**
* Creates a hardlink without creating parent folder.
* @param file - existing source file
* @param linkName - desired target link file
*/
public static void createHardLink(File file, File linkName)
throws IOException {
createHardLink(file, linkName, false);
}

/**
* Creates a hardlink
* @param file - existing source file
* @param linkName - desired target link file
* @param p - Whether we need to create parents if not exists.
*/
public static void createHardLink(File file, File linkName)
public static void createHardLink(File file, File linkName, boolean p)
throws IOException {
if (file == null) {
throw new IOException(
Expand All @@ -167,6 +178,11 @@ public static void createHardLink(File file, File linkName)
throw new IOException(
"invalid arguments to createHardLink: link name is null");
}
if (p && !linkName.getParentFile().exists()
&& !linkName.getParentFile().mkdirs()) {
throw new IOException(
"Failed to create parent folder for hardLink " + linkName);
}
createLink(linkName.toPath(), file.toPath());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public static StorageType parseStorageType(String s) {
return StorageType.valueOf(StringUtils.toUpperCase(s));
}

public static boolean allowSameDiskTiering(StorageType storageType) {
return storageType == StorageType.DISK
|| storageType == StorageType.ARCHIVE;
}

private static List<StorageType> getNonTransientTypes() {
List<StorageType> nonTransientTypes = new ArrayList<>();
for (StorageType t : VALUES) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ public void testCreateHardLink() throws IOException {
assertTrue(fetchFileContents(x1_one).equals(str1 + str3));
assertTrue(fetchFileContents(x11_one).equals(str1 + str3));
assertTrue(fetchFileContents(x1).equals(str1 + str3));

// Validate create parent flag
File pathWithoutParentDir = new File(tgt_one, "newDir/dest");
createHardLink(x1, pathWithoutParentDir, true);
assertTrue(pathWithoutParentDir.exists());
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ public DirectoryScanner(FsDatasetSpi<?> dataset, Configuration conf) {
* Start the scanner. The scanner will run every
* {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
*/
void start() {
@VisibleForTesting
public void start() {
shouldRun.set(true);
long firstScanTime = ThreadLocalRandom.current().nextLong(scanPeriodMsecs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.HadoopIllegalArgumentException;
Expand Down Expand Up @@ -994,6 +995,20 @@ static File[] copyBlockFiles(long blockId, long genStamp,
smallBufferSize, conf);
}

/**
* Link the block and meta files for the given block to the given destination.
* @return the new meta and block files.
* @throws IOException
*/
static File[] hardLinkBlockFiles(long blockId, long genStamp,
ReplicaInfo srcReplica, File destRoot) throws IOException {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
// blockName is same as the filename for the block
final File dstFile = new File(destDir, srcReplica.getBlockName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
return hardLinkBlockFiles(srcReplica, dstMeta, dstFile);
}

static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
File dstFile, boolean calculateChecksum,
int smallBufferSize, final Configuration conf)
Expand Down Expand Up @@ -1026,6 +1041,43 @@ static File[] copyBlockFiles(ReplicaInfo srcReplica, File dstMeta,
return new File[] {dstMeta, dstFile};
}

static File[] hardLinkBlockFiles(ReplicaInfo srcReplica, File dstMeta,
File dstFile)
throws IOException {
hardLinkBlockMetaFile(srcReplica, dstMeta);
hardLinkBlockFile(srcReplica, dstFile);
if (LOG.isDebugEnabled()) {
LOG.debug("Linked " + srcReplica.getBlockURI() + " to " + dstFile);
}
return new File[]{dstMeta, dstFile};
}

static File hardLinkBlockFile(ReplicaInfo srcReplica, File dstFile)
throws IOException {
try {
HardLink.createHardLink(
new File(srcReplica.getBlockURI()), dstFile, true);
} catch (IOException e) {
throw new IOException("Failed to hardLink "
+ srcReplica + " block file to "
+ dstFile, e);
}
return dstFile;
}

static File hardLinkBlockMetaFile(ReplicaInfo srcReplica, File dstMeta)
throws IOException {
try {
HardLink.createHardLink(
new File(srcReplica.getMetadataURI()), dstMeta, true);
} catch (IOException e) {
throw new IOException("Failed to hardLink "
+ srcReplica + " metadata to "
+ dstMeta, e);
}
return dstMeta;
}

/**
* Move block files from one storage to another storage.
* @return Returns the Old replicaInfo
Expand Down Expand Up @@ -1058,12 +1110,30 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
}

FsVolumeReference volumeRef = null;
boolean shouldConsiderSameMountVolume =
shouldConsiderSameMountVolume(replicaInfo.getVolume(),
targetStorageType, targetStorageId);
boolean useVolumeOnSameMount = false;

try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
block.getNumBytes());
if (shouldConsiderSameMountVolume) {
volumeRef = volumes.getVolumeByMount(targetStorageType,
((FsVolumeImpl) replicaInfo.getVolume()).getMount(),
block.getNumBytes());
if (volumeRef != null) {
useVolumeOnSameMount = true;
}
}
if (!useVolumeOnSameMount) {
volumeRef = volumes.getNextVolume(
targetStorageType,
targetStorageId,
block.getNumBytes()
);
}
}
try {
moveBlock(block, replicaInfo, volumeRef);
moveBlock(block, replicaInfo, volumeRef, useVolumeOnSameMount);
} finally {
if (volumeRef != null) {
volumeRef.close();
Expand All @@ -1074,6 +1144,30 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
return replicaInfo;
}

/**
* When configuring DISK/ARCHIVE on same volume,
* check if we should find the counterpart on the same disk mount.
*/
@VisibleForTesting
boolean shouldConsiderSameMountVolume(FsVolumeSpi fsVolume,
StorageType targetStorageType, String targetStorageID) {
if (targetStorageID != null && !targetStorageID.isEmpty()) {
return false;
}
if (!(fsVolume instanceof FsVolumeImpl)
|| ((FsVolumeImpl) fsVolume).getMount().isEmpty()) {
return false;
}
StorageType sourceStorageType = fsVolume.getStorageType();
// Source/dest storage types are different
if (sourceStorageType == targetStorageType) {
return false;
}
// Source/dest storage types are either DISK or ARCHIVE.
return StorageType.allowSameDiskTiering(sourceStorageType)
&& StorageType.allowSameDiskTiering(targetStorageType);
}

/**
* Moves a block from a given volume to another.
*
Expand All @@ -1086,8 +1180,33 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
@VisibleForTesting
ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
FsVolumeReference volumeRef) throws IOException {
ReplicaInfo newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
volumeRef);
return moveBlock(block, replicaInfo, volumeRef, false);
}

/**
* Moves a block from a given volume to another.
*
* @param block - Extended Block
* @param replicaInfo - ReplicaInfo
* @param volumeRef - Volume Ref - Closed by caller.
* @param moveBlockToLocalMount - Whether we use shortcut
* to move block on same mount.
* @return newReplicaInfo
* @throws IOException
*/
@VisibleForTesting
ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
FsVolumeReference volumeRef, boolean moveBlockToLocalMount)
throws IOException {
ReplicaInfo newReplicaInfo = null;
if (moveBlockToLocalMount) {
newReplicaInfo = moveReplicaToVolumeOnSameMount(block, replicaInfo,
volumeRef);
} else {
newReplicaInfo = copyReplicaToVolume(block, replicaInfo,
volumeRef);
}

finalizeNewReplica(newReplicaInfo, block);
removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
return newReplicaInfo;
Expand Down Expand Up @@ -1128,6 +1247,33 @@ ReplicaInfo copyReplicaToVolume(ExtendedBlock block, ReplicaInfo replicaInfo,
return newReplicaInfo;
}

/**
* Shortcut to use hardlink to move blocks on same mount.
* This is useful when moving blocks between storage types on same disk mount.
* Two cases need to be considered carefully:
* 1) Datanode restart in the middle should not cause data loss.
* We use hardlink to avoid this.
* 2) Finalized blocks can be reopened to append.
* This is already handled by dataset lock and gen stamp.
* See HDFS-12942
*
* @param block - Extended Block
* @param replicaInfo - ReplicaInfo
* @param volumeRef - Volume Ref - Closed by caller.
* @return newReplicaInfo new replica object created in specified volume.
* @throws IOException
*/
@VisibleForTesting
ReplicaInfo moveReplicaToVolumeOnSameMount(ExtendedBlock block,
ReplicaInfo replicaInfo,
FsVolumeReference volumeRef) throws IOException {
FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();
// Move files to temp dir first
ReplicaInfo newReplicaInfo = targetVolume.hardLinkBlockToTmpLocation(block,
replicaInfo);
return newReplicaInfo;
}

/**
* Finalizes newReplica by calling finalizeReplica internally.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,8 @@ long getActualNonDfsUsed() throws IOException {
// should share the same amount of reserved capacity.
// When calculating actual non dfs used,
// exclude DFS used capacity by another volume.
if (enableSameDiskTiering &&
(storageType == StorageType.DISK
|| storageType == StorageType.ARCHIVE)) {
if (enableSameDiskTiering
&& StorageType.allowSameDiskTiering(storageType)) {
StorageType counterpartStorageType = storageType == StorageType.DISK
? StorageType.ARCHIVE : StorageType.DISK;
FsVolumeReference counterpartRef = dataset
Expand Down Expand Up @@ -1529,6 +1528,24 @@ public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
return newReplicaInfo;
}

public ReplicaInfo hardLinkBlockToTmpLocation(ExtendedBlock block,
ReplicaInfo replicaInfo) throws IOException {

File[] blockFiles = FsDatasetImpl.hardLinkBlockFiles(block.getBlockId(),
block.getGenerationStamp(), replicaInfo,
getTmpDir(block.getBlockPoolId()));

ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
.setBlockId(replicaInfo.getBlockId())
.setGenerationStamp(replicaInfo.getGenerationStamp())
.setFsVolume(this)
.setDirectoryToUse(blockFiles[0].getParentFile())
.setBytesToReserve(0)
.build();
newReplicaInfo.setNumBytes(blockFiles[1].length());
return newReplicaInfo;
}

public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
long genStamp,
ReplicaInfo replicaInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
}
}

/**
* Get volume by disk mount to place a block.
* This is useful for same disk tiering.
*
* @param storageType The desired {@link StorageType}
* @param mount Disk mount of the volume
* @param blockSize Free space needed on the volume
* @return
* @throws IOException
*/
FsVolumeReference getVolumeByMount(StorageType storageType,
String mount, long blockSize) throws IOException {
if (!enableSameDiskTiering) {
return null;
}
FsVolumeReference volume = mountVolumeMap
.getVolumeRefByMountAndStorageType(mount, storageType);
// Check if volume has enough capacity
if (volume != null && volume.getVolume().getAvailable() > blockSize) {
return volume;
}
return null;
}

/**
* Get next volume.
*
Expand Down Expand Up @@ -354,9 +378,8 @@ private void removeVolume(FsVolumeImpl target) {
* Check if same disk tiering is applied to the volume.
*/
private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
return enableSameDiskTiering &&
(target.getStorageType() == StorageType.DISK
|| target.getStorageType() == StorageType.ARCHIVE);
return enableSameDiskTiering
&& StorageType.allowSameDiskTiering(target.getStorageType());
}

/**
Expand Down
Loading