From aa8ef76db6f9dd6b89d3028cf39930ab31799e34 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Mon, 12 Aug 2024 19:14:36 +0800 Subject: [PATCH 01/12] HDDS-11304. Code cleanup in CommandDispatcher --- .../CloseContainerCommandHandler.java | 12 +++++++++++- .../ClosePipelineCommandHandler.java | 11 +++++++++++ .../commandhandler/CommandDispatcher.java | 19 ++++++++----------- .../CreatePipelineCommandHandler.java | 13 +++++++++++++ .../DeleteBlocksCommandHandler.java | 4 ++-- .../DeleteContainerCommandHandler.java | 10 ++++++++++ .../ReplicateContainerCommandHandler.java | 16 +++++----------- .../replication/ReplicationSupervisor.java | 9 +++++++++ 8 files changed, 69 insertions(+), 25 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 8533f7384d41..1bd3058bdcbc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -62,7 +62,7 @@ public class CloseContainerCommandHandler implements CommandHandler { private long totalTime; /** - * Constructs a ContainerReport handler. + * Constructs a close container command handler. */ public CloseContainerCommandHandler( int threadPoolSize, int queueSize, String threadNamePrefix) { @@ -220,4 +220,14 @@ public long getTotalRunTime() { public int getQueuedCount() { return queuedCount.get(); } + + @Override + public int getThreadPoolMaxPoolSize() { + return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); + } + + @Override + public int getThreadPoolActivePoolSize() { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java index 241abb6f4ae1..b21d14ea858b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java @@ -46,6 +46,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -202,4 +203,14 @@ public long getTotalRunTime() { public int getQueuedCount() { return queuedCount.get(); } + + @Override + public int getThreadPoolMaxPoolSize() { + return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); + } + + @Override + public int getThreadPoolActivePoolSize() { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index 9035b79c6709..e1c567edbf9b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -56,11 +56,6 @@ public final class CommandDispatcher { private CommandDispatcher(OzoneContainer container, SCMConnectionManager connectionManager, StateContext context, CommandHandler... handlers) { - Preconditions.checkNotNull(context); - Preconditions.checkNotNull(handlers); - Preconditions.checkArgument(handlers.length > 0); - Preconditions.checkNotNull(container); - Preconditions.checkNotNull(connectionManager); this.context = context; this.container = container; this.connectionManager = connectionManager; @@ -77,6 +72,7 @@ private CommandDispatcher(OzoneContainer container, SCMConnectionManager commandHandlerMetrics = CommandHandlerMetrics.create(handlerMap); } + @VisibleForTesting public CommandHandler getCloseContainerHandler() { return handlerMap.get(Type.closeContainerCommand); } @@ -102,7 +98,7 @@ public void handle(SCMCommand command) { LOG.error("Exception while handle command, ", ex); } } else { - LOG.error("Unknown SCM Command queued. There is no handler for this " + + LOG.warn("Unknown SCM Command queued. There is no handler for this " + "command. Command: {}", command.getType().getDescriptorForType() .getName()); } @@ -201,11 +197,12 @@ public Builder setContext(StateContext stateContext) { * @return Command Dispatcher. */ public CommandDispatcher build() { - Preconditions.checkNotNull(this.connectionManager, "Missing connection" + - " manager."); - Preconditions.checkNotNull(this.container, "Missing container."); - Preconditions.checkNotNull(this.context, "Missing context."); - Preconditions.checkArgument(this.handlerList.size() > 0); + Preconditions.checkNotNull(this.connectionManager, + "Missing scm connection manager."); + Preconditions.checkNotNull(this.container, "Missing ozone container."); + Preconditions.checkNotNull(this.context, "Missing state context."); + Preconditions.checkArgument(this.handlerList.size() > 0, + "The number of command handlers must be greater than 0."); return new CommandDispatcher(this.container, this.connectionManager, this.context, handlerList.toArray( new CommandHandler[handlerList.size()])); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java index 4a36a1987de6..7c3491ef917d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.List; @@ -182,4 +183,16 @@ public long getTotalRunTime() { public int getQueuedCount() { return queuedCount.get(); } + + @SuppressWarnings("checkstyle:Indentation") + @Override + public int getThreadPoolMaxPoolSize() { + return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); + } + + @SuppressWarnings("checkstyle:Indentation") + @Override + public int getThreadPoolActivePoolSize() { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java index 747749066e3d..bd7431c61452 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -168,12 +168,12 @@ public int getQueuedCount() { @Override public int getThreadPoolMaxPoolSize() { - return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); + return executor.getMaximumPoolSize(); } @Override public int getThreadPoolActivePoolSize() { - return ((ThreadPoolExecutor)executor).getActiveCount(); + return executor.getActiveCount(); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index ead81c32e5b2..36d763481511 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -160,6 +160,16 @@ public long getTotalRunTime() { return totalTime.get(); } + @Override + public int getThreadPoolMaxPoolSize() { + return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); + } + + @Override + public int getThreadPoolActivePoolSize() { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } + @Override public void stop() { try { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 21b26339e238..791401aed400 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -43,12 +43,6 @@ public class ReplicateContainerCommandHandler implements CommandHandler { static final Logger LOG = LoggerFactory.getLogger(ReplicateContainerCommandHandler.class); - private int invocationCount; - - private long totalTime; - - private ConfigurationSource conf; - private ReplicationSupervisor supervisor; private ContainerReplicator downloadReplicator; @@ -60,7 +54,6 @@ public ReplicateContainerCommandHandler( ReplicationSupervisor supervisor, ContainerReplicator downloadReplicator, ContainerReplicator pushReplicator) { - this.conf = conf; this.supervisor = supervisor; this.downloadReplicator = downloadReplicator; this.pushReplicator = pushReplicator; @@ -101,19 +94,20 @@ public SCMCommandProto.Type getCommandType() { @Override public int getInvocationCount() { - return this.invocationCount; + return (int) supervisor.getReplicationRequestCount(); } @Override public long getAverageRunTime() { - if (invocationCount > 0) { - return totalTime / invocationCount; + int currentCount = getInvocationCount(); + if (currentCount > 0) { + return getTotalRunTime() / currentCount; } return 0; } @Override public long getTotalRunTime() { - return totalTime; + return supervisor.getTotalTime(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 5ceea125e814..d178d25ed594 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -49,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,6 +77,7 @@ public final class ReplicationSupervisor { private final AtomicLong failureCounter = new AtomicLong(); private final AtomicLong timeoutCounter = new AtomicLong(); private final AtomicLong skippedCounter = new AtomicLong(); + private final AtomicLong totalTimeCounter = new AtomicLong(); /** * A set of container IDs that are currently being downloaded @@ -329,6 +331,7 @@ public TaskRunner(AbstractReplicationTask task) { @Override public void run() { + long startTime = Time.monotonicNow(); try { requestCounter.incrementAndGet(); @@ -377,8 +380,10 @@ public void run() { LOG.warn("Failed {}", this, e); failureCounter.incrementAndGet(); } finally { + long endTime = Time.monotonicNow(); inFlight.remove(task); decrementTaskCounter(task); + totalTimeCounter.addAndGet(endTime - startTime); } } @@ -442,6 +447,10 @@ public long getReplicationSuccessCount() { return successCounter.get(); } + public long getTotalTime() { + return totalTimeCounter.get(); + } + public long getReplicationFailureCount() { return failureCounter.get(); } From b679fb57a93ec3815161a37effd4637f39a0de6d Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Thu, 15 Aug 2024 11:16:07 +0800 Subject: [PATCH 02/12] Add some unit tests --- .../TestCloseContainerCommandHandler.java | 9 +++++++ .../TestClosePipelineCommandHandler.java | 26 +++++++++++++++++++ .../TestReplicationSupervisor.java | 2 ++ 3 files changed, 37 insertions(+) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index 219645c8edcc..1f525c6c6447 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.UUID; @@ -42,6 +43,7 @@ import static java.util.Collections.singletonMap; import static org.apache.hadoop.ozone.OzoneConsts.GB; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; @@ -292,4 +294,11 @@ private void waitTillFinishExecution( GenericTestUtils.waitFor(() -> closeHandler.getQueuedCount() <= 0, 10, 3000); } + + @Test + public void testThreadPoolPoolSize() { + assertEquals(1, subject.getThreadPoolMaxPoolSize()); + assertEquals(0, subject.getThreadPoolActivePoolSize()); + } + } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java index ac60fba1ae9d..1ce9b785a10f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java @@ -43,8 +43,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.Executors; import java.util.stream.Collectors; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.eq; @@ -141,4 +143,28 @@ private List getDatanodes() { return Arrays.asList(dnOne, dnTwo, dnThree); } + @Test + public void testThreadPoolPoolSize() throws IOException { + final List datanodes = getDatanodes(); + final DatanodeDetails currentDatanode = datanodes.get(0); + final PipelineID pipelineID = PipelineID.randomId(); + final SCMCommand command = + new ClosePipelineCommand(pipelineID); + stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf); + + XceiverServerRatis writeChannel = mock(XceiverServerRatis.class); + when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel); + when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(false); + final ClosePipelineCommandHandler commandHandler = + new ClosePipelineCommandHandler(conf, Executors.newFixedThreadPool(3)); + commandHandler.handle(command, ozoneContainer, stateContext, connectionManager); + + assertEquals(3, commandHandler.getThreadPoolMaxPoolSize()); + assertEquals(1, commandHandler.getThreadPoolActivePoolSize()); + verify(writeChannel, times(0)) + .removeGroup(pipelineID.getProtobuf()); + verify(raftClientGroupManager, times(0)) + .remove(any(), anyBoolean(), anyBoolean()); + } + } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 1f69db78d625..6509222c0f17 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -82,6 +82,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL; import static org.mockito.Mockito.any; @@ -158,6 +159,7 @@ public void normal(ContainerLayoutVersion layout) { assertEquals(0, supervisor.getTotalInFlightReplications()); assertEquals(0, supervisor.getQueueSize()); assertEquals(3, set.containerCount()); + assertTrue(supervisor.getTotalTime() > 0); MetricsCollectorImpl metricsCollector = new MetricsCollectorImpl(); metrics.getMetrics(metricsCollector, true); From 8d488317e165ce16d0cf6e74ebb9f7ebb9436bb0 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Thu, 15 Aug 2024 12:30:40 +0800 Subject: [PATCH 03/12] Update unit tests --- .../ClosePipelineCommandHandler.java | 11 --------- .../CreatePipelineCommandHandler.java | 13 ---------- .../TestClosePipelineCommandHandler.java | 24 ------------------- 3 files changed, 48 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java index b21d14ea858b..241abb6f4ae1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java @@ -46,7 +46,6 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -203,14 +202,4 @@ public long getTotalRunTime() { public int getQueuedCount() { return queuedCount.get(); } - - @Override - public int getThreadPoolMaxPoolSize() { - return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); - } - - @Override - public int getThreadPoolActivePoolSize() { - return ((ThreadPoolExecutor)executor).getActiveCount(); - } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java index 7c3491ef917d..4a36a1987de6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.List; @@ -183,16 +182,4 @@ public long getTotalRunTime() { public int getQueuedCount() { return queuedCount.get(); } - - @SuppressWarnings("checkstyle:Indentation") - @Override - public int getThreadPoolMaxPoolSize() { - return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); - } - - @SuppressWarnings("checkstyle:Indentation") - @Override - public int getThreadPoolActivePoolSize() { - return ((ThreadPoolExecutor)executor).getActiveCount(); - } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java index 1ce9b785a10f..73394d66d6e3 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java @@ -143,28 +143,4 @@ private List getDatanodes() { return Arrays.asList(dnOne, dnTwo, dnThree); } - @Test - public void testThreadPoolPoolSize() throws IOException { - final List datanodes = getDatanodes(); - final DatanodeDetails currentDatanode = datanodes.get(0); - final PipelineID pipelineID = PipelineID.randomId(); - final SCMCommand command = - new ClosePipelineCommand(pipelineID); - stateContext = ContainerTestUtils.getMockContext(currentDatanode, conf); - - XceiverServerRatis writeChannel = mock(XceiverServerRatis.class); - when(ozoneContainer.getWriteChannel()).thenReturn(writeChannel); - when(writeChannel.isExist(pipelineID.getProtobuf())).thenReturn(false); - final ClosePipelineCommandHandler commandHandler = - new ClosePipelineCommandHandler(conf, Executors.newFixedThreadPool(3)); - commandHandler.handle(command, ozoneContainer, stateContext, connectionManager); - - assertEquals(3, commandHandler.getThreadPoolMaxPoolSize()); - assertEquals(1, commandHandler.getThreadPoolActivePoolSize()); - verify(writeChannel, times(0)) - .removeGroup(pipelineID.getProtobuf()); - verify(raftClientGroupManager, times(0)) - .remove(any(), anyBoolean(), anyBoolean()); - } - } From eb8d2f783963a37dfcc1afae51bb671ad8424b70 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Thu, 15 Aug 2024 12:35:42 +0800 Subject: [PATCH 04/12] Update unit tests --- .../commandhandler/TestClosePipelineCommandHandler.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java index 73394d66d6e3..ac60fba1ae9d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestClosePipelineCommandHandler.java @@ -43,10 +43,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.Executors; import java.util.stream.Collectors; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.eq; From 8ef09f2369267b3d8462dc23183e7a648ed83710 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Thu, 15 Aug 2024 15:29:49 +0800 Subject: [PATCH 05/12] Update unit tests --- ...ReconstructECContainersCommandHandler.java | 9 ++++- .../ReplicateContainerCommandHandler.java | 10 ++--- .../replication/ReplicationSupervisor.java | 37 +++++++++++++++---- .../TestReplicationSupervisor.java | 30 ++++++++++++++- 4 files changed, 71 insertions(+), 15 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index 602687d7a003..2db9533e0c56 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -63,17 +63,22 @@ public Type getCommandType() { @Override public int getInvocationCount() { - return 0; + return (int) supervisor.getReplicationRequestCount( + ECReconstructionCoordinatorTask.class); } @Override public long getAverageRunTime() { + int invocationCount = getInvocationCount(); + if (invocationCount > 0) { + return getTotalRunTime() / invocationCount; + } return 0; } @Override public long getTotalRunTime() { - return 0; + return supervisor.getTotalTime(ECReconstructionCoordinatorTask.class); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 791401aed400..0cb63810114e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -94,20 +94,20 @@ public SCMCommandProto.Type getCommandType() { @Override public int getInvocationCount() { - return (int) supervisor.getReplicationRequestCount(); + return (int) supervisor.getReplicationRequestCount(ReplicationTask.class); } @Override public long getAverageRunTime() { - int currentCount = getInvocationCount(); - if (currentCount > 0) { - return getTotalRunTime() / currentCount; + int invocationCount = getInvocationCount(); + if (invocationCount > 0) { + return getTotalRunTime() / invocationCount; } return 0; } @Override public long getTotalRunTime() { - return supervisor.getTotalTime(); + return supervisor.getTotalTime(ReplicationTask.class); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index d178d25ed594..b226b514fb13 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -72,12 +72,15 @@ public final class ReplicationSupervisor { private final StateContext context; private final Clock clock; - private final AtomicLong requestCounter = new AtomicLong(); + private final Map, AtomicLong> requestCounter = + new ConcurrentHashMap<>(); private final AtomicLong successCounter = new AtomicLong(); private final AtomicLong failureCounter = new AtomicLong(); private final AtomicLong timeoutCounter = new AtomicLong(); private final AtomicLong skippedCounter = new AtomicLong(); - private final AtomicLong totalTimeCounter = new AtomicLong(); + private final Map, AtomicLong> totalTimeCounter = + new ConcurrentHashMap<>(); + private Object lock = new Object(); /** * A set of container IDs that are currently being downloaded @@ -223,6 +226,15 @@ public void addTask(AbstractReplicationTask task) { return; } + synchronized (lock) { + if (totalTimeCounter.get(task.getClass()) == null) { + totalTimeCounter.put(task.getClass(), new AtomicLong()); + } + if (requestCounter.get(task.getClass()) == null) { + requestCounter.put(task.getClass(), new AtomicLong()); + } + } + if (inFlight.add(task)) { if (task.getPriority() != ReplicationCommandPriority.LOW) { // Low priority tasks are not included in the replication queue sizes @@ -333,7 +345,7 @@ public TaskRunner(AbstractReplicationTask task) { public void run() { long startTime = Time.monotonicNow(); try { - requestCounter.incrementAndGet(); + requestCounter.get(task.getClass()).incrementAndGet(); final long now = clock.millis(); final long deadline = task.getDeadline(); @@ -383,7 +395,7 @@ public void run() { long endTime = Time.monotonicNow(); inFlight.remove(task); decrementTaskCounter(task); - totalTimeCounter.addAndGet(endTime - startTime); + totalTimeCounter.get(task.getClass()).addAndGet(endTime - startTime); } } @@ -424,7 +436,17 @@ public boolean equals(Object o) { } public long getReplicationRequestCount() { - return requestCounter.get(); + AtomicLong totalRequest = new AtomicLong(); + requestCounter.forEach((key, value) -> { + totalRequest.set(totalRequest.get() + value.get()); + }); + return totalRequest.get(); + } + + public long getReplicationRequestCount( + Class taskClass) { + AtomicLong counter = requestCounter.get(taskClass); + return counter == null ? 0 : counter.get(); } public long getQueueSize() { @@ -447,8 +469,9 @@ public long getReplicationSuccessCount() { return successCounter.get(); } - public long getTotalTime() { - return totalTimeCounter.get(); + public long getTotalTime(Class taskClass) { + AtomicLong counter = totalTimeCounter.get(taskClass); + return counter == null ? 0 : counter.get(); } public long getReplicationFailureCount() { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 6509222c0f17..d66e2b55ac43 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -159,7 +159,6 @@ public void normal(ContainerLayoutVersion layout) { assertEquals(0, supervisor.getTotalInFlightReplications()); assertEquals(0, supervisor.getQueueSize()); assertEquals(3, set.containerCount()); - assertTrue(supervisor.getTotalTime() > 0); MetricsCollectorImpl metricsCollector = new MetricsCollectorImpl(); metrics.getMetrics(metricsCollector, true); @@ -170,6 +169,35 @@ public void normal(ContainerLayoutVersion layout) { } } + @ContainerLayoutTestInfo.ContainerTest + public void normal2(ContainerLayoutVersion layout) { + this.layoutVersion = layout; + // GIVEN + ReplicationSupervisor supervisor = + supervisorWithReplicator(FakeReplicator::new); + try { + //WHEN + supervisor.addTask(createTask(1L)); + supervisor.addTask(createECTask(2L)); + supervisor.addTask(createECTask(3L)); + // Sleep 5s, wait all tasks processed + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + assertTrue(supervisor.getTotalTime(ReplicationTask.class) > 0); + assertTrue(supervisor.getTotalTime( + ECReconstructionCoordinatorTask.class) > 0); + assertEquals(1, + supervisor.getReplicationRequestCount(ReplicationTask.class)); + assertEquals(2, + supervisor.getReplicationRequestCount( + ECReconstructionCoordinatorTask.class)); + } finally { + supervisor.stop(); + } + } + @ContainerLayoutTestInfo.ContainerTest public void duplicateMessage(ContainerLayoutVersion layout) { this.layoutVersion = layout; From 0980c38d0feae2a64b82c4e02f607f8e652a2982 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Thu, 15 Aug 2024 16:29:09 +0800 Subject: [PATCH 06/12] Update unit tests --- .../container/replication/TestReplicationSupervisor.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index d66e2b55ac43..97182d41899e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -180,14 +180,12 @@ public void normal2(ContainerLayoutVersion layout) { supervisor.addTask(createTask(1L)); supervisor.addTask(createECTask(2L)); supervisor.addTask(createECTask(3L)); - // Sleep 5s, wait all tasks processed + // Sleep 10s, wait all tasks processed try { - Thread.sleep(5000); + Thread.sleep(10000); } catch (InterruptedException e) { } assertTrue(supervisor.getTotalTime(ReplicationTask.class) > 0); - assertTrue(supervisor.getTotalTime( - ECReconstructionCoordinatorTask.class) > 0); assertEquals(1, supervisor.getReplicationRequestCount(ReplicationTask.class)); assertEquals(2, From 4db3ff452241842db1010cf9238f72b475ce4ec2 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Thu, 15 Aug 2024 19:28:49 +0800 Subject: [PATCH 07/12] Update unit tests --- .../replication/ReplicationSupervisor.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index b226b514fb13..4e4112cf9e17 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -226,12 +226,19 @@ public void addTask(AbstractReplicationTask task) { return; } - synchronized (lock) { - if (totalTimeCounter.get(task.getClass()) == null) { - totalTimeCounter.put(task.getClass(), new AtomicLong()); + if (totalTimeCounter.get(task.getClass()) == null) { + synchronized (lock) { + if (totalTimeCounter.get(task.getClass()) == null) { + totalTimeCounter.put(task.getClass(), new AtomicLong()); + } } - if (requestCounter.get(task.getClass()) == null) { - requestCounter.put(task.getClass(), new AtomicLong()); + } + + if (requestCounter.get(task.getClass()) == null) { + synchronized (lock) { + if (requestCounter.get(task.getClass()) == null) { + requestCounter.put(task.getClass(), new AtomicLong()); + } } } From a63871fe4b5102905e8b71998a2d52d2dcda8a89 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Fri, 16 Aug 2024 11:05:32 +0800 Subject: [PATCH 08/12] Update log level --- .../common/statemachine/commandhandler/CommandDispatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index e1c567edbf9b..c3f8da74c7a8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -98,7 +98,7 @@ public void handle(SCMCommand command) { LOG.error("Exception while handle command, ", ex); } } else { - LOG.warn("Unknown SCM Command queued. There is no handler for this " + + LOG.error("Unknown SCM Command queued. There is no handler for this " + "command. Command: {}", command.getType().getDescriptorForType() .getName()); } From 312e97f4ab9c1d51f26d8888c04b30942f3a59bf Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Tue, 27 Aug 2024 17:34:53 +0800 Subject: [PATCH 09/12] Update some code. --- .../CloseContainerCommandHandler.java | 7 ++- .../DeleteContainerCommandHandler.java | 11 +++-- ...ReconstructECContainersCommandHandler.java | 9 +--- .../ReplicateContainerCommandHandler.java | 14 ++++-- .../replication/ReplicationSupervisor.java | 45 ++----------------- .../TestCloseContainerCommandHandler.java | 9 ++++ .../TestDeleteContainerCommandHandler.java | 12 +++-- .../TestReplicationSupervisor.java | 28 ------------ 8 files changed, 41 insertions(+), 94 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 1bd3058bdcbc..bc703ac6a552 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -18,7 +18,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -58,7 +57,7 @@ public class CloseContainerCommandHandler implements CommandHandler { private final AtomicLong invocationCount = new AtomicLong(0); private final AtomicInteger queuedCount = new AtomicInteger(0); - private final ExecutorService executor; + private final ThreadPoolExecutor executor; private long totalTime; /** @@ -223,11 +222,11 @@ public int getQueuedCount() { @Override public int getThreadPoolMaxPoolSize() { - return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); + return executor.getMaximumPoolSize(); } @Override public int getThreadPoolActivePoolSize() { - return ((ThreadPoolExecutor)executor).getActiveCount(); + return executor.getActiveCount(); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java index 36d763481511..b76e306e1c07 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.time.Clock; import java.util.OptionalLong; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -53,7 +52,7 @@ public class DeleteContainerCommandHandler implements CommandHandler { private final AtomicInteger invocationCount = new AtomicInteger(0); private final AtomicInteger timeoutCount = new AtomicInteger(0); private final AtomicLong totalTime = new AtomicLong(0); - private final ExecutorService executor; + private final ThreadPoolExecutor executor; private final Clock clock; private int maxQueueSize; @@ -70,7 +69,7 @@ public DeleteContainerCommandHandler( } protected DeleteContainerCommandHandler(Clock clock, - ExecutorService executor, int queueSize) { + ThreadPoolExecutor executor, int queueSize) { this.executor = executor; this.clock = clock; maxQueueSize = queueSize; @@ -131,7 +130,7 @@ private void handleInternal(SCMCommand command, StateContext context, @Override public int getQueuedCount() { - return ((ThreadPoolExecutor)executor).getQueue().size(); + return executor.getQueue().size(); } @Override @@ -162,12 +161,12 @@ public long getTotalRunTime() { @Override public int getThreadPoolMaxPoolSize() { - return ((ThreadPoolExecutor)executor).getMaximumPoolSize(); + return executor.getMaximumPoolSize(); } @Override public int getThreadPoolActivePoolSize() { - return ((ThreadPoolExecutor)executor).getActiveCount(); + return executor.getActiveCount(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index 2db9533e0c56..602687d7a003 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -63,22 +63,17 @@ public Type getCommandType() { @Override public int getInvocationCount() { - return (int) supervisor.getReplicationRequestCount( - ECReconstructionCoordinatorTask.class); + return 0; } @Override public long getAverageRunTime() { - int invocationCount = getInvocationCount(); - if (invocationCount > 0) { - return getTotalRunTime() / invocationCount; - } return 0; } @Override public long getTotalRunTime() { - return supervisor.getTotalTime(ECReconstructionCoordinatorTask.class); + return 0; } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 0cb63810114e..21b26339e238 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -43,6 +43,12 @@ public class ReplicateContainerCommandHandler implements CommandHandler { static final Logger LOG = LoggerFactory.getLogger(ReplicateContainerCommandHandler.class); + private int invocationCount; + + private long totalTime; + + private ConfigurationSource conf; + private ReplicationSupervisor supervisor; private ContainerReplicator downloadReplicator; @@ -54,6 +60,7 @@ public ReplicateContainerCommandHandler( ReplicationSupervisor supervisor, ContainerReplicator downloadReplicator, ContainerReplicator pushReplicator) { + this.conf = conf; this.supervisor = supervisor; this.downloadReplicator = downloadReplicator; this.pushReplicator = pushReplicator; @@ -94,20 +101,19 @@ public SCMCommandProto.Type getCommandType() { @Override public int getInvocationCount() { - return (int) supervisor.getReplicationRequestCount(ReplicationTask.class); + return this.invocationCount; } @Override public long getAverageRunTime() { - int invocationCount = getInvocationCount(); if (invocationCount > 0) { - return getTotalRunTime() / invocationCount; + return totalTime / invocationCount; } return 0; } @Override public long getTotalRunTime() { - return supervisor.getTotalTime(ReplicationTask.class); + return totalTime; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 4e4112cf9e17..5ceea125e814 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -49,7 +49,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,15 +71,11 @@ public final class ReplicationSupervisor { private final StateContext context; private final Clock clock; - private final Map, AtomicLong> requestCounter = - new ConcurrentHashMap<>(); + private final AtomicLong requestCounter = new AtomicLong(); private final AtomicLong successCounter = new AtomicLong(); private final AtomicLong failureCounter = new AtomicLong(); private final AtomicLong timeoutCounter = new AtomicLong(); private final AtomicLong skippedCounter = new AtomicLong(); - private final Map, AtomicLong> totalTimeCounter = - new ConcurrentHashMap<>(); - private Object lock = new Object(); /** * A set of container IDs that are currently being downloaded @@ -226,22 +221,6 @@ public void addTask(AbstractReplicationTask task) { return; } - if (totalTimeCounter.get(task.getClass()) == null) { - synchronized (lock) { - if (totalTimeCounter.get(task.getClass()) == null) { - totalTimeCounter.put(task.getClass(), new AtomicLong()); - } - } - } - - if (requestCounter.get(task.getClass()) == null) { - synchronized (lock) { - if (requestCounter.get(task.getClass()) == null) { - requestCounter.put(task.getClass(), new AtomicLong()); - } - } - } - if (inFlight.add(task)) { if (task.getPriority() != ReplicationCommandPriority.LOW) { // Low priority tasks are not included in the replication queue sizes @@ -350,9 +329,8 @@ public TaskRunner(AbstractReplicationTask task) { @Override public void run() { - long startTime = Time.monotonicNow(); try { - requestCounter.get(task.getClass()).incrementAndGet(); + requestCounter.incrementAndGet(); final long now = clock.millis(); final long deadline = task.getDeadline(); @@ -399,10 +377,8 @@ public void run() { LOG.warn("Failed {}", this, e); failureCounter.incrementAndGet(); } finally { - long endTime = Time.monotonicNow(); inFlight.remove(task); decrementTaskCounter(task); - totalTimeCounter.get(task.getClass()).addAndGet(endTime - startTime); } } @@ -443,17 +419,7 @@ public boolean equals(Object o) { } public long getReplicationRequestCount() { - AtomicLong totalRequest = new AtomicLong(); - requestCounter.forEach((key, value) -> { - totalRequest.set(totalRequest.get() + value.get()); - }); - return totalRequest.get(); - } - - public long getReplicationRequestCount( - Class taskClass) { - AtomicLong counter = requestCounter.get(taskClass); - return counter == null ? 0 : counter.get(); + return requestCounter.get(); } public long getQueueSize() { @@ -476,11 +442,6 @@ public long getReplicationSuccessCount() { return successCounter.get(); } - public long getTotalTime(Class taskClass) { - AtomicLong counter = totalTimeCounter.get(taskClass); - return counter == null ? 0 : counter.get(); - } - public long getReplicationFailureCount() { return failureCounter.get(); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index 1f525c6c6447..aebff9154052 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -299,6 +299,15 @@ private void waitTillFinishExecution( public void testThreadPoolPoolSize() { assertEquals(1, subject.getThreadPoolMaxPoolSize()); assertEquals(0, subject.getThreadPoolActivePoolSize()); + + CloseContainerCommandHandler closeContainerCommandHandler = + new CloseContainerCommandHandler(10, 10, ""); + closeContainerCommandHandler.handle(closeWithUnknownPipeline(), + ozoneContainer, context, null); + closeContainerCommandHandler.handle(closeWithUnknownPipeline(), + ozoneContainer, context, null); + assertEquals(10, closeContainerCommandHandler.getThreadPoolMaxPoolSize()); + assertEquals(2, closeContainerCommandHandler.getThreadPoolActivePoolSize()); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java index 49c34828fbd6..b37e51825a20 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java @@ -17,8 +17,13 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; @@ -32,7 +37,6 @@ import java.time.ZoneId; import java.util.OptionalLong; -import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -163,8 +167,10 @@ private static DeleteContainerCommandHandler createSubject() { private static DeleteContainerCommandHandler createSubject( TestClock clock, int queueSize) { - return new DeleteContainerCommandHandler(clock, - newDirectExecutorService(), queueSize); + ThreadFactory threadFactory = new ThreadFactoryBuilder().build(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors. + newFixedThreadPool(1, threadFactory); + return new DeleteContainerCommandHandler(clock, executor, queueSize); } private static DeleteContainerCommandHandler createSubjectWithPoolSize( diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 97182d41899e..1f69db78d625 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -82,7 +82,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL; import static org.mockito.Mockito.any; @@ -169,33 +168,6 @@ public void normal(ContainerLayoutVersion layout) { } } - @ContainerLayoutTestInfo.ContainerTest - public void normal2(ContainerLayoutVersion layout) { - this.layoutVersion = layout; - // GIVEN - ReplicationSupervisor supervisor = - supervisorWithReplicator(FakeReplicator::new); - try { - //WHEN - supervisor.addTask(createTask(1L)); - supervisor.addTask(createECTask(2L)); - supervisor.addTask(createECTask(3L)); - // Sleep 10s, wait all tasks processed - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - } - assertTrue(supervisor.getTotalTime(ReplicationTask.class) > 0); - assertEquals(1, - supervisor.getReplicationRequestCount(ReplicationTask.class)); - assertEquals(2, - supervisor.getReplicationRequestCount( - ECReconstructionCoordinatorTask.class)); - } finally { - supervisor.stop(); - } - } - @ContainerLayoutTestInfo.ContainerTest public void duplicateMessage(ContainerLayoutVersion layout) { this.layoutVersion = layout; From d97dcce887aad853b23caaeb854bc65bd94da61c Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Tue, 27 Aug 2024 20:14:06 +0800 Subject: [PATCH 10/12] Fix unit tests. --- .../TestCloseContainerCommandHandler.java | 17 +++++++++++++---- .../TestDeleteContainerCommandHandler.java | 5 ++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index aebff9154052..a3b60aa36dab 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -43,8 +43,9 @@ import static java.util.Collections.singletonMap; import static org.apache.hadoop.ozone.OzoneConsts.GB; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; @@ -302,12 +303,20 @@ public void testThreadPoolPoolSize() { CloseContainerCommandHandler closeContainerCommandHandler = new CloseContainerCommandHandler(10, 10, ""); - closeContainerCommandHandler.handle(closeWithUnknownPipeline(), + closeContainerCommandHandler.handle(new CloseContainerCommand( + CONTAINER_ID + 1, PipelineID.randomId()), + ozoneContainer, context, null); + closeContainerCommandHandler.handle(new CloseContainerCommand( + CONTAINER_ID + 2, PipelineID.randomId()), + ozoneContainer, context, null); + closeContainerCommandHandler.handle(new CloseContainerCommand( + CONTAINER_ID + 3, PipelineID.randomId()), ozoneContainer, context, null); - closeContainerCommandHandler.handle(closeWithUnknownPipeline(), + closeContainerCommandHandler.handle(new CloseContainerCommand( + CONTAINER_ID + 4, PipelineID.randomId()), ozoneContainer, context, null); assertEquals(10, closeContainerCommandHandler.getThreadPoolMaxPoolSize()); - assertEquals(2, closeContainerCommandHandler.getThreadPoolActivePoolSize()); + assertTrue(closeContainerCommandHandler.getThreadPoolActivePoolSize() > 0); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java index b37e51825a20..af909693b2fa 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java @@ -67,7 +67,8 @@ public void setup() { } @Test - public void testExpiredCommandsAreNotProcessed() throws IOException { + public void testExpiredCommandsAreNotProcessed() + throws IOException, InterruptedException { DeleteContainerCommandHandler handler = createSubject(clock, 1000); DeleteContainerCommand command1 = new DeleteContainerCommand(1L); @@ -79,9 +80,11 @@ public void testExpiredCommandsAreNotProcessed() throws IOException { clock.fastForward(15000); handler.handle(command1, ozoneContainer, null, null); + Thread.sleep(1000); assertEquals(1, handler.getTimeoutCount()); handler.handle(command2, ozoneContainer, null, null); handler.handle(command3, ozoneContainer, null, null); + Thread.sleep(1000); assertEquals(1, handler.getTimeoutCount()); assertEquals(3, handler.getInvocationCount()); verify(controller, times(0)) From fa4dd4ddf3f7cf67881e380a0d4bc1a534cfc557 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Wed, 28 Aug 2024 22:02:31 +0800 Subject: [PATCH 11/12] Fix unit tests. --- .../TestDeleteContainerCommandHandler.java | 53 ++++++++++++++++--- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java index af909693b2fa..ff29cee75835 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; @@ -69,7 +72,13 @@ public void setup() { @Test public void testExpiredCommandsAreNotProcessed() throws IOException, InterruptedException { - DeleteContainerCommandHandler handler = createSubject(clock, 1000); + TestClock clock = new TestClock(Instant.now(), ZoneId.systemDefault()); + CountDownLatch latch1 = new CountDownLatch(1); + ThreadFactory threadFactory = new ThreadFactoryBuilder().build(); + ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor( + threadFactory, latch1); + DeleteContainerCommandHandler handler = new DeleteContainerCommandHandler( + clock, executor, 100); DeleteContainerCommand command1 = new DeleteContainerCommand(1L); command1.setDeadline(clock.millis() + 10000); @@ -80,11 +89,14 @@ public void testExpiredCommandsAreNotProcessed() clock.fastForward(15000); handler.handle(command1, ozoneContainer, null, null); - Thread.sleep(1000); + latch1.await(); assertEquals(1, handler.getTimeoutCount()); + CountDownLatch latch2 = new CountDownLatch(2); + executor.setLatch(latch2); handler.handle(command2, ozoneContainer, null, null); handler.handle(command3, ozoneContainer, null, null); - Thread.sleep(1000); + latch2.await(); + assertEquals(1, handler.getTimeoutCount()); assertEquals(3, handler.getInvocationCount()); verify(controller, times(0)) @@ -96,7 +108,8 @@ public void testExpiredCommandsAreNotProcessed() } @Test - public void testCommandForCurrentTermIsExecuted() throws IOException { + public void testCommandForCurrentTermIsExecuted() + throws IOException, InterruptedException { // GIVEN DeleteContainerCommand command = new DeleteContainerCommand(1L); command.setTerm(1); @@ -104,10 +117,17 @@ public void testCommandForCurrentTermIsExecuted() throws IOException { when(context.getTermOfLeaderSCM()) .thenReturn(OptionalLong.of(command.getTerm())); - DeleteContainerCommandHandler subject = createSubject(); + TestClock clock = new TestClock(Instant.now(), ZoneId.systemDefault()); + CountDownLatch latch = new CountDownLatch(1); + ThreadFactory threadFactory = new ThreadFactoryBuilder().build(); + ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor( + threadFactory, latch); + DeleteContainerCommandHandler subject = new DeleteContainerCommandHandler( + clock, executor, 100); // WHEN subject.handle(command, ozoneContainer, context, null); + latch.await(); // THEN verify(controller, times(1)) @@ -181,4 +201,21 @@ private static DeleteContainerCommandHandler createSubjectWithPoolSize( return new DeleteContainerCommandHandler(1, clock, queueSize, ""); } + static class ThreadPoolWithLockExecutor extends ThreadPoolExecutor { + CountDownLatch latch; + ThreadPoolWithLockExecutor(ThreadFactory threadFactory, CountDownLatch latch) { + super(1, 1, 0, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), threadFactory); + this.latch = latch; + } + + void setLatch(CountDownLatch latch) { + this.latch = latch; + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + latch.countDown(); + } + } } From 06be32d6794a9d773609b2f5d728ba23141712b5 Mon Sep 17 00:00:00 2001 From: jianghuazhu <740087514@qq.com> Date: Wed, 28 Aug 2024 22:13:20 +0800 Subject: [PATCH 12/12] Fix checkstyle. --- .../TestDeleteContainerCommandHandler.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java index ff29cee75835..5ee31b97fd64 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java @@ -72,7 +72,6 @@ public void setup() { @Test public void testExpiredCommandsAreNotProcessed() throws IOException, InterruptedException { - TestClock clock = new TestClock(Instant.now(), ZoneId.systemDefault()); CountDownLatch latch1 = new CountDownLatch(1); ThreadFactory threadFactory = new ThreadFactoryBuilder().build(); ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor( @@ -117,13 +116,13 @@ public void testCommandForCurrentTermIsExecuted() when(context.getTermOfLeaderSCM()) .thenReturn(OptionalLong.of(command.getTerm())); - TestClock clock = new TestClock(Instant.now(), ZoneId.systemDefault()); + TestClock testClock = new TestClock(Instant.now(), ZoneId.systemDefault()); CountDownLatch latch = new CountDownLatch(1); ThreadFactory threadFactory = new ThreadFactoryBuilder().build(); ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor( threadFactory, latch); DeleteContainerCommandHandler subject = new DeleteContainerCommandHandler( - clock, executor, 100); + testClock, executor, 100); // WHEN subject.handle(command, ozoneContainer, context, null); @@ -202,20 +201,20 @@ private static DeleteContainerCommandHandler createSubjectWithPoolSize( } static class ThreadPoolWithLockExecutor extends ThreadPoolExecutor { - CountDownLatch latch; + private CountDownLatch countDownLatch; ThreadPoolWithLockExecutor(ThreadFactory threadFactory, CountDownLatch latch) { super(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); - this.latch = latch; + this.countDownLatch = latch; } void setLatch(CountDownLatch latch) { - this.latch = latch; + this.countDownLatch = latch; } @Override protected void afterExecute(Runnable r, Throwable t) { - latch.countDown(); + countDownLatch.countDown(); } } }