Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
43b6d76
init
wxbty Jun 19, 2023
733553a
fix
wxbty Jun 19, 2023
33f45fe
fix
wxbty Jun 20, 2023
0a3df24
fix
wxbty Jun 21, 2023
9a61a63
fix
wxbty Jun 21, 2023
90245ac
add licence
wxbty Jun 21, 2023
8146354
Merge branch '3.2' into registry_multi
wxbty Jun 21, 2023
1d7345c
fix conflicx
wxbty Jun 21, 2023
d831b91
Merge remote-tracking branch 'upstream/3.2' into registry_multi
wxbty Jun 22, 2023
ee1f491
fix conflicx
wxbty Jun 22, 2023
67c1645
fix testcase
wxbty Jun 22, 2023
e294ac2
fix testcase
wxbty Jun 23, 2023
72f69d2
Merge branch '3.2' into registry_multi
songxiaosheng Jun 25, 2023
babbcdc
Merge branch '3.2' into registry_multi
songxiaosheng Jun 26, 2023
d9b3f8b
Merge branch '3.2' into registry_multi
songxiaosheng Jun 27, 2023
2139a22
Merge branch '3.2' into registry_multi
songxiaosheng Jun 28, 2023
73c1dc9
lowcase
wxbty Jun 28, 2023
4de44a0
Merge branch 'registry_multi' of https://github.com/wxbty/dubbo into …
wxbty Jun 28, 2023
60a381d
Merge branch '3.2' into registry_multi
wxbty Jun 28, 2023
81ba6c2
Merge branch '3.2' into registry_multi
wxbty Jun 29, 2023
724d77b
fix testcase
wxbty Jun 30, 2023
deb49be
Merge branch '3.2' into registry_multi
wxbty Jul 3, 2023
4f50518
fix conflicx
wxbty Jul 5, 2023
4804416
remove unuse
wxbty Jul 5, 2023
dd271dd
opt notify&directory
wxbty Jul 5, 2023
966ae18
fix post&finish
wxbty Jul 6, 2023
7b03756
fix conflicx
wxbty Jul 14, 2023
ed74054
revert name
wxbty Jul 14, 2023
a7d28d1
move registerMetadataAndInstance
wxbty Jul 16, 2023
4b0a486
move metrics from
wxbty Jul 16, 2023
e80cdf5
remove some unuse
wxbty Jul 16, 2023
ffee42c
remove unuse
wxbty Jul 16, 2023
6513dd7
fix style
wxbty Jul 16, 2023
54f7389
fix style
wxbty Jul 16, 2023
04fc6da
remove unuse
wxbty Jul 16, 2023
a4c5b0d
Merge branch '3.2' into registry_multi
AlbumenJ Jul 16, 2023
cce7717
remove unuse
wxbty Jul 17, 2023
8944a66
Merge remote-tracking branch 'upstream/3.2' into registry_multi
wxbty Jul 27, 2023
5aad01c
fix default&& move post
wxbty Jul 27, 2023
585966b
add multi registry lables for subscribe
wxbty Jul 27, 2023
13a4376
fix
wxbty Jul 27, 2023
45e6bac
fix url
wxbty Jul 27, 2023
2f551b9
fix url
wxbty Jul 27, 2023
49bee1a
Fix uts
AlbumenJ Jul 29, 2023
76d64d5
add registry key for directory
wxbty Jul 29, 2023
bdfcede
Merge branch '3.2' into registry_multi
wxbty Jul 29, 2023
bd74e10
Compatible with empty registry keys
wxbty Jul 30, 2023
cd5c026
Merge branch 'registry_multi' of https://github.com/wxbty/dubbo into …
wxbty Jul 30, 2023
24bb497
Merge branch '3.2' into registry_multi
songxiaosheng Jul 30, 2023
734287a
remove unus
wxbty Jul 30, 2023
f2af060
Merge branch 'registry_multi' of https://github.com/wxbty/dubbo into …
wxbty Jul 30, 2023
647a6a1
Support registry type
AlbumenJ Jul 30, 2023
e2ac484
Fix attachment
AlbumenJ Jul 30, 2023
ec488fc
Code enhancement
AlbumenJ Jul 30, 2023
631452a
Fix registry key
AlbumenJ Jul 30, 2023
0d9630b
Fix service discovery notify
AlbumenJ Jul 31, 2023
f966970
fix
wxbty Jul 31, 2023
a7c739d
Merge branch '3.2' into registry_multi
songxiaosheng Jul 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.constants.RegisterTypeEnum;
import org.apache.dubbo.common.constants.RegistryConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
Expand Down Expand Up @@ -69,6 +70,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
Expand Down Expand Up @@ -513,8 +515,8 @@ private void doExportUrls(RegisterTypeEnum registerType) {
repository.registerProvider(providerModel);

List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

MetricsEventBus.post(RegistryEvent.toRsEvent(module.getApplicationModel(), getUniqueServiceName(), protocols.size() * registryURLs.size()),
List<String> ServiceDiscoveryNames = registryURLs.stream().map(url-> url.getParameter(RegistryConstants.REGISTRY_CLUSTER_KEY)).distinct().collect(Collectors.toList());
MetricsEventBus.post(RegistryEvent.toRsEvent(module.getApplicationModel(), getUniqueServiceName(), protocols.size(), ServiceDiscoveryNames),
() -> {
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.dubbo.metrics.service.MetricsServiceExporter;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.registry.support.RegistryManager;
import org.apache.dubbo.rpc.model.ApplicationModel;
Expand Down Expand Up @@ -899,12 +900,16 @@ private DynamicConfiguration getDynamicConfiguration(URL connectionURL) {
private void registerServiceInstance() {
try {
registered = true;
MetricsEventBus.post(RegistryEvent.toRegisterEvent(applicationModel),
() -> {
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
return null;
}
);
List<ServiceDiscovery> serviceDiscoveries = ServiceInstanceMetadataUtils.getServiceDiscoveries(applicationModel);
if (serviceDiscoveries.size() > 0) {
MetricsEventBus.post(RegistryEvent.toRegisterEvent(applicationModel, ServiceInstanceMetadataUtils.getServiceDiscoveryNames(serviceDiscoveries)),
() -> {
// register service instance
serviceDiscoveries.forEach(ServiceDiscovery::register);
return null;
}
);
}
} catch (Exception e) {
logger.error(CONFIG_REGISTER_INSTANCE_ERROR, "configuration server disconnected", "", "Register instance error.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

public interface MetricsConstants {

String INVOCATION = "metric_filter_invocation";
String INVOCATION_METRICS_COUNTER = "metric_filter_invocation_counter";

String INVOCATION_SIDE = "metric_filter_side";
String INVOCATION = "metricFilterInvocation";
String INVOCATION_METRICS_COUNTER = "metricFilterInvocationCounter";
String INVOCATION_SIDE = "metricFilterSide";

String ATTACHMENT_KEY_SERVICE = "serviceKey";
String ATTACHMENT_KEY_SIZE = "size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public abstract class CombMetricsCollector<E extends TimeCounterEvent> extends AbstractMetricsListener<E> implements ApplicationMetricsCollector<E>, ServiceMetricsCollector<E>, MethodMetricsCollector<E> {

private final BaseStatComposite stats;
protected final BaseStatComposite stats;
private MetricsEventMulticaster eventMulticaster;


Expand Down Expand Up @@ -101,5 +101,9 @@ public void onEventFinish(TimeCounterEvent event) {
public void onEventError(TimeCounterEvent event) {
eventMulticaster.publishErrorEvent(event);
}

protected BaseStatComposite getStats() {
return stats;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public List<MetricSample> export(MetricsCategory category) {

@SuppressWarnings({"rawtypes"})
private GaugeMetricSample convertToSample(MetricsKey type, MetricsCategory category, AtomicLong targetNumber) {
return new GaugeMetricSample<>(type, MetricsSupport.applicationTags(getApplicationModel()), category, targetNumber, AtomicLong::get);
return new GaugeMetricSample<>(type, MetricsSupport.applicationTags(getApplicationModel(), null), category, targetNumber, AtomicLong::get);
}

public Map<MetricsKey, AtomicLong> getApplicationNumStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.dubbo.metrics.report.MetricsExport;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


/**
Expand Down Expand Up @@ -69,15 +71,15 @@ protected void init(RtStatComposite rtStatComposite) {
}

public void calcApplicationRt(String registryOpType, Long responseTime) {
rtStatComposite.calcApplicationRt(registryOpType, responseTime);
rtStatComposite.calcKeyRt(registryOpType, responseTime, rtStatComposite.getAppName());
}

public void calcServiceKeyRt(String serviceKey, String registryOpType, Long responseTime) {
rtStatComposite.calcServiceKeyRt(serviceKey, registryOpType, responseTime);
rtStatComposite.calcKeyRt(registryOpType, responseTime, serviceKey);
}

public void calcMethodKeyRt(Invocation invocation, String registryOpType, Long responseTime) {
rtStatComposite.calcMethodKeyRt(invocation, registryOpType, responseTime);
rtStatComposite.calcKeyRt(registryOpType, responseTime, invocation.getTargetServiceUniqueName() + "_" + RpcUtils.getMethodName(invocation));
}

public void setServiceKey(MetricsKeyWrapper metricsKey, String serviceKey, int num) {
Expand All @@ -92,6 +94,10 @@ public void incrementServiceKey(MetricsKeyWrapper metricsKeyWrapper, String attS
serviceStatComposite.incrementServiceKey(metricsKeyWrapper, attServiceKey, size);
}

public void incrementServiceKey(MetricsKeyWrapper metricsKeyWrapper, String attServiceKey, Map<String, String> extra, int size) {
serviceStatComposite.incrementExtraServiceKey(metricsKeyWrapper, attServiceKey, extra, size);
}

public void incrementMethodKey(MetricsKeyWrapper metricsKeyWrapper, Invocation invocation, int size) {
methodStatComposite.incrementMethodKey(metricsKeyWrapper, invocation, size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
import org.apache.dubbo.metrics.report.AbstractMetricsExport;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -79,23 +77,9 @@ private List<LongContainer<? extends Number>> initStats(MetricsPlaceValue placeV
return singleRtStats;
}

public void calcApplicationRt(String registryOpType, Long responseTime) {
public void calcKeyRt(String registryOpType, Long responseTime, String rtKey) {
for (LongContainer container : rtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) {
Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, getAppName(), container.getInitFunc());
container.getConsumerFunc().accept(responseTime, current);
}
}

public void calcServiceKeyRt(String serviceKey, String registryOpType, Long responseTime) {
for (LongContainer container : rtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) {
Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, serviceKey, container.getInitFunc());
container.getConsumerFunc().accept(responseTime, current);
}
}

public void calcMethodKeyRt(Invocation invocation, String registryOpType, Long responseTime) {
for (LongContainer container : rtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) {
Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, invocation.getTargetServiceUniqueName() + "_" + RpcUtils.getMethodName(invocation), container.getInitFunc());
Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, rtKey, container.getInitFunc());
container.getConsumerFunc().accept(responseTime, current);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,18 @@ public void initWrapper(List<MetricsKeyWrapper> metricsKeyWrappers) {
}

public void incrementServiceKey(MetricsKeyWrapper wrapper, String serviceKey, int size) {
incrementExtraServiceKey(wrapper, serviceKey, null, size);
}

public void incrementExtraServiceKey(MetricsKeyWrapper wrapper, String serviceKey, Map<String,String> extra, int size) {
if (!serviceWrapperNumStats.containsKey(wrapper)) {
return;
}
serviceWrapperNumStats.get(wrapper).computeIfAbsent(new ServiceKeyMetric(getApplicationModel(), serviceKey), k -> new AtomicLong(0L)).getAndAdd(size);
ServiceKeyMetric serviceKeyMetric = new ServiceKeyMetric(getApplicationModel(), serviceKey);
if (extra != null) {
serviceKeyMetric.setExtraInfo(extra);
}
serviceWrapperNumStats.get(wrapper).computeIfAbsent(serviceKeyMetric, k -> new AtomicLong(0L)).getAndAdd(size);
MetricsSupport.fillZero(serviceWrapperNumStats);
}

Expand All @@ -69,6 +77,7 @@ public void setServiceKey(MetricsKeyWrapper wrapper, String serviceKey, int num)
MetricsSupport.fillZero(serviceWrapperNumStats);
}

@Override
public List<MetricSample> export(MetricsCategory category) {
List<MetricSample> list = new ArrayList<>();
for (MetricsKeyWrapper wrapper : serviceWrapperNumStats.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
/**
* According to the event template of {@link MetricsEventBus},
* build a consistent static method for general and custom monitoring consume methods
*
*/
public abstract class AbstractMetricsKeyListener extends AbstractMetricsListener<TimeCounterEvent> implements MetricsLifeListener<TimeCounterEvent> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class AbstractMetricsListener<E extends MetricsEvent> implements
private final Map<Class<?>, Boolean> eventMatchCache = new ConcurrentHashMap<>();

/**
* Whether to support the general determination of event points depends on the event type
* Only interested in events of the current listener's generic parameter type
*/
public boolean isSupport(MetricsEvent event) {
Boolean eventMatch = eventMatchCache.computeIfAbsent(event.getClass(), clazz -> ReflectionUtils.match(getClass(), AbstractMetricsListener.class, event));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,54 @@
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.key.MetricsPlaceValue;

/**
* App-level listener type, in most cases, can use the static method
* to produce an anonymous listener for general monitoring
*/
public class MetricsApplicationListener extends AbstractMetricsKeyListener {

public MetricsApplicationListener(MetricsKey metricsKey) {
super(metricsKey);
}

public static AbstractMetricsKeyListener onPostEventBuild(MetricsKey metricsKey, CombMetricsCollector collector) {
/**
* Perform auto-increment on the monitored key,
* Can use a custom listener instead of this generic operation
*
* @param metricsKey Monitor key
* @param collector Corresponding collector
*/
public static AbstractMetricsKeyListener onPostEventBuild(MetricsKey metricsKey, CombMetricsCollector<?> collector) {
return AbstractMetricsKeyListener.onEvent(metricsKey,
event -> collector.increment(metricsKey)
event -> collector.increment(metricsKey)
);
}

public static AbstractMetricsKeyListener onFinishEventBuild(MetricsKey metricsKey, MetricsPlaceValue placeType, CombMetricsCollector collector) {
/**
* To end the monitoring normally, in addition to increasing the number of corresponding indicators,
* use the introspection method to calculate the relevant rt indicators
*
* @param metricsKey Monitor key
* @param collector Corresponding collector
*/
public static AbstractMetricsKeyListener onFinishEventBuild(MetricsKey metricsKey, MetricsPlaceValue placeType, CombMetricsCollector<?> collector) {
return AbstractMetricsKeyListener.onFinish(metricsKey,
event -> {
collector.increment(metricsKey);
collector.addRt(placeType.getType(), event.getTimePair().calc());
}
event -> {
collector.increment(metricsKey);
collector.addRt(placeType.getType(), event.getTimePair().calc());
}
);
}

public static AbstractMetricsKeyListener onErrorEventBuild(MetricsKey metricsKey, MetricsPlaceValue placeType, CombMetricsCollector collector) {
/**
* Similar to onFinishEventBuild
*/
public static AbstractMetricsKeyListener onErrorEventBuild(MetricsKey metricsKey, MetricsPlaceValue placeType, CombMetricsCollector<?> collector) {
return AbstractMetricsKeyListener.onError(metricsKey,
event -> {
collector.increment(metricsKey);
collector.addRt(placeType.getType(), event.getTimePair().calc());
}
event -> {
collector.increment(metricsKey);
collector.addRt(placeType.getType(), event.getTimePair().calc());
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.metrics.model.key.MetricsPlaceValue;

/**
* Service-level listener type, in most cases, can use the static method
* to produce an anonymous listener for general monitoring.
* Similar to App-level
*/
public class MetricsServiceListener extends AbstractMetricsKeyListener {

public MetricsServiceListener(MetricsKey metricsKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,14 @@

package org.apache.dubbo.metrics.model;

import org.apache.dubbo.common.Version;
import org.apache.dubbo.metrics.model.key.MetricsKey;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.HashMap;
import java.util.Map;

import static org.apache.dubbo.common.constants.MetricsConstants.TAG_APPLICATION_NAME;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_APPLICATION_VERSION_KEY;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_HOSTNAME;
import static org.apache.dubbo.common.constants.MetricsConstants.TAG_IP;
import static org.apache.dubbo.common.utils.NetUtils.getLocalHost;
import static org.apache.dubbo.common.utils.NetUtils.getLocalHostName;
import java.util.Objects;

public class ApplicationMetric implements Metric {
private final ApplicationModel applicationModel;
private static final String version = Version.getVersion();
private static final String commitId = Version.getLastCommitId();
protected Map<String, String> extraInfo;

public ApplicationMetric(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
Expand All @@ -48,18 +38,29 @@ public String getApplicationName() {
return getApplicationModel().getApplicationName();
}

public String getData() {
return version;
@Override
public Map<String, String> getTags() {
return MetricsSupport.applicationTags(applicationModel, getExtraInfo());
}

public Map<String, String> getExtraInfo() {
return extraInfo;
}

public void setExtraInfo(Map<String, String> extraInfo) {
this.extraInfo = extraInfo;
}

@Override
public Map<String, String> getTags() {
Map<String, String> tags = new HashMap<>();
tags.put(TAG_IP, getLocalHost());
tags.put(TAG_HOSTNAME, getLocalHostName());
tags.put(TAG_APPLICATION_NAME, getApplicationName());
tags.put(TAG_APPLICATION_VERSION_KEY, version);
tags.put(MetricsKey.METADATA_GIT_COMMITID_METRIC.getName(), commitId);
return tags;
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ApplicationMetric that = (ApplicationMetric) o;
return applicationModel.equals(that.applicationModel) && Objects.equals(extraInfo, that.extraInfo);
}

@Override
public int hashCode() {
return Objects.hash(applicationModel, extraInfo);
}
}
Loading