Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ public void businessFailedRequests(String interfaceName, String methodName, Stri
});
}

public void timeoutRequests(String interfaceName, String methodName, String group, String version) {
doExecute(RequestEvent.Type.REQUEST_TIMEOUT,statHandler->{
statHandler.increase(interfaceName, methodName, group, version);
});
}

public void limitRequests(String interfaceName, String methodName, String group, String version) {
doExecute(RequestEvent.Type.REQUEST_LIMIT,statHandler->{
statHandler.increase(interfaceName, methodName, group, version);
});
}

public void increaseProcessingRequests(String interfaceName, String methodName, String group, String version) {
doExecute(RequestEvent.Type.PROCESSING,statHandler-> {
statHandler.increase(interfaceName, methodName, group, version);
Expand Down Expand Up @@ -137,6 +149,14 @@ private void collectRequests(List<MetricSample> list) {

doExecute(RequestEvent.Type.BUSINESS_FAILED, MetricsStatHandler::get).filter(e->!e.isEmpty())
.ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUEST_BUSINESS_FAILED, k.getTags(), REQUESTS, v::get))));


doExecute(RequestEvent.Type.REQUEST_TIMEOUT, MetricsStatHandler::get).filter(e->!e.isEmpty())
.ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TIMEOUT_AGG, k.getTags(), REQUESTS, v::get))));

doExecute(RequestEvent.Type.REQUEST_LIMIT, MetricsStatHandler::get).filter(e->!e.isEmpty())
.ifPresent(map-> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_LIMIT_AGG, k.getTags(), REQUESTS, v::get))));

}

private void collectRT(List<MetricSample> list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ public void doNotify(MethodMetric metric) {
});

stats.put(RequestEvent.Type.PROCESSING, new DefaultMetricsStatHandler(applicationName));

stats.put(RequestEvent.Type.REQUEST_LIMIT, new DefaultMetricsStatHandler(applicationName) {
@Override
public void doNotify(MethodMetric metric) {
publishEvent(new RequestEvent(metric, RequestEvent.Type.REQUEST_LIMIT));
}
});

stats.put(RequestEvent.Type.REQUEST_TIMEOUT, new DefaultMetricsStatHandler(applicationName) {
@Override
public void doNotify(MethodMetric metric) {
publishEvent(new RequestEvent(metric, RequestEvent.Type.REQUEST_TIMEOUT));
}
});
}

private void publishEvent(MetricsEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public enum Type {
SUCCEED,
FAILED,
BUSINESS_FAILED,

REQUEST_TIMEOUT,
REQUEST_LIMIT,
PROCESSING
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public enum MetricsKey {
METRIC_REQUESTS_SUCCEED_AGG("requests.succeed.aggregate", "Aggregated Succeed Requests"),
METRIC_REQUESTS_FAILED_AGG("requests.failed.aggregate", "Aggregated Failed Requests"),
METRIC_REQUESTS_BUSINESS_FAILED_AGG("requests.business.failed.aggregate", "Aggregated Business Failed Requests"),
METRIC_REQUESTS_TIMEOUT_AGG("requests.timeout.failed.aggregate", "Aggregated timeout Failed Requests"),
METRIC_REQUESTS_LIMIT_AGG("requests.limit.aggregate", "Aggregated limit Requests"),

METRIC_QPS("qps", "Query Per Seconds"),
METRIC_RT_LAST("rt.last", "Last Response Time"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class AggregateMetricsCollector implements MetricsCollector, MetricsListe
private final Map<MethodMetric, TimeWindowCounter> succeedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> failedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> businessFailedRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> timeoutRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> limitRequests = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowCounter> qps = new ConcurrentHashMap<>();
private final Map<MethodMetric, TimeWindowQuantile> rt = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -118,6 +120,14 @@ private void onRequestEvent(RequestEvent event) {
counter = businessFailedRequests.computeIfAbsent(metric, k -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
break;

case REQUEST_TIMEOUT:
counter = timeoutRequests.computeIfAbsent(metric, k -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
break;

case REQUEST_LIMIT:
counter = limitRequests.computeIfAbsent(metric, k -> new TimeWindowCounter(bucketNum, timeWindowSeconds));
break;

default:
break;
}
Expand All @@ -142,6 +152,8 @@ private void collectRequests(List<MetricSample> list) {
succeedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_SUCCEED_AGG, k.getTags(), REQUESTS, v::get)));
failedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_FAILED_AGG, k.getTags(), REQUESTS, v::get)));
businessFailedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_BUSINESS_FAILED_AGG, k.getTags(), REQUESTS, v::get)));
timeoutRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_TIMEOUT_AGG, k.getTags(), REQUESTS, v::get)));
limitRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.METRIC_REQUESTS_LIMIT_AGG, k.getTags(), REQUESTS, v::get)));
}

private void collectQPS(List<MetricSample> list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,22 @@ public void postExecute(Result result) {
public void throwExecute(Throwable throwable){
if (throwable instanceof RpcException) {
RpcException rpcException = (RpcException)throwable;
if (rpcException.isBiz()) {
collector.businessFailedRequests(interfaceName, methodName, group, version);
}else{
collector.increaseFailedRequests(interfaceName, methodName, group, version);
switch (rpcException.getCode()) {

case RpcException.TIMEOUT_EXCEPTION:
collector.timeoutRequests(interfaceName, methodName, group, version);
break;

case RpcException.LIMIT_EXCEEDED_EXCEPTION:
collector.limitRequests(interfaceName, methodName, group, version);
break;

case RpcException.BIZ_EXCEPTION:
collector.businessFailedRequests(interfaceName, methodName, group, version);
break;

default:
collector.increaseFailedRequests(interfaceName, methodName, group, version);
}
}
endExecute(()-> throwable instanceof RpcException && ((RpcException) throwable).isBiz());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,23 @@

package org.apache.dubbo.metrics.filter;

import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE;
import static org.apache.dubbo.common.constants.CommonConstants.GENERIC_PARAMETER_DESC;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_GROUP_KEY;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_INTERFACE_KEY;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_METHOD_KEY;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_VERSION_KEY;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.dubbo.common.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.common.metrics.model.MetricsKey;
import org.apache.dubbo.common.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.common.metrics.model.sample.MetricSample;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.rpc.AppResponse;
Expand All @@ -27,25 +42,11 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.*;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_GROUP_KEY;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_INTERFACE_KEY;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_METHOD_KEY;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_VERSION_KEY;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

class MetricsFilterTest {

private ApplicationModel applicationModel;
Expand Down Expand Up @@ -143,6 +144,57 @@ void testBusinessFailedRequests() {
Assertions.assertEquals(tags.get(TAG_VERSION_KEY), VERSION);
}


@Test
void testTimeoutRequests() {
collector.setCollectEnabled(true);

given(invoker.invoke(invocation)).willThrow(new RpcException(RpcException.TIMEOUT_EXCEPTION));
initParam();

Long count = 2L;

for (int i = 0; i < count; i++) {
try {
filter.invoke(invoker, invocation);
} catch (Exception e) {
Assertions.assertTrue(e instanceof RpcException);
filter.onError(e, invoker, invocation);
}
}
Map<String, MetricSample> metricsMap = getMetricsMap();
Assertions.assertTrue(metricsMap.containsKey(MetricsKey.METRIC_REQUESTS_TIMEOUT_AGG.getName()));

MetricSample sample = metricsMap.get(MetricsKey.METRIC_REQUESTS_TIMEOUT_AGG.getName());

Assertions.assertSame(((GaugeMetricSample) sample).getSupplier().get().longValue(), count);
}

@Test
void testLimitRequests() {
collector.setCollectEnabled(true);

given(invoker.invoke(invocation)).willThrow(new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION));
initParam();

Long count = 3L;

for (int i = 0; i < count; i++) {
try {
filter.invoke(invoker, invocation);
} catch (Exception e) {
Assertions.assertTrue(e instanceof RpcException);
filter.onError(e, invoker, invocation);
}
}
Map<String, MetricSample> metricsMap = getMetricsMap();
Assertions.assertTrue(metricsMap.containsKey(MetricsKey.METRIC_REQUESTS_LIMIT_AGG.getName()));

MetricSample sample = metricsMap.get(MetricsKey.METRIC_REQUESTS_LIMIT_AGG.getName());

Assertions.assertSame(((GaugeMetricSample) sample).getSupplier().get().longValue(), count);
}

@Test
void testSucceedRequests() {
collector.setCollectEnabled(true);
Expand Down