Skip to content

Commit 232e780

Browse files
authored
HDDS-12150. Abnormal container states should not crash the SCM ContainerReportHandler thread (apache#7882)
1 parent a708ea4 commit 232e780

3 files changed

Lines changed: 190 additions & 25 deletions

File tree

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,6 @@ private boolean updateContainerState(final DatanodeDetails datanode,
287287
}
288288

289289
if (replica.getState() == State.CLOSED) {
290-
Preconditions.checkArgument(replica.getBlockCommitSequenceId()
291-
== container.getSequenceId());
292-
293290
/*
294291
For an EC container, only the first index and the parity indexes are
295292
guaranteed to have block data. So, update the container's state in SCM
@@ -305,8 +302,14 @@ private boolean updateContainerState(final DatanodeDetails datanode,
305302
}
306303
}
307304

308-
logger.info("Moving container {} to CLOSED state, datanode {} " +
309-
"reported CLOSED replica with index {}.", containerId, datanode,
305+
if (!verifyBcsId(replica.getBlockCommitSequenceId(), container.getSequenceId(), datanode, containerId)) {
306+
logger.warn("Ignored moving container {} from CLOSING to CLOSED state because replica bcsId ({}) " +
307+
"reported by datanode {} does not match sequenceId ({}).",
308+
containerId, replica.getBlockCommitSequenceId(), datanode, container.getSequenceId());
309+
return true;
310+
}
311+
logger.info("Moving container {} from CLOSING to CLOSED state, datanode {} " +
312+
"reported CLOSED replica with index {}.", containerId, datanode,
310313
replica.getReplicaIndex());
311314
containerManager.updateContainerState(containerId,
312315
LifeCycleEvent.CLOSE);
@@ -330,10 +333,15 @@ private boolean updateContainerState(final DatanodeDetails datanode,
330333
*
331334
*/
332335
if (replica.getState() == State.CLOSED) {
333-
logger.info("Moving container {} to CLOSED state, datanode {} " +
334-
"reported CLOSED replica.", containerId, datanode);
335-
Preconditions.checkArgument(replica.getBlockCommitSequenceId()
336-
== container.getSequenceId());
336+
if (!verifyBcsId(replica.getBlockCommitSequenceId(), container.getSequenceId(), datanode, containerId)) {
337+
logger.warn("Ignored moving container {} from QUASI_CLOSED to CLOSED state because replica bcsId ({}) " +
338+
"reported by datanode {} does not match sequenceId ({}).",
339+
containerId, replica.getBlockCommitSequenceId(), datanode, container.getSequenceId());
340+
return true;
341+
}
342+
logger.info("Moving container {} from QUASI_CLOSED to CLOSED state, datanode {} " +
343+
"reported CLOSED replica with index {}.", containerId, datanode,
344+
replica.getReplicaIndex());
337345
containerManager.updateContainerState(containerId,
338346
LifeCycleEvent.FORCE_CLOSE);
339347
}
@@ -372,6 +380,32 @@ private boolean updateContainerState(final DatanodeDetails datanode,
372380
return ignored;
373381
}
374382

383+
/**
384+
* Helper method to verify that the replica's bcsId matches the container's in SCM.
385+
* Throws IOException if the bcsIds do not match.
386+
* <p>
387+
* @param replicaBcsId Replica bcsId
388+
* @param containerBcsId Container bcsId in SCM
389+
* @param datanode DatanodeDetails for logging
390+
* @param containerId ContainerID for logging
391+
* @return true if verification has passed, false otherwise
392+
*/
393+
private boolean verifyBcsId(long replicaBcsId, long containerBcsId,
394+
DatanodeDetails datanode, ContainerID containerId) {
395+
396+
if (replicaBcsId != containerBcsId) {
397+
final String errMsg = "Unexpected bcsId for container " + containerId +
398+
" from datanode " + datanode + ". replica's: " + replicaBcsId +
399+
", SCM's: " + containerBcsId +
400+
". Ignoring container report for " + containerId;
401+
402+
logger.error(errMsg);
403+
return false;
404+
} else {
405+
return true;
406+
}
407+
}
408+
375409
private void updateContainerReplica(final DatanodeDetails datanodeDetails,
376410
final ContainerID containerId,
377411
final ContainerReplicaProto replicaProto)

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
2424
import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
2525
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.fail;
2627
import static org.mockito.Mockito.any;
2728
import static org.mockito.Mockito.doAnswer;
2829
import static org.mockito.Mockito.mock;
@@ -74,6 +75,8 @@
7475
import org.junit.jupiter.api.BeforeEach;
7576
import org.junit.jupiter.api.Test;
7677
import org.junit.jupiter.api.io.TempDir;
78+
import org.junit.jupiter.params.ParameterizedTest;
79+
import org.junit.jupiter.params.provider.EnumSource;
7780

7881
/**
7982
* Test the behaviour of the ContainerReportHandler.
@@ -166,7 +169,7 @@ private void testReplicaIndexUpdate(ContainerInfo container,
166169
Map<DatanodeDetails, Integer> expectedReplicaMap) {
167170
final ContainerReportsProto containerReport = getContainerReportsProto(
168171
container.containerID(), ContainerReplicaProto.State.CLOSED,
169-
dn.getUuidString(), 2000000000L, 100000000L, replicaIndex);
172+
dn.getUuidString(), 2000000000L, 100000000L, 10000L, replicaIndex);
170173
final ContainerReportFromDatanode containerReportFromDatanode =
171174
new ContainerReportFromDatanode(dn, containerReport);
172175
final ContainerReportHandler reportHandler = new ContainerReportHandler(
@@ -601,7 +604,7 @@ private void createAndHandleContainerReport(ContainerID containerID,
601604

602605
@Test
603606
public void testClosingToQuasiClosed()
604-
throws NodeNotFoundException, IOException, TimeoutException {
607+
throws NodeNotFoundException, IOException {
605608
/*
606609
* The container is in CLOSING state and all the replicas are in
607610
* OPEN/CLOSING state.
@@ -668,7 +671,7 @@ public void testClosingToQuasiClosed()
668671

669672
@Test
670673
public void testQuasiClosedToClosed()
671-
throws NodeNotFoundException, IOException, TimeoutException {
674+
throws NodeNotFoundException, IOException {
672675
/*
673676
* The container is in QUASI_CLOSED state.
674677
* - One of the replica is in QUASI_CLOSED state
@@ -737,6 +740,52 @@ public void testQuasiClosedToClosed()
737740
assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(containerOne.containerID()).getState());
738741
}
739742

743+
@ParameterizedTest
744+
@EnumSource(value = LifeCycleState.class, names = {"CLOSING", "QUASI_CLOSED"})
745+
public void testContainerStateTransitionToClosedWithMismatchingBCSID(LifeCycleState lcState)
746+
throws NodeNotFoundException, IOException {
747+
/*
748+
* Negative test. When a replica with a (lower) mismatching bcsId gets reported,
749+
* expect the ContainerReportHandler thread to not throw uncaught exception.
750+
* (That exception lead to ContainerReportHandler thread crash before HDDS-12150.)
751+
*/
752+
final ContainerReportHandler reportHandler =
753+
new ContainerReportHandler(nodeManager, containerManager);
754+
final Iterator<DatanodeDetails> nodeIterator =
755+
nodeManager.getNodes(NodeStatus.inServiceHealthy()).iterator();
756+
757+
final DatanodeDetails dn1 = nodeIterator.next();
758+
final DatanodeDetails dn2 = nodeIterator.next();
759+
final DatanodeDetails dn3 = nodeIterator.next();
760+
761+
// Initial sequenceId 10000L is set here
762+
final ContainerInfo container1 = getContainer(lcState);
763+
764+
nodeManager.addContainer(dn1, container1.containerID());
765+
nodeManager.addContainer(dn2, container1.containerID());
766+
nodeManager.addContainer(dn3, container1.containerID());
767+
768+
containerStateManager.addContainer(container1.getProtobuf());
769+
770+
// Generate container report with replica in CLOSED state with intentional lower bcsId
771+
final ContainerReportsProto containerReport = getContainerReportsProto(
772+
container1.containerID(), ContainerReplicaProto.State.CLOSED,
773+
dn1.getUuidString(),
774+
2000L);
775+
final ContainerReportFromDatanode containerReportFromDatanode =
776+
new ContainerReportFromDatanode(dn1, containerReport);
777+
778+
// Handler should NOT throw IllegalArgumentException
779+
try {
780+
reportHandler.onMessage(containerReportFromDatanode, publisher);
781+
} catch (IllegalArgumentException iaEx) {
782+
fail("Handler should not throw IllegalArgumentException: " + iaEx.getMessage());
783+
}
784+
785+
// Because the container report is ignored, the container remains in the same previous state in SCM
786+
assertEquals(lcState, containerManager.getContainer(container1.containerID()).getState());
787+
}
788+
740789
@Test
741790
public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
742791
throws IOException, TimeoutException {
@@ -1092,7 +1141,7 @@ private ContainerReportFromDatanode getContainerReportFromDatanode(
10921141
DatanodeDetails dn, long bytesUsed, long keyCount, int replicaIndex) {
10931142
ContainerReportsProto containerReport = getContainerReportsProto(
10941143
containerId, state, dn.getUuidString(), bytesUsed, keyCount,
1095-
replicaIndex);
1144+
10000L, replicaIndex);
10961145

10971146
return new ContainerReportFromDatanode(dn, containerReport);
10981147
}
@@ -1101,20 +1150,34 @@ protected static ContainerReportsProto getContainerReportsProto(
11011150
final ContainerID containerId, final ContainerReplicaProto.State state,
11021151
final String originNodeId) {
11031152
return getContainerReportsProto(containerId, state, originNodeId,
1104-
2000000000L, 100000000L, 0);
1153+
2000000000L, 100000000L, 10000L, 0);
1154+
}
1155+
1156+
protected static ContainerReportsProto getContainerReportsProto(
1157+
final ContainerID containerId, final ContainerReplicaProto.State state,
1158+
final String originNodeId, final long bcsId) {
1159+
return getContainerReportsProto(containerId, state, originNodeId,
1160+
2000000000L, 100000000L, bcsId, 0);
11051161
}
11061162

11071163
protected static ContainerReportsProto getContainerReportsProto(
11081164
final ContainerID containerId, final ContainerReplicaProto.State state,
11091165
final String originNodeId, int replicaIndex) {
11101166
return getContainerReportsProto(containerId, state, originNodeId,
1111-
2000000000L, 100000000L, replicaIndex);
1167+
2000000000L, 100000000L, 10000L, replicaIndex);
1168+
}
1169+
1170+
protected static ContainerReportsProto getContainerReportsProto(
1171+
final ContainerID containerId, final ContainerReplicaProto.State state,
1172+
final String originNodeId, final long bcsId, int replicaIndex) {
1173+
return getContainerReportsProto(containerId, state, originNodeId,
1174+
2000000000L, 100000000L, bcsId, replicaIndex);
11121175
}
11131176

11141177
protected static ContainerReportsProto getContainerReportsProto(
11151178
final ContainerID containerId, final ContainerReplicaProto.State state,
11161179
final String originNodeId, final long usedBytes, final long keyCount,
1117-
final int replicaIndex) {
1180+
final long bcsId, final int replicaIndex) {
11181181
final ContainerReportsProto.Builder crBuilder =
11191182
ContainerReportsProto.newBuilder();
11201183
final ContainerReplicaProto replicaProto =
@@ -1130,7 +1193,7 @@ protected static ContainerReportsProto getContainerReportsProto(
11301193
.setWriteCount(100000000L)
11311194
.setReadBytes(2000000000L)
11321195
.setWriteBytes(2000000000L)
1133-
.setBlockCommitSequenceId(10000L)
1196+
.setBlockCommitSequenceId(bcsId)
11341197
.setDeleteTransactionId(0)
11351198
.setReplicaIndex(replicaIndex)
11361199
.build();

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
2828
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
2929
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.fail;
3031
import static org.mockito.Mockito.any;
3132
import static org.mockito.Mockito.doAnswer;
3233
import static org.mockito.Mockito.mock;
@@ -87,6 +88,8 @@
8788
import org.junit.jupiter.api.BeforeEach;
8889
import org.junit.jupiter.api.Test;
8990
import org.junit.jupiter.api.io.TempDir;
91+
import org.junit.jupiter.params.ParameterizedTest;
92+
import org.junit.jupiter.params.provider.EnumSource;
9093

9194
/**
9295
* Test cases to verify the functionality of IncrementalContainerReportHandler.
@@ -339,7 +342,7 @@ private List<DatanodeDetails> setupECContainerForTesting(
339342
}
340343

341344
@Test
342-
public void testClosingToQuasiClosed() throws IOException, TimeoutException {
345+
public void testClosingToQuasiClosed() throws IOException {
343346
final IncrementalContainerReportHandler reportHandler =
344347
new IncrementalContainerReportHandler(
345348
nodeManager, containerManager, scmContext);
@@ -372,7 +375,7 @@ public void testClosingToQuasiClosed() throws IOException, TimeoutException {
372375
}
373376

374377
@Test
375-
public void testQuasiClosedToClosed() throws IOException, TimeoutException {
378+
public void testQuasiClosedToClosed() throws IOException {
376379
final IncrementalContainerReportHandler reportHandler =
377380
new IncrementalContainerReportHandler(
378381
nodeManager, containerManager, scmContext);
@@ -407,6 +410,59 @@ public void testQuasiClosedToClosed() throws IOException, TimeoutException {
407410
assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(container.containerID()).getState());
408411
}
409412

413+
@ParameterizedTest
414+
@EnumSource(value = LifeCycleState.class, names = {"CLOSING", "QUASI_CLOSED"})
415+
public void testContainerStateTransitionToClosedWithMismatchingBCSID(LifeCycleState lcState) throws IOException {
416+
/*
417+
* Negative test. When a replica with a (lower) mismatching bcsId gets reported,
418+
* expect the ContainerReportHandler thread to not throw uncaught exception.
419+
* (That exception lead to ContainerReportHandler thread crash before HDDS-12150.)
420+
*/
421+
final IncrementalContainerReportHandler reportHandler =
422+
new IncrementalContainerReportHandler(nodeManager, containerManager, scmContext);
423+
424+
// Initial sequenceId 10000L is set here
425+
final ContainerInfo container = getContainer(lcState);
426+
final DatanodeDetails datanodeOne = randomDatanodeDetails();
427+
final DatanodeDetails datanodeTwo = randomDatanodeDetails();
428+
final DatanodeDetails datanodeThree = randomDatanodeDetails();
429+
nodeManager.register(datanodeOne, null, null);
430+
nodeManager.register(datanodeTwo, null, null);
431+
nodeManager.register(datanodeThree, null, null);
432+
433+
final Set<ContainerReplica> containerReplicas = getReplicas(
434+
container.containerID(),
435+
ContainerReplicaProto.State.CLOSING,
436+
datanodeOne, datanodeTwo);
437+
containerReplicas.addAll(getReplicas(
438+
container.containerID(),
439+
ContainerReplicaProto.State.QUASI_CLOSED,
440+
datanodeThree));
441+
442+
containerStateManager.addContainer(container.getProtobuf());
443+
containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
444+
container.containerID(), r));
445+
446+
// Generate incremental container report with replica in CLOSED state with intentional lower bcsId
447+
final IncrementalContainerReportProto containerReport =
448+
getIncrementalContainerReportProto(container.containerID(),
449+
CLOSED, datanodeThree.getUuidString(), false, 0,
450+
2000L);
451+
final IncrementalContainerReportFromDatanode icr =
452+
new IncrementalContainerReportFromDatanode(
453+
datanodeOne, containerReport);
454+
455+
// Handler should NOT throw IllegalArgumentException
456+
try {
457+
reportHandler.onMessage(icr, publisher);
458+
} catch (IllegalArgumentException iaEx) {
459+
fail("Handler should not throw IllegalArgumentException: " + iaEx.getMessage());
460+
}
461+
462+
// Because the container report is ignored, the container remains in the same previous state in SCM
463+
assertEquals(lcState, containerManager.getContainer(container.containerID()).getState());
464+
}
465+
410466
@Test
411467
public void testOpenWithUnhealthyReplica() throws IOException {
412468
final IncrementalContainerReportHandler reportHandler =
@@ -580,11 +636,23 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,
580636

581637
private static IncrementalContainerReportProto
582638
getIncrementalContainerReportProto(
583-
final ContainerID containerId,
584-
final ContainerReplicaProto.State state,
585-
final String originNodeId,
586-
final boolean hasReplicaIndex,
587-
final int replicaIndex) {
639+
final ContainerID containerId,
640+
final ContainerReplicaProto.State state,
641+
final String originNodeId,
642+
final boolean hasReplicaIndex,
643+
final int replicaIndex) {
644+
return getIncrementalContainerReportProto(containerId, state, originNodeId,
645+
hasReplicaIndex, replicaIndex, 10000L);
646+
}
647+
648+
private static IncrementalContainerReportProto
649+
getIncrementalContainerReportProto(
650+
final ContainerID containerId,
651+
final ContainerReplicaProto.State state,
652+
final String originNodeId,
653+
final boolean hasReplicaIndex,
654+
final int replicaIndex,
655+
final long bcsId) {
588656
final ContainerReplicaProto.Builder replicaProto =
589657
ContainerReplicaProto.newBuilder()
590658
.setContainerID(containerId.getId())
@@ -598,7 +666,7 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,
598666
.setWriteCount(100000000L)
599667
.setReadBytes(2000000000L)
600668
.setWriteBytes(2000000000L)
601-
.setBlockCommitSequenceId(10000L)
669+
.setBlockCommitSequenceId(bcsId)
602670
.setDeleteTransactionId(0);
603671
if (hasReplicaIndex) {
604672
replicaProto.setReplicaIndex(replicaIndex);

0 commit comments

Comments
 (0)