Merged
Changes from 2 commits
MutableCounterLong.java
@@ -23,7 +23,7 @@
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;

-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;

/**
* A mutable long counter
@@ -32,11 +32,11 @@
@InterfaceStability.Evolving
public class MutableCounterLong extends MutableCounter {

-  private AtomicLong value = new AtomicLong();
+  private final LongAdder value = new LongAdder();

public MutableCounterLong(MetricsInfo info, long initValue) {
super(info);
-    this.value.set(initValue);
+    this.value.add(initValue);
}

@Override
@@ -49,12 +49,12 @@ public void incr() {
* @param delta of the increment
*/
public void incr(long delta) {
-    value.addAndGet(delta);
+    value.add(delta);
setChanged();
}

public long value() {
-    return value.get();
+    return value.longValue();
}

@Override
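The same substitution repeats in every file below, so the rationale is worth stating once. LongAdder spreads updates across padded per-thread cells, so concurrent increments avoid retrying a compare-and-swap on a single word the way AtomicLong does; a read sums the cells. The cost is a narrower, weaker API: there is no set(long), add() returns void, and longValue() (an alias for sum()) is a moment-in-time estimate rather than a linearizable read, all acceptable for metrics. A standalone before/after sketch (the class name is invented; only JDK types are used):

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class CounterApiSketch {
  public static void main(String[] args) {
    AtomicLong atomic = new AtomicLong();  // before: one CASed word
    atomic.set(5);                         // arbitrary set is supported
    atomic.addAndGet(2);                   // returns the new total (ignored here)
    long a = atomic.get();                 // exact, linearizable read

    LongAdder adder = new LongAdder();     // after: striped cells, always starts at 0
    adder.add(5);                          // no set(); add() returns void
    adder.add(2);
    long b = adder.longValue();            // alias for sum(); weakly consistent
    System.out.println(a + " == " + b);    // prints 7 == 7
  }
}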
DFSHedgedReadMetrics.java
@@ -19,7 +19,7 @@

import org.apache.hadoop.classification.InterfaceAudience;

-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;

/**
* The client-side metrics for hedged read feature.
@@ -28,20 +28,20 @@
*/
@InterfaceAudience.Private
public class DFSHedgedReadMetrics {
-  public final AtomicLong hedgedReadOps = new AtomicLong();
-  public final AtomicLong hedgedReadOpsWin = new AtomicLong();
-  public final AtomicLong hedgedReadOpsInCurThread = new AtomicLong();
+  public final LongAdder hedgedReadOps = new LongAdder();
+  public final LongAdder hedgedReadOpsWin = new LongAdder();
+  public final LongAdder hedgedReadOpsInCurThread = new LongAdder();

public void incHedgedReadOps() {
-    hedgedReadOps.incrementAndGet();
+    hedgedReadOps.increment();
}

public void incHedgedReadOpsInCurThread() {
-    hedgedReadOpsInCurThread.incrementAndGet();
+    hedgedReadOpsInCurThread.increment();
}

public void incHedgedReadWins() {
-    hedgedReadOpsWin.incrementAndGet();
+    hedgedReadOpsWin.increment();
}

public long getHedgedReadOps() {
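Because these counters are public fields, the LongAdder API change leaks to every caller, which is why the TestPread hunks at the bottom of this diff move from set(0) to reset(). Per the LongAdder javadoc, reset() is effective only when no threads are updating concurrently, which holds in those single-threaded test set-ups. A small JDK-only illustration (class name invented):

import java.util.concurrent.atomic.LongAdder;

public class ResetSketch {
  public static void main(String[] args) {
    LongAdder ops = new LongAdder();
    ops.increment();
    ops.increment();
    System.out.println(ops.longValue()); // 2
    ops.reset();  // zeroes the cells; racy if writers are active concurrently
    System.out.println(ops.sum());       // 0
  }
}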
DFSOpsCountStatistics.java
@@ -26,7 +26,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;

/**
* This storage statistics tracks how many times each DFS operation was issued.
@@ -141,21 +141,21 @@ public static OpType fromSymbol(String symbol) {

public static final String NAME = "DFSOpsCountStatistics";

-  private final Map<OpType, AtomicLong> opsCount = new EnumMap<>(OpType.class);
+  private final Map<OpType, LongAdder> opsCount = new EnumMap<>(OpType.class);

public DFSOpsCountStatistics() {
super(NAME);
for (OpType opType : OpType.values()) {
-      opsCount.put(opType, new AtomicLong(0));
+      opsCount.put(opType, new LongAdder());
}
}

public void incrementOpCounter(OpType op) {
-    opsCount.get(op).addAndGet(1);
+    opsCount.get(op).increment();
}

private class LongIterator implements Iterator<LongStatistic> {
-    private Iterator<Entry<OpType, AtomicLong>> iterator =
+    private final Iterator<Entry<OpType, LongAdder>> iterator =
opsCount.entrySet().iterator();

@Override
@@ -168,9 +168,9 @@ public LongStatistic next() {
if (!iterator.hasNext()) {
throw new NoSuchElementException();
}
-      final Entry<OpType, AtomicLong> entry = iterator.next();
+      final Entry<OpType, LongAdder> entry = iterator.next();
return new LongStatistic(entry.getKey().getSymbol(),
-          entry.getValue().get());
+          entry.getValue().longValue());
}

@Override
@@ -192,7 +192,7 @@ public Iterator<LongStatistic> getLongStatistics() {
@Override
public Long getLong(String key) {
final OpType type = OpType.fromSymbol(key);
-    return type == null ? null : opsCount.get(type).get();
+    return type == null ? null : opsCount.get(type).longValue();
}

@Override
@@ -202,8 +202,8 @@ public boolean isTracked(String key) {

@Override
public void reset() {
-    for (AtomicLong count : opsCount.values()) {
-      count.set(0);
+    for (LongAdder count : opsCount.values()) {
+      count.reset();
}
}

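The surrounding structure is unchanged: the EnumMap is fully populated in the constructor and never structurally modified afterwards, so lookups are safe without synchronization and only the LongAdder cells mutate. A condensed sketch of the same pattern (Op is a hypothetical stand-in for the real OpType enum):

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class PerOpCountersSketch {
  enum Op { CREATE, DELETE, RENAME } // stand-in for OpType

  private final Map<Op, LongAdder> counts = new EnumMap<>(Op.class);

  PerOpCountersSketch() {
    for (Op op : Op.values()) {
      counts.put(op, new LongAdder()); // no initial-value constructor; starts at 0
    }
  }

  void inc(Op op) { counts.get(op).increment(); }

  long get(Op op) { return counts.get(op).longValue(); } // weakly consistent sum

  public static void main(String[] args) {
    PerOpCountersSketch c = new PerOpCountersSketch();
    c.inc(Op.CREATE);
    c.inc(Op.CREATE);
    System.out.println(c.get(Op.CREATE)); // 2
  }
}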
FsDatasetCache.java
@@ -43,6 +43,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -143,11 +144,11 @@ public boolean shouldAdvertise() {
/**
* Number of cache commands that could not be completed successfully
*/
-  final AtomicLong numBlocksFailedToCache = new AtomicLong(0);
+  final LongAdder numBlocksFailedToCache = new LongAdder();
/**
* Number of uncache commands that could not be completed successfully
*/
-  final AtomicLong numBlocksFailedToUncache = new AtomicLong(0);
+  final LongAdder numBlocksFailedToUncache = new LongAdder();

public FsDatasetCache(FsDatasetImpl dataset) throws IOException {
this.dataset = dataset;
@@ -278,7 +279,7 @@ synchronized void cacheBlock(long blockId, String bpid,
LOG.debug("Block with id {}, pool {} already exists in the "
+ "FsDatasetCache with state {}", blockId, bpid, prevValue.state
);
-      numBlocksFailedToCache.incrementAndGet();
+      numBlocksFailedToCache.increment();
return;
}
mappableBlockMap.put(key, new Value(null, State.CACHING));
@@ -301,7 +302,7 @@ synchronized void uncacheBlock(String bpid, long blockId) {
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
+ "because it is not currently in the mappableBlockMap.", blockId,
bpid);
-      numBlocksFailedToUncache.incrementAndGet();
+      numBlocksFailedToUncache.increment();
return;
}
switch (prevValue.state) {
@@ -331,7 +332,7 @@ synchronized void uncacheBlock(String bpid, long blockId) {
default:
LOG.debug("Block with id {}, pool {} does not need to be uncached, "
+ "because it is in state {}.", blockId, bpid, prevValue.state);
-      numBlocksFailedToUncache.incrementAndGet();
+      numBlocksFailedToUncache.increment();
break;
}
}
@@ -482,7 +483,7 @@ public void run() {
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, cacheLoader.getCacheUsed());
IOUtils.closeQuietly(mappableBlock);
-        numBlocksFailedToCache.incrementAndGet();
+        numBlocksFailedToCache.increment();

synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
@@ -607,11 +608,11 @@ public long getCacheCapacity() {
}

public long getNumBlocksFailedToCache() {
-    return numBlocksFailedToCache.get();
+    return numBlocksFailedToCache.longValue();
}

public long getNumBlocksFailedToUncache() {
-    return numBlocksFailedToUncache.get();
+    return numBlocksFailedToUncache.longValue();
}

public long getNumBlocksCached() {
FsDatasetImpl.java
@@ -2463,7 +2463,7 @@ private void cacheBlock(String bpid, long blockId) {
success = true;
} finally {
if (!success) {
-        cacheManager.numBlocksFailedToCache.incrementAndGet();
+        cacheManager.numBlocksFailedToCache.increment();
}
}
blockFileName = info.getBlockURI().toString();
FSEditLog.java
@@ -28,7 +28,7 @@
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -183,7 +183,7 @@ private enum State {

// these are statistics counters.
private long numTransactions; // number of transactions
-  private final AtomicLong numTransactionsBatchedInSync = new AtomicLong();
+  private final LongAdder numTransactionsBatchedInSync = new LongAdder();
private long totalTimeTransactions; // total time for all transactions
private NameNodeMetrics metrics;

@@ -731,7 +731,7 @@ protected void logSync(long mytxid) {
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
-        numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
+        numTransactionsBatchedInSync.add(editsBatchedInSync);
}

} finally {
@@ -771,7 +771,7 @@ private void printStatistics(boolean force) {
.append(" Total time for transactions(ms): ")
.append(totalTimeTransactions)
.append(" Number of transactions batched in Syncs: ")
-        .append(numTransactionsBatchedInSync.get())
+        .append(numTransactionsBatchedInSync.longValue())
.append(" Number of syncs: ")
.append(editLogStream.getNumSync())
.append(" SyncTimes(ms): ")
@@ -1404,7 +1404,7 @@ private void startLogSegment(final long segmentTxId, int layoutVersion)

numTransactions = 0;
totalTimeTransactions = 0;
-    numTransactionsBatchedInSync.set(0L);
+    numTransactionsBatchedInSync.reset();

// TODO no need to link this back to storage anymore!
// See HDFS-2174.
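Two call-site details in this file are worth noting. logSync discarded the return value of addAndGet, so the void add(editsBatchedInSync) is a drop-in replacement; and startLogSegment only ever cleared the counter to zero, which matters because LongAdder offers reset() but no set(long). A minimal demonstration of the narrower API (JDK only, invented class name):

import java.util.concurrent.atomic.LongAdder;

public class BatchedSyncSketch {
  public static void main(String[] args) {
    LongAdder batched = new LongAdder();
    batched.add(17);                   // void; addAndGet(17) would have returned 17
    System.out.println(batched.sum()); // 17
    batched.reset();                   // the only way to clear; no set(n) exists
    System.out.println(batched.sum()); // 0
  }
}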
FSNamesystemLock.java
@@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
@@ -113,12 +114,12 @@ public Long initialValue() {
* The number of time the read lock
* has been held longer than the threshold.
*/
-  private final AtomicLong numReadLockLongHold = new AtomicLong(0);
+  private final LongAdder numReadLockLongHold = new LongAdder();
/**
* The number of time the write lock
* has been held for longer than the threshold.
*/
-  private final AtomicLong numWriteLockLongHold = new AtomicLong(0);
+  private final LongAdder numWriteLockLongHold = new LongAdder();

@VisibleForTesting
static final String OP_NAME_OTHER = "OTHER";
@@ -192,7 +193,7 @@ public void readUnlock(String opName,
final long readLockIntervalMs =
TimeUnit.NANOSECONDS.toMillis(readLockIntervalNanos);
if (needReport && readLockIntervalMs >= this.readLockReportingThresholdMs) {
-      numReadLockLongHold.incrementAndGet();
+      numReadLockLongHold.increment();
String lockReportInfo = null;
boolean done = false;
while (!done) {
@@ -309,7 +310,7 @@ private void writeUnlock(String opName, boolean suppressWriteLockReport,
LogAction logAction = LogThrottlingHelper.DO_NOT_LOG;
if (needReport &&
writeLockIntervalMs >= this.writeLockReportingThresholdMs) {
-      numWriteLockLongHold.incrementAndGet();
+      numWriteLockLongHold.increment();
if (longestWriteLockHeldInfo.getIntervalMs() <= writeLockIntervalMs) {
String lockReportInfo = lockReportInfoSupplier != null ? " (" +
lockReportInfoSupplier.get() + ")" : "";
@@ -382,7 +383,7 @@ public int getQueueLength() {
* has been held longer than the threshold
*/
public long getNumOfReadLockLongHold() {
-    return numReadLockLongHold.get();
+    return numReadLockLongHold.longValue();
}

/**
@@ -393,7 +394,7 @@ public long getNumOfReadLockLongHold() {
* has been held longer than the threshold.
*/
public long getNumOfWriteLockLongHold() {
-    return numWriteLockLongHold.get();
+    return numWriteLockLongHold.longValue();
}

/**
TestPread.java
@@ -401,9 +401,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
DFSClient dfsClient = fileSys.getClient();
DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
// Metrics instance is static, so we need to reset counts from prior tests.
-    metrics.hedgedReadOps.set(0);
-    metrics.hedgedReadOpsWin.set(0);
-    metrics.hedgedReadOpsInCurThread.set(0);
+    metrics.hedgedReadOps.reset();
+    metrics.hedgedReadOpsWin.reset();
+    metrics.hedgedReadOpsInCurThread.reset();

try {
Path file1 = new Path("hedgedReadMaxOut.dat");
@@ -590,7 +590,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
String filename = "/hedgedReadMaxOut.dat";
DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
// Metrics instance is static, so we need to reset counts from prior tests.
-    metrics.hedgedReadOps.set(0);
+    metrics.hedgedReadOps.reset();
try {
Path file = new Path(filename);
output = fileSys.create(file, (short) 2);
Expand Down