Skip to content

Commit 0c1b798

Browse files
authored
Make xds router run (#11319)
1 parent fd3fb63 commit 0c1b798

File tree

16 files changed

+964
-813
lines changed

16 files changed

+964
-813
lines changed

dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/XdsServiceDiscovery.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
*/
1717
package org.apache.dubbo.registry.xds;
1818

19+
import java.util.Collection;
20+
import java.util.Comparator;
21+
import java.util.LinkedList;
22+
import java.util.List;
23+
import java.util.Set;
24+
1925
import org.apache.dubbo.common.URL;
2026
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
2127
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -28,12 +34,6 @@
2834
import org.apache.dubbo.rpc.model.ApplicationModel;
2935
import org.apache.dubbo.rpc.model.ScopeModelUtil;
3036

31-
import java.util.Collection;
32-
import java.util.Comparator;
33-
import java.util.LinkedList;
34-
import java.util.List;
35-
import java.util.Set;
36-
3737
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_INITIALIZE_XDS;
3838
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_PARSING_XDS;
3939

@@ -88,6 +88,7 @@ private List<ServiceInstance> changedToInstances(String serviceName, Collection<
8888
try {
8989
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(serviceName, endpoint.getAddress(), endpoint.getPortValue(), ScopeModelUtil.getApplicationModel(getUrl().getScopeModel()));
9090
// fill metadata by SelfHostMetaServiceDiscovery, will be fetched by RPC request
91+
serviceInstance.putExtendParam("clusterName", endpoint.getClusterName());
9192
fillServiceInstance(serviceInstance);
9293
instances.add(serviceInstance);
9394
} catch (Throwable t) {
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.registry.xds.util;
18+
19+
import java.util.Map;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.ScheduledExecutorService;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.apache.dubbo.common.URL;
25+
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
26+
import org.apache.dubbo.common.logger.LoggerFactory;
27+
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
28+
import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
29+
import org.apache.dubbo.registry.xds.util.protocol.DeltaResource;
30+
import org.apache.dubbo.rpc.model.ApplicationModel;
31+
32+
import io.envoyproxy.envoy.config.core.v3.Node;
33+
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
34+
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
35+
import io.grpc.stub.StreamObserver;
36+
37+
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_REQUEST_XDS;
38+
39+
public class AdsObserver {
40+
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AdsObserver.class);
41+
private final ApplicationModel applicationModel;
42+
private final URL url;
43+
private final Node node;
44+
private volatile XdsChannel xdsChannel;
45+
46+
private final Map<String, XdsListener> listeners = new ConcurrentHashMap<>();
47+
48+
protected StreamObserver<DiscoveryRequest> requestObserver;
49+
50+
private final Map<String, DiscoveryRequest> observedResources = new ConcurrentHashMap<>();
51+
52+
public AdsObserver(URL url, Node node) {
53+
this.url = url;
54+
this.node = node;
55+
this.xdsChannel = new XdsChannel(url);
56+
this.applicationModel = url.getOrDefaultApplicationModel();
57+
}
58+
59+
public <T, S extends DeltaResource<T>> void addListener(AbstractProtocol<T, S> protocol) {
60+
listeners.put(protocol.getTypeUrl(), protocol);
61+
}
62+
63+
public void request(DiscoveryRequest discoveryRequest) {
64+
if (requestObserver == null) {
65+
requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(this));
66+
}
67+
requestObserver.onNext(discoveryRequest);
68+
observedResources.put(discoveryRequest.getTypeUrl(), discoveryRequest);
69+
}
70+
71+
private static class ResponseObserver implements StreamObserver<DiscoveryResponse> {
72+
private AdsObserver adsObserver;
73+
74+
public ResponseObserver(AdsObserver adsObserver) {
75+
this.adsObserver = adsObserver;
76+
}
77+
78+
@Override
79+
public void onNext(DiscoveryResponse discoveryResponse) {
80+
XdsListener xdsListener = adsObserver.listeners.get(discoveryResponse.getTypeUrl());
81+
xdsListener.process(discoveryResponse);
82+
adsObserver.requestObserver.onNext(buildAck(discoveryResponse));
83+
}
84+
85+
protected DiscoveryRequest buildAck(DiscoveryResponse response) {
86+
// for ACK
87+
return DiscoveryRequest.newBuilder()
88+
.setNode(adsObserver.node)
89+
.setTypeUrl(response.getTypeUrl())
90+
.setVersionInfo(response.getVersionInfo())
91+
.setResponseNonce(response.getNonce())
92+
.addAllResourceNames(adsObserver.observedResources.get(response.getTypeUrl()).getResourceNamesList())
93+
.build();
94+
}
95+
96+
@Override
97+
public void onError(Throwable throwable) {
98+
logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "xDS Client received error message! detail:", throwable);
99+
adsObserver.triggerReConnectTask();
100+
}
101+
102+
@Override
103+
public void onCompleted() {
104+
logger.info("xDS Client completed");
105+
adsObserver.triggerReConnectTask();
106+
}
107+
}
108+
109+
private void triggerReConnectTask() {
110+
ScheduledExecutorService scheduledFuture = applicationModel.getFrameworkModel().getBeanFactory()
111+
.getBean(FrameworkExecutorRepository.class).getSharedScheduledExecutor();
112+
scheduledFuture.schedule(this::recover, 3, TimeUnit.SECONDS);
113+
}
114+
115+
private void recover() {
116+
try {
117+
xdsChannel = new XdsChannel(url);
118+
if (xdsChannel.getChannel() != null) {
119+
requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(this));
120+
observedResources.values().forEach(requestObserver::onNext);
121+
return;
122+
} else {
123+
logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Recover failed for xDS connection. Will retry. Create channel failed.");
124+
}
125+
} catch (Exception e) {
126+
logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Recover failed for xDS connection. Will retry.", e);
127+
}
128+
triggerReConnectTask();
129+
}
130+
}

dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@
1818

1919
import java.util.Collections;
2020
import java.util.HashSet;
21+
import java.util.LinkedList;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.Set;
2325
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.ExecutorService;
2427
import java.util.concurrent.atomic.AtomicBoolean;
2528
import java.util.function.Consumer;
29+
import java.util.stream.Collectors;
2630

2731
import org.apache.dubbo.common.URL;
32+
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
2833
import org.apache.dubbo.common.utils.CollectionUtils;
2934
import org.apache.dubbo.common.utils.ConcurrentHashSet;
3035
import org.apache.dubbo.registry.xds.util.protocol.AbstractProtocol;
@@ -38,8 +43,6 @@
3843
import org.apache.dubbo.rpc.cluster.router.xds.RdsVirtualHostListener;
3944
import org.apache.dubbo.rpc.model.ApplicationModel;
4045

41-
import io.envoyproxy.envoy.config.route.v3.VirtualHost;
42-
4346
public class PilotExchanger {
4447

4548
protected final XdsChannel xdsChannel;
@@ -61,15 +64,18 @@ public class PilotExchanger {
6164

6265
private final Map<String, Consumer<RdsVirtualHostListener>> rdsObserveConsumer = new ConcurrentHashMap<>();
6366

64-
private static PilotExchanger GLOBAL_PILOT_EXCHANGER = null;
67+
private static PilotExchanger GLOBAL_PILOT_EXCHANGER = null;
68+
69+
private final ApplicationModel applicationModel;
6570

6671
protected PilotExchanger(URL url) {
6772
xdsChannel = new XdsChannel(url);
6873
int pollingTimeout = url.getParameter("pollingTimeout", 10);
69-
ApplicationModel applicationModel = url.getOrDefaultApplicationModel();
70-
this.ldsProtocol = new LdsProtocol(xdsChannel, NodeBuilder.build(), pollingTimeout, applicationModel);
71-
this.rdsProtocol = new RdsProtocol(xdsChannel, NodeBuilder.build(), pollingTimeout, applicationModel);
72-
this.edsProtocol = new EdsProtocol(xdsChannel, NodeBuilder.build(), pollingTimeout, applicationModel);
74+
this.applicationModel = url.getOrDefaultApplicationModel();
75+
AdsObserver adsObserver = new AdsObserver(url, NodeBuilder.build());
76+
this.ldsProtocol = new LdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout);
77+
this.rdsProtocol = new RdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout);
78+
this.edsProtocol = new EdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout);
7379

7480
this.listenerResult = ldsProtocol.getListeners();
7581
this.routeResult = rdsProtocol.getResource(listenerResult.values().iterator().next().getRouteConfigNames());
@@ -96,26 +102,31 @@ protected PilotExchanger(URL url) {
96102
private void createRouteObserve() {
97103
rdsProtocol.observeResource(listenerResult.values().iterator().next().getRouteConfigNames(), (newResult) -> {
98104
// check if observed domain update ( will update endpoint observation )
105+
List<String> domainsToUpdate = new LinkedList<>();
99106
domainObserveConsumer.forEach((domain, consumer) -> {
100107
newResult.values().forEach(o -> {
101108
Set<String> newRoute = o.searchDomain(domain);
102-
for (Map.Entry<String, RouteResult> entry: routeResult.entrySet()) {
109+
for (Map.Entry<String, RouteResult> entry : routeResult.entrySet()) {
103110
if (!entry.getValue().searchDomain(domain).equals(newRoute)) {
104111
// routers in observed domain has been updated
105112
// Long domainRequest = domainObserveRequest.get(domain);
106113
// router list is empty when observeEndpoints() called and domainRequest has not been created yet
107114
// create new observation
108-
doObserveEndpoints(domain);
115+
domainsToUpdate.add(domain);
116+
// doObserveEndpoints(domain);
109117
}
110118
}
111119
});
112120
});
113121
routeResult = newResult;
122+
ExecutorService executorService = applicationModel.getFrameworkModel().getBeanFactory()
123+
.getBean(FrameworkExecutorRepository.class).getSharedExecutor();
124+
executorService.submit(() -> domainsToUpdate.forEach(this::doObserveEndpoints));
114125
}, false);
115126
}
116127

117128
public static PilotExchanger initialize(URL url) {
118-
synchronized (PilotExchanger.class){
129+
synchronized (PilotExchanger.class) {
119130
if (GLOBAL_PILOT_EXCHANGER != null) {
120131
return GLOBAL_PILOT_EXCHANGER;
121132
}
@@ -140,15 +151,15 @@ public void destroy() {
140151

141152
public Set<String> getServices() {
142153
Set<String> domains = new HashSet<>();
143-
for (Map.Entry<String, RouteResult> entry: routeResult.entrySet()) {
154+
for (Map.Entry<String, RouteResult> entry : routeResult.entrySet()) {
144155
domains.addAll(entry.getValue().getDomains());
145156
}
146157
return domains;
147158
}
148159

149160
public Set<Endpoint> getEndpoints(String domain) {
150161
Set<Endpoint> endpoints = new HashSet<>();
151-
for (Map.Entry<String, RouteResult> entry: routeResult.entrySet()) {
162+
for (Map.Entry<String, RouteResult> entry : routeResult.entrySet()) {
152163
Set<String> cluster = entry.getValue().searchDomain(domain);
153164
if (CollectionUtils.isNotEmpty(cluster)) {
154165
Map<String, EndpointResult> endpointResultList = edsProtocol.getResource(cluster);
@@ -176,19 +187,21 @@ public void observeEndpoints(String domain, Consumer<Set<Endpoint>> consumer) {
176187
}
177188

178189
private void doObserveEndpoints(String domain) {
179-
for (Map.Entry<String, RouteResult> entry: routeResult.entrySet()) {
190+
for (Map.Entry<String, RouteResult> entry : routeResult.entrySet()) {
180191
Set<String> router = entry.getValue().searchDomain(domain);
181192
// if router is empty, do nothing
182193
// observation will be created when RDS updates
183194
if (CollectionUtils.isNotEmpty(router)) {
184195
edsProtocol.observeResource(
185196
router,
186197
(endpointResultMap) -> {
187-
endpointResultMap.forEach((k, v) -> {
188-
// notify consumers
189-
domainObserveConsumer.get(domain).forEach(
190-
consumer1 -> consumer1.accept(v.getEndpoints()));
191-
});
198+
Set<Endpoint> endpoints = endpointResultMap.values().stream()
199+
.map(EndpointResult::getEndpoints)
200+
.flatMap(Set::stream)
201+
.collect(Collectors.toSet());
202+
for (Consumer<Set<Endpoint>> consumer : domainObserveConsumer.get(domain)) {
203+
consumer.accept(endpoints);
204+
}
192205
}, false);
193206
domainObserveRequest.add(domain);
194207
}
@@ -201,18 +214,28 @@ public void unObserveEndpoints(String domain, Consumer<Set<Endpoint>> consumer)
201214
domainObserveRequest.remove(domain);
202215
}
203216

204-
public VirtualHost getVirtualHost(String domain) {
205-
for (Map.Entry<String, RouteResult> entry : routeResult.entrySet()) {
206-
if (entry.getValue().searchVirtualHost(domain) != null) {
207-
return entry.getValue().searchVirtualHost(domain);
208-
}
209-
}
210-
return null;
217+
public void observeEds(Set<String> clusterNames, Consumer<Map<String, EndpointResult>> consumer) {
218+
edsProtocol.observeResource(clusterNames, consumer, false);
211219
}
212220

213-
public void unObserveRds(String domain) {
214-
for (Map.Entry<String, RouteResult> entry : routeResult.entrySet()) {
215-
entry.getValue().removeVirtualHost(domain);
216-
}
221+
public void unObserveEds(Set<String> clusterNames, Consumer<Map<String, EndpointResult>> consumer) {
222+
edsProtocol.unobserveResource(clusterNames, consumer);
223+
}
224+
225+
public void observeRds(Set<String> clusterNames, Consumer<Map<String, RouteResult>> consumer) {
226+
rdsProtocol.observeResource(clusterNames, consumer, false);
227+
}
228+
229+
public void unObserveRds(Set<String> clusterNames, Consumer<Map<String, RouteResult>> consumer) {
230+
rdsProtocol.unobserveResource(clusterNames, consumer);
217231
}
232+
233+
public void observeLds(Consumer<Map<String, ListenerResult>> consumer) {
234+
ldsProtocol.observeResource(Collections.singleton(AbstractProtocol.emptyResourceName), consumer, false);
235+
}
236+
237+
public void unObserveLds(Consumer<Map<String, ListenerResult>> consumer) {
238+
ldsProtocol.unobserveResource(Collections.singleton(AbstractProtocol.emptyResourceName), consumer);
239+
}
240+
218241
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.registry.xds.util;
18+
19+
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
20+
21+
public interface XdsListener {
22+
void process(DiscoveryResponse discoveryResponse);
23+
}

0 commit comments

Comments
 (0)