Skip to content

Commit 198bc44

Browse files
tasanuma and RuinanGu
authored
HDFS-16566 Erasure Coding: Recovery may cause excess replicas when busy DN exists (#4252) (#5059)
(cherry picked from commit 9376b65) Co-authored-by: RuinanGu <57645247+RuinanGu@users.noreply.github.com> Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org> Reviewed-by: Ashutosh Gupta <ashugpt@amazon.com>
1 parent 19f8e4f commit 198bc44

14 files changed

Lines changed: 167 additions & 22 deletions

File tree

hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ message BlockECReconstructionInfoProto {
108108
required StorageTypesProto targetStorageTypes = 5;
109109
required bytes liveBlockIndices = 6;
110110
required ErasureCodingPolicyProto ecPolicy = 7;
111+
optional bytes excludeReconstructedIndices = 8;
111112
}
112113

113114
/**

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,11 +1049,17 @@ public static BlockECReconstructionInfo convertBlockECReconstructionInfo(
10491049

10501050
byte[] liveBlkIndices = blockEcReconstructionInfoProto.getLiveBlockIndices()
10511051
.toByteArray();
1052+
byte[] excludeReconstructedIndices =
1053+
blockEcReconstructionInfoProto.hasExcludeReconstructedIndices() ?
1054+
blockEcReconstructionInfoProto.getExcludeReconstructedIndices()
1055+
.toByteArray() : new byte[0];
10521056
ErasureCodingPolicy ecPolicy =
10531057
PBHelperClient.convertErasureCodingPolicy(
10541058
blockEcReconstructionInfoProto.getEcPolicy());
1055-
return new BlockECReconstructionInfo(block, sourceDnInfos, targetDnInfos,
1056-
targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
1059+
return new BlockECReconstructionInfo(
1060+
block, sourceDnInfos, targetDnInfos,
1061+
targetStorageUuids, convertStorageTypes, liveBlkIndices,
1062+
excludeReconstructedIndices, ecPolicy);
10571063
}
10581064

10591065
public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo(
@@ -1079,6 +1085,10 @@ public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo(
10791085
byte[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
10801086
builder.setLiveBlockIndices(PBHelperClient.getByteString(liveBlockIndices));
10811087

1088+
byte[] excludeReconstructedIndices = blockEcRecoveryInfo.getExcludeReconstructedIndices();
1089+
builder.setExcludeReconstructedIndices(
1090+
PBHelperClient.getByteString(excludeReconstructedIndices));
1091+
10821092
builder.setEcPolicy(PBHelperClient.convertErasureCodingPolicy(
10831093
blockEcRecoveryInfo.getErasureCodingPolicy()));
10841094

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -906,7 +906,7 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
906906
// source node returned is not used
907907
chooseSourceDatanodes(blockInfo, containingNodes,
908908
containingLiveReplicasNodes, numReplicas, new ArrayList<Byte>(),
909-
new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);
909+
new ArrayList<Byte>(), new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);
910910

911911
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
912912
// not included in the numReplicas.liveReplicas() count
@@ -2112,9 +2112,10 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
21122112
NumberReplicas numReplicas = new NumberReplicas();
21132113
List<Byte> liveBlockIndices = new ArrayList<>();
21142114
List<Byte> liveBusyBlockIndices = new ArrayList<>();
2115+
List<Byte> excludeReconstructed = new ArrayList<>();
21152116
final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
21162117
containingNodes, liveReplicaNodes, numReplicas,
2117-
liveBlockIndices, liveBusyBlockIndices, priority);
2118+
liveBlockIndices, liveBusyBlockIndices, excludeReconstructed, priority);
21182119
short requiredRedundancy = getExpectedLiveRedundancyNum(block,
21192120
numReplicas);
21202121
if(srcNodes == null || srcNodes.length == 0) {
@@ -2182,9 +2183,13 @@ BlockReconstructionWork scheduleReconstruction(BlockInfo block,
21822183
for (int i = 0; i < liveBusyBlockIndices.size(); i++) {
21832184
busyIndices[i] = liveBusyBlockIndices.get(i);
21842185
}
2186+
byte[] excludeReconstructedIndices = new byte[excludeReconstructed.size()];
2187+
for (int i = 0; i < excludeReconstructed.size(); i++) {
2188+
excludeReconstructedIndices[i] = excludeReconstructed.get(i);
2189+
}
21852190
return new ErasureCodingWork(getBlockPoolId(), block, bc, newSrcNodes,
21862191
containingNodes, liveReplicaNodes, additionalReplRequired,
2187-
priority, newIndices, busyIndices);
2192+
priority, newIndices, busyIndices, excludeReconstructedIndices);
21882193
} else {
21892194
return new ReplicationWork(block, bc, srcNodes,
21902195
containingNodes, liveReplicaNodes, additionalReplRequired,
@@ -2426,7 +2431,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
24262431
List<DatanodeDescriptor> containingNodes,
24272432
List<DatanodeStorageInfo> nodesContainingLiveReplicas,
24282433
NumberReplicas numReplicas, List<Byte> liveBlockIndices,
2429-
List<Byte> liveBusyBlockIndices, int priority) {
2434+
List<Byte> liveBusyBlockIndices, List<Byte> excludeReconstructed, int priority) {
24302435
containingNodes.clear();
24312436
nodesContainingLiveReplicas.clear();
24322437
List<DatanodeDescriptor> srcNodes = new ArrayList<>();
@@ -2496,6 +2501,8 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
24962501
if (isStriped && (state == StoredReplicaState.LIVE
24972502
|| state == StoredReplicaState.DECOMMISSIONING)) {
24982503
liveBusyBlockIndices.add(blockIndex);
2504+
//HDFS-16566 ExcludeReconstructed won't be reconstructed.
2505+
excludeReconstructed.add(blockIndex);
24992506
}
25002507
continue; // already reached replication limit
25012508
}
@@ -2504,6 +2511,8 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
25042511
if (isStriped && (state == StoredReplicaState.LIVE
25052512
|| state == StoredReplicaState.DECOMMISSIONING)) {
25062513
liveBusyBlockIndices.add(blockIndex);
2514+
//HDFS-16566 ExcludeReconstructed won't be reconstructed.
2515+
excludeReconstructed.add(blockIndex);
25072516
}
25082517
continue;
25092518
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -672,10 +672,10 @@ public void addBlockToBeReplicated(Block block,
672672
*/
673673
void addBlockToBeErasureCoded(ExtendedBlock block,
674674
DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
675-
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
675+
byte[] liveBlockIndices, byte[] excludeReconstrutedIndices, ErasureCodingPolicy ecPolicy) {
676676
assert (block != null && sources != null && sources.length > 0);
677677
BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,
678-
sources, targets, liveBlockIndices, ecPolicy);
678+
sources, targets, liveBlockIndices, excludeReconstrutedIndices, ecPolicy);
679679
erasurecodeBlocks.offer(task);
680680
BlockManager.LOG.debug("Adding block reconstruction task " + task + "to "
681681
+ getName() + ", current queue size is " + erasurecodeBlocks.size());

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
class ErasureCodingWork extends BlockReconstructionWork {
3333
private final byte[] liveBlockIndices;
3434
private final byte[] liveBusyBlockIndices;
35+
private final byte[] excludeReconstructedIndices;
3536
private final String blockPoolId;
3637

3738
public ErasureCodingWork(String blockPoolId, BlockInfo block,
@@ -40,12 +41,14 @@ public ErasureCodingWork(String blockPoolId, BlockInfo block,
4041
List<DatanodeDescriptor> containingNodes,
4142
List<DatanodeStorageInfo> liveReplicaStorages,
4243
int additionalReplRequired, int priority,
43-
byte[] liveBlockIndices, byte[] liveBusyBlockIndices) {
44+
byte[] liveBlockIndices, byte[] liveBusyBlockIndices,
45+
byte[] excludeReconstrutedIndices) {
4446
super(block, bc, srcNodes, containingNodes,
4547
liveReplicaStorages, additionalReplRequired, priority);
4648
this.blockPoolId = blockPoolId;
4749
this.liveBlockIndices = liveBlockIndices;
4850
this.liveBusyBlockIndices = liveBusyBlockIndices;
51+
this.excludeReconstructedIndices=excludeReconstrutedIndices;
4952
LOG.debug("Creating an ErasureCodingWork to {} reconstruct ",
5053
block);
5154
}
@@ -147,7 +150,7 @@ void addTaskToDatanode(NumberReplicas numberReplicas) {
147150
} else {
148151
targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
149152
new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
150-
getLiveBlockIndices(), stripedBlk.getErasureCodingPolicy());
153+
liveBlockIndices, excludeReconstructedIndices, stripedBlk.getErasureCodingPolicy());
151154
}
152155
}
153156

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void processErasureCodingTasks(
127127
reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(),
128128
reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
129129
reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
130-
reconInfo.getTargetStorageIDs());
130+
reconInfo.getTargetStorageIDs(), reconInfo.getExcludeReconstructedIndices());
131131
// It may throw IllegalArgumentException from task#stripedReader
132132
// constructor.
133133
final StripedBlockReconstructor task =

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructionInfo.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,26 +41,28 @@ public class StripedReconstructionInfo {
4141
private final DatanodeInfo[] targets;
4242
private final StorageType[] targetStorageTypes;
4343
private final String[] targetStorageIds;
44+
private final byte[] excludeReconstructedIndices;
4445

4546
public StripedReconstructionInfo(ExtendedBlock blockGroup,
4647
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
4748
byte[] targetIndices) {
4849
this(blockGroup, ecPolicy, liveIndices, sources, targetIndices, null,
49-
null, null);
50+
null, null, new byte[0]);
5051
}
5152

5253
StripedReconstructionInfo(ExtendedBlock blockGroup,
5354
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
5455
DatanodeInfo[] targets, StorageType[] targetStorageTypes,
55-
String[] targetStorageIds) {
56+
String[] targetStorageIds, byte[] excludeReconstructedIndices) {
5657
this(blockGroup, ecPolicy, liveIndices, sources, null, targets,
57-
targetStorageTypes, targetStorageIds);
58+
targetStorageTypes, targetStorageIds, excludeReconstructedIndices);
5859
}
5960

6061
private StripedReconstructionInfo(ExtendedBlock blockGroup,
6162
ErasureCodingPolicy ecPolicy, byte[] liveIndices, DatanodeInfo[] sources,
6263
byte[] targetIndices, DatanodeInfo[] targets,
63-
StorageType[] targetStorageTypes, String[] targetStorageIds) {
64+
StorageType[] targetStorageTypes, String[] targetStorageIds,
65+
byte[] excludeReconstructedIndices) {
6466

6567
this.blockGroup = blockGroup;
6668
this.ecPolicy = ecPolicy;
@@ -70,6 +72,7 @@ private StripedReconstructionInfo(ExtendedBlock blockGroup,
7072
this.targets = targets;
7173
this.targetStorageTypes = targetStorageTypes;
7274
this.targetStorageIds = targetStorageIds;
75+
this.excludeReconstructedIndices = excludeReconstructedIndices;
7376
}
7477

7578
ExtendedBlock getBlockGroup() {
@@ -104,5 +107,9 @@ String[] getTargetStorageIds() {
104107
return targetStorageIds;
105108
}
106109

110+
byte[] getExcludeReconstructedIndices() {
111+
return excludeReconstructedIndices;
112+
}
113+
107114
}
108115

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ abstract class StripedReconstructor {
120120
private final CachingStrategy cachingStrategy;
121121
private long maxTargetLength = 0L;
122122
private final BitSet liveBitSet;
123+
private final BitSet excludeBitSet;
123124

124125
// metrics
125126
private AtomicLong bytesRead = new AtomicLong(0);
@@ -137,6 +138,12 @@ abstract class StripedReconstructor {
137138
for (int i = 0; i < stripedReconInfo.getLiveIndices().length; i++) {
138139
liveBitSet.set(stripedReconInfo.getLiveIndices()[i]);
139140
}
141+
excludeBitSet = new BitSet(
142+
ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
143+
for (int i = 0; i < stripedReconInfo.getExcludeReconstructedIndices().length; i++) {
144+
excludeBitSet.set(stripedReconInfo.getExcludeReconstructedIndices()[i]);
145+
}
146+
140147
blockGroup = stripedReconInfo.getBlockGroup();
141148
stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo);
142149
cachingStrategy = CachingStrategy.newDefaultStrategy();
@@ -261,6 +268,10 @@ BitSet getLiveBitSet() {
261268
return liveBitSet;
262269
}
263270

271+
BitSet getExcludeBitSet(){
272+
return excludeBitSet;
273+
}
274+
264275
long getMaxTargetLength() {
265276
return maxTargetLength;
266277
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,14 @@ void init() throws IOException {
125125

126126
private void initTargetIndices() {
127127
BitSet bitset = reconstructor.getLiveBitSet();
128+
BitSet excludebitset=reconstructor.getExcludeBitSet();
128129

129130
int m = 0;
130131
hasValidTargets = false;
131132
for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
132133
if (!bitset.get(i)) {
133134
if (reconstructor.getBlockLen(i) > 0) {
134-
if (m < targets.length) {
135+
if (m < targets.length && !excludebitset.get(i)) {
135136
targetIndices[m++] = (short)i;
136137
hasValidTargets = true;
137138
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,28 +78,31 @@ public static class BlockECReconstructionInfo {
7878
private String[] targetStorageIDs;
7979
private StorageType[] targetStorageTypes;
8080
private final byte[] liveBlockIndices;
81+
private final byte[] excludeReconstructedIndices;
8182
private final ErasureCodingPolicy ecPolicy;
8283

8384
public BlockECReconstructionInfo(ExtendedBlock block,
8485
DatanodeInfo[] sources, DatanodeStorageInfo[] targetDnStorageInfo,
85-
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
86+
byte[] liveBlockIndices, byte[] excludeReconstructedIndices, ErasureCodingPolicy ecPolicy) {
8687
this(block, sources, DatanodeStorageInfo
8788
.toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo
8889
.toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo
89-
.toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecPolicy);
90+
.toStorageTypes(targetDnStorageInfo), liveBlockIndices,
91+
excludeReconstructedIndices, ecPolicy);
9092
}
9193

9294
public BlockECReconstructionInfo(ExtendedBlock block,
9395
DatanodeInfo[] sources, DatanodeInfo[] targets,
9496
String[] targetStorageIDs, StorageType[] targetStorageTypes,
95-
byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) {
97+
byte[] liveBlockIndices, byte[] excludeReconstructedIndices, ErasureCodingPolicy ecPolicy) {
9698
this.block = block;
9799
this.sources = sources;
98100
this.targets = targets;
99101
this.targetStorageIDs = targetStorageIDs;
100102
this.targetStorageTypes = targetStorageTypes;
101103
this.liveBlockIndices = liveBlockIndices == null ?
102104
new byte[]{} : liveBlockIndices;
105+
this.excludeReconstructedIndices = excludeReconstructedIndices;
103106
this.ecPolicy = ecPolicy;
104107
}
105108

@@ -127,6 +130,10 @@ public byte[] getLiveBlockIndices() {
127130
return liveBlockIndices;
128131
}
129132

133+
public byte[] getExcludeReconstructedIndices() {
134+
return excludeReconstructedIndices;
135+
}
136+
130137
public ErasureCodingPolicy getErasureCodingPolicy() {
131138
return ecPolicy;
132139
}

0 commit comments

Comments
 (0)