Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -397,19 +397,28 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
String UPDATES_BLOCKED_DESC =
"Number of MS updates have been blocked so that the memstore can be flushed.";
String DELETE_KEY = "delete";
String DELETE_BYTES_KEY = "deleteBytes";
String CHECK_AND_DELETE_KEY = "checkAndDelete";
String CHECK_AND_PUT_KEY = "checkAndPut";
String DELETE_BATCH_KEY = "deleteBatch";
String GET_SIZE_KEY = "getSize";
String GET_KEY = "get";
String GET_BYTES_KEY = "getBytes";
String INCREMENT_KEY = "increment";
String INCREMENT_BYTES_KEY = "incrementBytes";
String PUT_KEY = "put";
String PUT_BYTES_KEY = "putBytes";
String PUT_BATCH_KEY = "putBatch";
String APPEND_KEY = "append";
String APPEND_BYTES_KEY = "appendBytes";
String REPLAY_KEY = "replay";
String SCAN_KEY = "scan";
String SCAN_SIZE_KEY = "scanSize";
String SCAN_BYTES_KEY = "scanBytes";
String SCAN_TIME_KEY = "scanTime";
String BULKLOAD_BYTES_KEY = "bulkLoadBytes";
String RECEIVED_BYTES_KEY = "receivedBytes";
String SENT_BYTES_KEY = "sentBytes";

String SLOW_PUT_KEY = "slowPutCount";
String SLOW_GET_KEY = "slowGetCount";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,51 @@ public interface MetricsRegionSource extends Comparable<MetricsRegionSource> {
*/
void updateAppend();

/**
* Update related bytes of all sent.
*/
void updateSentBytes(long size);

/**
* Update related bytes of all received.
*/
void updateReceivedBytes(long size);

/**
* Update related bytes of gets.
*/
void updateGetBytes(long size);

/**
* Update related bytes of puts.
*/
void updatePutBytes(long size);

/**
* Update related bytes of scans.
*/
void updateScanBytes(long size);

/**
* Update related bytes of deletes.
*/
void updateDeleteBytes(long size);

/**
* Update related bytes of increments.
*/
void updateIncrementBytes(long size);

/**
* Update related bytes of appends.
*/
void updateAppendBytes(long size);

/**
* Update related bytes of bulkloads.
*/
void updateBulkLoadBytes(long size);

/**
* Get the aggregate source to which this reports.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
private final String regionIncrementKey;
private final String regionAppendKey;
private final String regionScanKey;
private final String regionSentBytesKey;
private final String regionReceivedBytesKey;
private final String regionGetBytesKey;
private final String regionPutBytesKey;
private final String regionScanBytesKey;
private final String regionDeleteBytesKey;
private final String regionIncrementBytesKey;
private final String regionAppendBytesKey;
private final String regionBulkLoadBytesKey;


/*
* Implementation note: Do not put histograms per region. With hundreds of regions in a server
Expand All @@ -62,6 +72,15 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource {
private final MutableFastCounter regionAppend;
private final MutableFastCounter regionGet;
private final MutableFastCounter regionScan;
private final MutableFastCounter regionSentBytes;
private final MutableFastCounter regionReceivedBytes;
private final MutableFastCounter regionGetBytes;
private final MutableFastCounter regionPutBytes;
private final MutableFastCounter regionScanBytes;
private final MutableFastCounter regionDeleteBytes;
private final MutableFastCounter regionIncrementBytes;
private final MutableFastCounter regionAppendBytes;
private final MutableFastCounter regionBulkLoadBytes;

private final int hashCode;

Expand Down Expand Up @@ -101,6 +120,34 @@ public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper,

regionScanKey = regionNamePrefix + MetricsRegionServerSource.SCAN_KEY + suffix;
regionScan = registry.getCounter(regionScanKey, 0L);

regionSentBytesKey = regionNamePrefix + MetricsRegionServerSource.SENT_BYTES_KEY;
regionSentBytes = registry.getCounter(regionSentBytesKey, 0L);

regionReceivedBytesKey = regionNamePrefix + MetricsRegionServerSource.RECEIVED_BYTES_KEY;
regionReceivedBytes = registry.getCounter(regionReceivedBytesKey, 0L);

regionGetBytesKey = regionNamePrefix + MetricsRegionServerSource.GET_BYTES_KEY;
regionGetBytes = registry.getCounter(regionGetBytesKey, 0L);

regionPutBytesKey = regionNamePrefix + MetricsRegionServerSource.PUT_BYTES_KEY;
regionPutBytes = registry.getCounter(regionPutBytesKey, 0L);

regionScanBytesKey = regionNamePrefix + MetricsRegionServerSource.SCAN_BYTES_KEY;
regionScanBytes = registry.getCounter(regionScanBytesKey, 0L);

regionDeleteBytesKey = regionNamePrefix + MetricsRegionServerSource.DELETE_BYTES_KEY;
regionDeleteBytes = registry.getCounter(regionDeleteBytesKey, 0L);

regionIncrementBytesKey = regionNamePrefix + MetricsRegionServerSource.INCREMENT_BYTES_KEY;
regionIncrementBytes = registry.getCounter(regionIncrementBytesKey, 0L);

regionAppendBytesKey = regionNamePrefix + MetricsRegionServerSource.APPEND_BYTES_KEY;
regionAppendBytes = registry.getCounter(regionAppendBytesKey, 0L);

regionBulkLoadBytesKey = regionNamePrefix + MetricsRegionServerSource.BULKLOAD_BYTES_KEY;
regionBulkLoadBytes = registry.getCounter(regionBulkLoadBytesKey, 0L);

}

@Override
Expand Down Expand Up @@ -130,6 +177,16 @@ public void close() {
registry.removeMetric(regionGetKey);
registry.removeMetric(regionScanKey);

registry.removeMetric(regionSentBytesKey);
registry.removeMetric(regionReceivedBytesKey);
registry.removeMetric(regionGetBytesKey);
registry.removeMetric(regionPutBytesKey);
registry.removeMetric(regionScanBytesKey);
registry.removeMetric(regionDeleteBytesKey);
registry.removeMetric(regionIncrementBytesKey);
registry.removeMetric(regionAppendBytesKey);
registry.removeMetric(regionBulkLoadBytesKey);

regionWrapper = null;
}
}
Expand Down Expand Up @@ -164,6 +221,51 @@ public void updateAppend() {
regionAppend.incr();
}

@Override
public void updateSentBytes(long size) {
regionSentBytes.incr(size);
}

@Override
public void updateReceivedBytes(long size) {
regionReceivedBytes.incr(size);
}

@Override
public void updateGetBytes(long size) {
regionGetBytes.incr(size);
}

@Override
public void updatePutBytes(long size) {
regionPutBytes.incr(size);
}

@Override
public void updateScanBytes(long size) {
regionScanBytes.incr(size);
}

@Override
public void updateDeleteBytes(long size) {
regionDeleteBytes.incr(size);
}

@Override
public void updateIncrementBytes(long size) {
regionIncrementBytes.incr(size);
}

@Override
public void updateAppendBytes(long size) {
regionAppendBytes.incr(size);
}

@Override
public void updateBulkLoadBytes(long size) {
regionBulkLoadBytes.incr(size);
}

@Override
public MetricsRegionAggregateSource getAggregateSource() {
return agg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
Expand Down Expand Up @@ -3673,21 +3671,6 @@ public boolean visit(int index) throws IOException {
return true;
}
});

// FIXME: we may update metrics twice! here for all operations bypassed by CP and later in
// normal processing.
// Update metrics in same way as it is done when we go the normal processing route (we now
// update general metrics though a Coprocessor did the work).
if (region.metricsRegion != null) {
if (metrics[0] > 0) {
// There were some Puts in the batch.
region.metricsRegion.updatePut();
}
if (metrics[1] > 0) {
// There were some Deletes in the batch.
region.metricsRegion.updateDelete();
}
}
}

@Override
Expand All @@ -3699,9 +3682,11 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
if (mutation instanceof Put) {
HRegion.updateCellTimestamps(familyCellMaps[index].values(), byteTS);
miniBatchOp.incrementNumOfPuts();
miniBatchOp.incrementBytesForPuts(mutation.heapSize());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we traverse any synchronize blocks here or update any volatiles? In other words, wondering what the cost of these updates are.

We are also bulking up metrics w/ these added numbers... the metrics we send the master on each report (every second or so).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We spend a lot of cpu counting currently. This adds to the counting load.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saintstack Thanks for review
At present, we only have aggregate information and cannot know how much network resources are used in each region.

} else {
region.prepareDeleteTimestamps(mutation, familyCellMaps[index], byteTS);
miniBatchOp.incrementNumOfDeletes();
miniBatchOp.incrementBytesForDeletes(mutation.heapSize());
}
region.rewriteCellTags(familyCellMaps[index], mutation);

Expand Down Expand Up @@ -3790,11 +3775,11 @@ public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> m
if (region.metricsRegion != null) {
if (miniBatchOp.getNumOfPuts() > 0) {
// There were some Puts in the batch.
region.metricsRegion.updatePut();
region.metricsRegion.updatePut(miniBatchOp.getBytesForPuts());
}
if (miniBatchOp.getNumOfDeletes() > 0) {
// There were some Deletes in the batch.
region.metricsRegion.updateDelete();
region.metricsRegion.updateDelete(miniBatchOp.getBytesForDeletes());
}
}
}
Expand Down Expand Up @@ -5682,6 +5667,10 @@ void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) th
try {
storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
store.bulkLoadHFile(storeFileInfo);
if (storeFileInfo != null && metricsRegion != null
&& storeFileInfo.getFileStatus() != null) {
metricsRegion.updateBulkLoad(storeFileInfo.getFileStatus().getLen());
}
} catch(FileNotFoundException ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ ((storeFileInfo != null) ? storeFileInfo.toString() :
Expand Down Expand Up @@ -6383,8 +6372,12 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
// Note the size of the store file
try {
FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
.getLen());
long size = fs.getFileStatus(commitedStoreFile).getLen();
storeFilesSizes.put(commitedStoreFile.getName(), size);
if (metricsRegion != null) {
metricsRegion.updateBulkLoad(size);
}

} catch (IOException e) {
LOG.warn("Failed to find the size of hfile " + commitedStoreFile, e);
storeFilesSizes.put(commitedStoreFile.getName(), 0L);
Expand Down Expand Up @@ -7608,7 +7601,13 @@ public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long no

void metricsUpdateForGet(List<Cell> results, long before) {
if (this.metricsRegion != null) {
this.metricsRegion.updateGet(EnvironmentEdgeManager.currentTime() - before);
long size = 0L;
if (results != null) {
for (Cell cell : results) {
size += cell.heapSize();
}
}
this.metricsRegion.updateGet(EnvironmentEdgeManager.currentTime() - before, size);
}
}

Expand Down Expand Up @@ -7983,10 +7982,10 @@ private Result doDelta(Operation op, Mutation mutation, long nonceGroup, long no
if (this.metricsRegion != null) {
switch (op) {
case INCREMENT:
this.metricsRegion.updateIncrement();
this.metricsRegion.updateIncrement(memstoreAccounting.getDataSize());
break;
case APPEND:
this.metricsRegion.updateAppend();
this.metricsRegion.updateAppend(memstoreAccounting.getDataSize());
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,48 @@ public void close() {
source.close();
}

public void updatePut() {
public void updatePut(final long size) {
source.updatePut();
source.updatePutBytes(size);
source.updateReceivedBytes(size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between put and received? They are not the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Received contains all write operations such as increment/append/put/bulkload, putBytes consists of its own

}

public void updateDelete() {
public void updateDelete(final long size) {
source.updateDelete();
source.updateDeleteBytes(size);
source.updateReceivedBytes(size);
}

public void updateGet(final long t) {
public void updateGet(final long t, final long size) {
source.updateGet(t);
source.updateGetBytes(size);
source.updateSentBytes(size);
}

public void updateScanTime(final long t) {
public void updateScan(final long t, final long size) {
source.updateScanTime(t);
source.updateScanBytes(size);
source.updateSentBytes(size);
}

public void updateFilteredRecords(){
userAggregate.updateFilteredReadRequests();
}
public void updateAppend() {
public void updateAppend(final long size) {
source.updateAppend();
source.updateAppendBytes(size);
source.updateReceivedBytes(size);
}

public void updateIncrement() {
public void updateIncrement(final long size) {
source.updateIncrement();
source.updateIncrementBytes(size);
source.updateReceivedBytes(size);
}

public void updateBulkLoad(final long size) {
source.updateBulkLoadBytes(size);
source.updateReceivedBytes(size);
}

MetricsRegionSource getSource() {
Expand Down
Loading