Skip to content

Commit 07c05c3

Browse files
author
limingxiang02
committed
HDFS-16470. Change the lock type of some frequently called methods in ReplicaMap.
1 parent a631f45 commit 07c05c3

2 files changed

Lines changed: 59 additions & 8 deletions

File tree

  • hadoop-hdfs-project/hadoop-hdfs/src

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,12 @@ ReplicaInfo get(String bpid, long blockId) {
120120
ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
121121
checkBlockPool(bpid);
122122
checkBlock(replicaInfo);
123-
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
123+
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
124124
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
125125
if (m == null) {
126126
// Add an entry for block pool if it does not exist already
127-
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
128-
map.put(bpid, m);
127+
map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
128+
m = map.get(bpid);
129129
}
130130
return m.put(replicaInfo);
131131
}
@@ -138,12 +138,12 @@ ReplicaInfo add(String bpid, ReplicaInfo replicaInfo) {
138138
ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
139139
checkBlockPool(bpid);
140140
checkBlock(replicaInfo);
141-
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
141+
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
142142
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
143143
if (m == null) {
144144
// Add an entry for block pool if it does not exist already
145-
m = new LightWeightResizableGSet<Block, ReplicaInfo>();
146-
map.put(bpid, m);
145+
map.putIfAbsent(bpid, new LightWeightResizableGSet<Block, ReplicaInfo>());
146+
m = map.get(bpid);
147147
}
148148
ReplicaInfo oldReplicaInfo = m.get(replicaInfo);
149149
if (oldReplicaInfo != null) {
@@ -202,7 +202,7 @@ void mergeAll(ReplicaMap other) {
202202
ReplicaInfo remove(String bpid, Block block) {
203203
checkBlockPool(bpid);
204204
checkBlock(block);
205-
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
205+
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
206206
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
207207
if (m != null) {
208208
ReplicaInfo replicaInfo = m.get(block);
@@ -224,7 +224,7 @@ ReplicaInfo remove(String bpid, Block block) {
224224
*/
225225
ReplicaInfo remove(String bpid, long blockId) {
226226
checkBlockPool(bpid);
227-
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
227+
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, bpid)) {
228228
LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
229229
if (m != null) {
230230
return m.remove(new Block(blockId));

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import java.io.OutputStream;
2222
import java.nio.file.Files;
2323
import java.nio.file.Paths;
24+
import java.util.Random;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Future;
2428
import java.util.function.Supplier;
2529

2630
import org.apache.hadoop.fs.DF;
@@ -602,6 +606,53 @@ public void run() {}
602606
+ "volumeMap.", 0, totalNumReplicas);
603607
}
604608

609+
@Test(timeout = 30000)
610+
public void testConcurrentWriteAndDeleteBlock() throws Exception {
611+
// Feed FsDataset with block metadata.
612+
final int numBlocks = 1000;
613+
final int threadCount = 10;
614+
// Generate data blocks.
615+
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
616+
List<Future<?>> futureList = new ArrayList<>();
617+
Random random = new Random();
618+
// Random write block and delete half of them.
619+
for (int i = 0; i < threadCount; i++) {
620+
Thread thread = new Thread() {
621+
@Override
622+
public void run() {
623+
try {
624+
String bpid = BLOCK_POOL_IDS[random.nextInt(BLOCK_POOL_IDS.length)];
625+
for (int blockId = 0; blockId < numBlocks; blockId++) {
626+
ExtendedBlock eb = new ExtendedBlock(bpid, blockId);
627+
ReplicaHandler replica = null;
628+
try {
629+
replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
630+
false);
631+
if (blockId % 2 > 0) {
632+
dataset.invalidate(bpid, new Block[]{eb.getLocalBlock()});
633+
}
634+
} finally {
635+
if (replica != null) {
636+
replica.close();
637+
}
638+
}
639+
}
640+
// Just keep final consistency no need to care exception.
641+
} catch (Exception ignore) {}
642+
}
643+
};
644+
thread.setName("AddBlock" + i);
645+
futureList.add(pool.submit(thread));
646+
}
647+
// Wait for data generation
648+
for (Future<?> f : futureList) {
649+
f.get();
650+
}
651+
for (String bpid : dataset.volumeMap.getBlockPoolList()) {
652+
assertEquals(numBlocks / 2, dataset.volumeMap.size(bpid));
653+
}
654+
}
655+
605656
@Test(timeout = 5000)
606657
public void testRemoveNewlyAddedVolume() throws IOException {
607658
final int numExistingVolumes = getNumVolumes();

0 commit comments

Comments
 (0)