Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,28 @@ public synchronized MutableGaugeFloat newGauge(MetricsInfo info, float iVal) {
*/
public synchronized MutableQuantiles newQuantiles(String name, String desc,
String sampleName, String valueName, int interval) {
return newQuantiles(name, desc, sampleName, valueName, interval, false);
}

/**
* Create a mutable metric that estimates quantiles of a stream of values
* @param name of the metric
* @param desc metric description
* @param sampleName of the metric (e.g., "Ops")
* @param valueName of the metric (e.g., "Time" or "Latency")
* @param interval rollover interval of estimator in seconds
* @param inverseQuantiles inverse the quantiles ( e.g. P99 will give the 1st quantile )
* @return a new quantile estimator object
* @throws MetricsException if interval is not a positive integer
*/
public synchronized MutableQuantiles newQuantiles(String name, String desc, String sampleName, String valueName,
int interval, boolean inverseQuantiles) {
checkMetricName(name);
if (interval <= 0) {
throw new MetricsException("Interval should be positive. Value passed" +
" is: " + interval);
throw new MetricsException("Interval should be positive. Value passed is: " + interval);
}
MutableQuantiles ret =
new MutableQuantiles(name, desc, sampleName, valueName, interval);
new MutableQuantiles(name, desc, sampleName, valueName, interval, inverseQuantiles);
metricsMap.put(name, ret);
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.util.InverseQuantiles;
import org.apache.hadoop.metrics2.util.Quantile;
import org.apache.hadoop.metrics2.util.QuantileEstimator;
import org.apache.hadoop.metrics2.util.SampleQuantiles;
Expand Down Expand Up @@ -83,7 +84,7 @@ public class MutableQuantiles extends MutableMetric {
* rollover interval (in seconds) of the estimator
*/
public MutableQuantiles(String name, String description, String sampleName,
String valueName, int interval) {
String valueName, int interval, boolean inverseQuantiles) {
String ucName = StringUtils.capitalize(name);
String usName = StringUtils.capitalize(sampleName);
String uvName = StringUtils.capitalize(valueName);
Expand All @@ -104,8 +105,7 @@ public MutableQuantiles(String name, String description, String sampleName,
String.format(descTemplate, percentile));
}

estimator = new SampleQuantiles(quantiles);

estimator = inverseQuantiles ? new InverseQuantiles(quantiles) : new SampleQuantiles(quantiles);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is estimator variable used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, estimator is the one which will sort the sample and populate quantiles.

this.interval = interval;
scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
interval, interval, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.apache.hadoop.metrics2.util;

import org.apache.hadoop.util.Preconditions;
import java.util.ListIterator;

public class InverseQuantiles extends SampleQuantiles{

public InverseQuantiles(Quantile[] quantiles) {
super(quantiles);
}


/**
* Get the estimated value at the inverse of the specified quantile.
* Eg: return the value at (1 - 0.99)*count position for quantile 0.99.
* When count is 100, quantile 0.99 is desired to return the value at the 1st position
*
* @param quantile Queried quantile, e.g. 0.50 or 0.99.
* @return Estimated value at the inverse position of that quantile.
*/
long query(double quantile) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im assuming this logic looks similar to the SampleQuantile, is there any small refactor we could do to DRY, then maybe a couple lines below looks like
int desired = getQuantile(quantile)
with a default and overridden implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, that will be much cleaner. Let me make the change.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Minimal change here is to inverse list-order traversal.
  • The for loop is performing a Map operation.

The DRY method would be to encapsulate the for loop to a protected function "getQuantilesFromSamples".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to reverse the traversal order.
Overriding query() method instead of creating a new method and moved out the common validation step outside.

Preconditions.checkState(!samples.isEmpty(), "no data in estimator");

int rankMin = 0;
int desired = (int) ((1 - quantile) * count);

ListIterator<SampleItem> it = samples.listIterator();
SampleItem prev;
SampleItem cur = it.next();
for (int i = 1; i < samples.size(); i++) {
prev = cur;
cur = it.next();

rankMin += prev.g;

if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
return prev.value;
}
}

// edge case of wanting max value
return samples.get(0).value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ public class SampleQuantiles implements QuantileEstimator {
/**
* Total number of items in stream
*/
private long count = 0;
long count = 0;

/**
* Current list of sampled items, maintained in sorted order with error bounds
*/
private LinkedList<SampleItem> samples;
LinkedList<SampleItem> samples;

/**
* Buffers incoming items to be inserted in batch. Items are inserted into
Expand Down Expand Up @@ -87,7 +87,7 @@ public SampleQuantiles(Quantile[] quantiles) {
* @param rank
* the index in the list of samples
*/
private double allowableError(int rank) {
double allowableError(int rank) {
int size = samples.size();
double minError = size + 1;
for (Quantile q : quantiles) {
Expand Down Expand Up @@ -203,7 +203,7 @@ private void compress() {
* @param quantile Queried quantile, e.g. 0.50 or 0.99.
* @return Estimated value at that quantile.
*/
private long query(double quantile) {
long query(double quantile) {
Preconditions.checkState(!samples.isEmpty(), "no data in estimator");

int rankMin = 0;
Expand Down Expand Up @@ -291,7 +291,7 @@ synchronized public String toString() {
* Describes a measured value passed to the estimator, tracking additional
* metadata required by the CKMS algorithm.
*/
private static class SampleItem {
static class SampleItem {

/**
* Value of the sampled item (e.g. a measured latency value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `BytesRead` | Total number of bytes read from DataNode |
| `ReadTransferRateNumOps` | Total number of data read transfers |
| `ReadTransferRateAvgTime` | Average transfer rate of bytes read from DataNode, measured in bytes per second. |
| `ReadTransferRate`*num*`s(50/75/90/95/99)thPercentileRate` | The 50/75/90/95/99th percentile of the transfer rate of bytes read from DataNode, measured in bytes per second. |
| `ReadTransferRate`*num*`s(50/75/90/95/99)thPercentileRate` | The 50/75/90/95/99th inverse percentile of the transfer rate of bytes read from DataNode, measured in bytes per second. |
| `BlocksWritten` | Total number of blocks written to DataNode |
| `BlocksRead` | Total number of blocks read from DataNode |
| `BlocksReplicated` | Total number of blocks replicated |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,36 @@ public void testQuantileError() throws IOException {
}
}
}

@Test
public void testInverseQuantiles() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you just add a description of what the test is doing

SampleQuantiles estimatorWithInverseQuantiles = new InverseQuantiles(quantiles);
final int count = 100000;
Random r = new Random(0xDEADDEAD);
Long[] values = new Long[count];
for (int i = 0; i < count; i++) {
values[i] = (long) (i + 1);
}
// Do 10 shuffle/insert/check cycles
for (int i = 0; i < 10; i++) {
System.out.println("Starting run " + i);
Collections.shuffle(Arrays.asList(values), r);
estimatorWithInverseQuantiles.clear();
for (int j = 0; j < count; j++) {
estimatorWithInverseQuantiles.insert(values[j]);
}
Map<Quantile, Long> snapshot;
snapshot = estimatorWithInverseQuantiles.snapshot();
for (Quantile q : quantiles) {
long actual = (long) ((1 - q.quantile) * count);
long error = (long) ((0.1 - q.error) * count);
long estimate = snapshot.get(q);
System.out
.println(String.format("For quantile %f Expected %d with error %d, estimated %d",
q.quantile, actual, error, estimate));
assertThat(estimate <= actual + error).isTrue();
assertThat(estimate >= actual - error).isTrue();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
"ops", "latency", interval);
readTransferRateQuantiles[i] = registry.newQuantiles(
"readTransferRate" + interval + "s",
"Rate at which bytes are read from datanode calculated in bytes per second",
"ops", "rate", interval);
"Rate at which bytes are read from datanode calculated in bytes per second with inverse quantiles",
"ops", "rate", interval, true);
}
}

Expand Down