Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
private long totalTime;

/**
* Constructs a ContainerReport handler.
* Constructs a close container command handler.
*/
public CloseContainerCommandHandler(
int threadPoolSize, int queueSize, String threadNamePrefix) {
Expand Down Expand Up @@ -220,4 +220,14 @@ public long getTotalRunTime() {
public int getQueuedCount() {
return queuedCount.get();
}

@Override
public int getThreadPoolMaxPoolSize() {
return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return ((ThreadPoolExecutor)executor).getActiveCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -202,4 +203,14 @@ public long getTotalRunTime() {
public int getQueuedCount() {
return queuedCount.get();
}

@Override
public int getThreadPoolMaxPoolSize() {
return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return ((ThreadPoolExecutor)executor).getActiveCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public final class CommandDispatcher {
private CommandDispatcher(OzoneContainer container, SCMConnectionManager
connectionManager, StateContext context,
CommandHandler... handlers) {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(handlers);
Preconditions.checkArgument(handlers.length > 0);
Preconditions.checkNotNull(container);
Preconditions.checkNotNull(connectionManager);
this.context = context;
this.container = container;
this.connectionManager = connectionManager;
Expand All @@ -77,6 +72,7 @@ private CommandDispatcher(OzoneContainer container, SCMConnectionManager
commandHandlerMetrics = CommandHandlerMetrics.create(handlerMap);
}

@VisibleForTesting
public CommandHandler getCloseContainerHandler() {
return handlerMap.get(Type.closeContainerCommand);
}
Expand All @@ -102,7 +98,7 @@ public void handle(SCMCommand command) {
LOG.error("Exception while handle command, ", ex);
}
} else {
LOG.error("Unknown SCM Command queued. There is no handler for this " +
LOG.warn("Unknown SCM Command queued. There is no handler for this " +
"command. Command: {}", command.getType().getDescriptorForType()
.getName());
}
Expand Down Expand Up @@ -201,11 +197,12 @@ public Builder setContext(StateContext stateContext) {
* @return Command Dispatcher.
*/
public CommandDispatcher build() {
Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
" manager.");
Preconditions.checkNotNull(this.container, "Missing container.");
Preconditions.checkNotNull(this.context, "Missing context.");
Preconditions.checkArgument(this.handlerList.size() > 0);
Preconditions.checkNotNull(this.connectionManager,
"Missing scm connection manager.");
Preconditions.checkNotNull(this.container, "Missing ozone container.");
Preconditions.checkNotNull(this.context, "Missing state context.");
Preconditions.checkArgument(this.handlerList.size() > 0,
"The number of command handlers must be greater than 0.");
return new CommandDispatcher(this.container, this.connectionManager,
this.context, handlerList.toArray(
new CommandHandler[handlerList.size()]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
Expand Down Expand Up @@ -182,4 +183,16 @@ public long getTotalRunTime() {
public int getQueuedCount() {
return queuedCount.get();
}

@SuppressWarnings("checkstyle:Indentation")
@Override
public int getThreadPoolMaxPoolSize() {
return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
}

@SuppressWarnings("checkstyle:Indentation")
@Override
public int getThreadPoolActivePoolSize() {
return ((ThreadPoolExecutor)executor).getActiveCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ public int getQueuedCount() {

@Override
public int getThreadPoolMaxPoolSize() {
return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
return executor.getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return ((ThreadPoolExecutor)executor).getActiveCount();
return executor.getActiveCount();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ public long getTotalRunTime() {
return totalTime.get();
}

@Override
public int getThreadPoolMaxPoolSize() {
return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
}

@Override
public int getThreadPoolActivePoolSize() {
return ((ThreadPoolExecutor)executor).getActiveCount();
}

@Override
public void stop() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
static final Logger LOG =
LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);

private int invocationCount;

private long totalTime;

private ConfigurationSource conf;

private ReplicationSupervisor supervisor;

private ContainerReplicator downloadReplicator;
Expand All @@ -60,7 +54,6 @@ public ReplicateContainerCommandHandler(
ReplicationSupervisor supervisor,
ContainerReplicator downloadReplicator,
ContainerReplicator pushReplicator) {
this.conf = conf;
this.supervisor = supervisor;
this.downloadReplicator = downloadReplicator;
this.pushReplicator = pushReplicator;
Expand Down Expand Up @@ -101,19 +94,20 @@ public SCMCommandProto.Type getCommandType() {

@Override
public int getInvocationCount() {
return this.invocationCount;
return (int) supervisor.getReplicationRequestCount();
}

@Override
public long getAverageRunTime() {
if (invocationCount > 0) {
return totalTime / invocationCount;
int currentCount = getInvocationCount();
if (currentCount > 0) {
return getTotalRunTime() / currentCount;
}
return 0;
}

@Override
public long getTotalRunTime() {
return totalTime;
return supervisor.getTotalTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,6 +77,7 @@ public final class ReplicationSupervisor {
private final AtomicLong failureCounter = new AtomicLong();
private final AtomicLong timeoutCounter = new AtomicLong();
private final AtomicLong skippedCounter = new AtomicLong();
private final AtomicLong totalTimeCounter = new AtomicLong();

/**
* A set of container IDs that are currently being downloaded
Expand Down Expand Up @@ -329,6 +331,7 @@ public TaskRunner(AbstractReplicationTask task) {

@Override
public void run() {
long startTime = Time.monotonicNow();
try {
requestCounter.incrementAndGet();

Expand Down Expand Up @@ -377,8 +380,10 @@ public void run() {
LOG.warn("Failed {}", this, e);
failureCounter.incrementAndGet();
} finally {
long endTime = Time.monotonicNow();
inFlight.remove(task);
decrementTaskCounter(task);
totalTimeCounter.addAndGet(endTime - startTime);
}
}

Expand Down Expand Up @@ -442,6 +447,10 @@ public long getReplicationSuccessCount() {
return successCounter.get();
}

public long getTotalTime() {
return totalTimeCounter.get();
}

public long getReplicationFailureCount() {
return failureCounter.get();
}
Expand Down