Skip to content

Commit bfce75c

Browse files
committed
Revert "Revert "Local references support mergeable (#9645)" (#10707)"
This reverts commit 67d94ae.
1 parent 67d94ae commit bfce75c

File tree

9 files changed

+181
-23
lines changed

9 files changed

+181
-23
lines changed

dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -543,11 +543,7 @@ private void createInvokerForLocal(Map<String, String> referenceParameters) {
543543
URL url = new ServiceConfigURL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName(), referenceParameters);
544544
url = url.setScopeModel(getScopeModel());
545545
url = url.setServiceModel(consumerModel);
546-
Invoker<?> withFilter = protocolSPI.refer(interfaceClass, url);
547-
// Local Invoke ( Support Cluster Filter / Filter )
548-
List<Invoker<?>> invokers = new ArrayList<>();
549-
invokers.add(withFilter);
550-
invoker = Cluster.getCluster(url.getScopeModel(), Cluster.DEFAULT).join(new StaticDirectory(url, invokers), true);
546+
invoker = protocolSPI.refer(interfaceClass, url);
551547

552548
if (logger.isInfoEnabled()) {
553549
logger.info("Using in jvm service " + interfaceClass.getName());

dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/ReferenceConfigTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -477,10 +477,9 @@ public void testCreateInvokerForLocalRefer() {
477477
.initialize();
478478

479479
referenceConfig.init();
480-
Assertions.assertTrue(referenceConfig.getInvoker() instanceof MockClusterInvoker);
481-
Invoker<?> withFilter = ((MockClusterInvoker<?>) referenceConfig.getInvoker()).getDirectory().getAllInvokers().get(0);
482-
Assertions.assertTrue(withFilter instanceof ListenerInvokerWrapper);
483-
Assertions.assertTrue(((ListenerInvokerWrapper<?>) withFilter).getInvoker() instanceof InjvmInvoker);
480+
Invoker<?> withFilter = ((ListenerInvokerWrapper<?>) referenceConfig.getInvoker()).getInvoker();
481+
withFilter = ((MockClusterInvoker<?>) withFilter).getDirectory().getAllInvokers().get(0);
482+
Assertions.assertTrue(withFilter instanceof InjvmInvoker);
484483
URL url = withFilter.getUrl();
485484
Assertions.assertEquals("application1", url.getParameter("application"));
486485
Assertions.assertEquals("value1", url.getParameter("key1"));

dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ public List<String> getEffectReferenceRegistryURLs() {
263263

264264
protected static class MultipleNotifyListenerWrapper implements NotifyListener {
265265

266-
Map<URL, SingleNotifyListener> registryMap = new ConcurrentHashMap<URL, SingleNotifyListener>(4);
266+
Map<URL, SingleNotifyListener> registryMap = new ConcurrentHashMap<>(4);
267267
NotifyListener sourceNotifyListener;
268268

269269
public MultipleNotifyListenerWrapper(NotifyListener sourceNotifyListener) {

dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242

4343
import java.lang.reflect.Type;
4444
import java.util.HashMap;
45-
import java.util.Map;
4645
import java.util.Objects;
4746
import java.util.concurrent.CompletableFuture;
4847
import java.util.concurrent.ExecutorService;
@@ -63,18 +62,18 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
6362

6463
private final String key;
6564

66-
private final Map<String, Exporter<?>> exporterMap;
65+
private final Exporter<?> exporter;
6766

6867
private final ExecutorRepository executorRepository;
6968

7069
private final ParamDeepCopyUtil paramDeepCopyUtil;
7170

7271
private final boolean shouldIgnoreSameModule;
7372

74-
InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
73+
InjvmInvoker(Class<T> type, URL url, String key, Exporter<?> exporter) {
7574
super(type, url);
7675
this.key = key;
77-
this.exporterMap = exporterMap;
76+
this.exporter = exporter;
7877
this.executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
7978
this.paramDeepCopyUtil = url.getOrDefaultFrameworkModel().getExtensionLoader(ParamDeepCopyUtil.class)
8079
.getExtension(url.getParameter(CommonConstants.INJVM_COPY_UTIL_KEY, DefaultParamDeepCopyUtil.NAME));
@@ -83,7 +82,6 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
8382

8483
@Override
8584
public boolean isAvailable() {
86-
InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
8785
if (exporter == null) {
8886
return false;
8987
} else {
@@ -93,7 +91,6 @@ public boolean isAvailable() {
9391

9492
@Override
9593
public Result doInvoke(Invocation invocation) throws Throwable {
96-
Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
9794
if (exporter == null) {
9895
throw new RpcException("Service [" + key + "] not found.");
9996
}

dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,29 @@
1818

1919
import org.apache.dubbo.common.URL;
2020
import org.apache.dubbo.common.utils.CollectionUtils;
21+
import org.apache.dubbo.common.utils.StringUtils;
2122
import org.apache.dubbo.common.utils.UrlUtils;
2223
import org.apache.dubbo.rpc.Exporter;
2324
import org.apache.dubbo.rpc.Invoker;
2425
import org.apache.dubbo.rpc.Protocol;
2526
import org.apache.dubbo.rpc.RpcException;
27+
import org.apache.dubbo.rpc.cluster.Cluster;
28+
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
29+
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
30+
import org.apache.dubbo.rpc.cluster.support.MergeableCluster;
2631
import org.apache.dubbo.rpc.model.ScopeModel;
2732
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
2833
import org.apache.dubbo.rpc.support.ProtocolUtils;
2934

35+
import java.util.ArrayList;
36+
import java.util.List;
3037
import java.util.Map;
3138

3239
import static org.apache.dubbo.common.constants.CommonConstants.BROADCAST_CLUSTER;
3340
import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
41+
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
42+
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
43+
import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
3444
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
3545
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
3646
import static org.apache.dubbo.rpc.Constants.SCOPE_KEY;
@@ -69,7 +79,7 @@ static Exporter<?> getExporter(Map<String, Exporter<?>> map, URL key) {
6979
if (result == null) {
7080
return null;
7181
} else if (ProtocolUtils.isGeneric(
72-
result.getInvoker().getUrl().getParameter(GENERIC_KEY))) {
82+
result.getInvoker().getUrl().getParameter(GENERIC_KEY))) {
7383
return null;
7484
} else {
7585
return result;
@@ -88,7 +98,15 @@ public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
8898

8999
@Override
90100
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
91-
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
101+
// group="a,b" or group="*"
102+
String group = url.getParameter(GROUP_KEY);
103+
if (StringUtils.isNotEmpty(group)) {
104+
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
105+
return doCreateInvoker(url, Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), serviceType);
106+
}
107+
}
108+
Cluster cluster = Cluster.getCluster(url.getScopeModel(), url.getParameter(CLUSTER_KEY));
109+
return doCreateInvoker(url, cluster, serviceType);
92110
}
93111

94112
public boolean isInjvmRefer(URL url) {
@@ -116,4 +134,34 @@ public boolean isInjvmRefer(URL url) {
116134
return false;
117135
}
118136
}
137+
138+
@SuppressWarnings({"unchecked", "rawtypes"})
139+
protected <T> ClusterInvoker<T> doCreateInvoker(URL url, Cluster cluster, Class<T> type) {
140+
StaticDirectory directory = new StaticDirectory(url, getInvokers(exporterMap, url, type));
141+
return (ClusterInvoker<T>) cluster.join(directory, true);
142+
}
143+
144+
private <T> List<Invoker<T>> getInvokers(Map<String, Exporter<?>> map, URL url, Class<T> type) {
145+
List<Invoker<T>> result = new ArrayList<>();
146+
147+
if (!url.getServiceKey().contains("*")) {
148+
Exporter<?> exporter = map.get(url.getServiceKey());
149+
InjvmInvoker<T> invoker = new InjvmInvoker<>(type, url, url.getServiceKey(), exporter);
150+
result.add(invoker);
151+
} else {
152+
if (CollectionUtils.isNotEmptyMap(map)) {
153+
for (Exporter<?> exporter : map.values()) {
154+
if (UrlUtils.isServiceKeyMatch(url, exporter.getInvoker().getUrl())) {
155+
URL providerUrl = exporter.getInvoker().getUrl();
156+
URL consumerUrl = url.addParameter(GROUP_KEY, providerUrl.getGroup())
157+
.addParameter(VERSION_KEY, providerUrl.getVersion());
158+
InjvmInvoker<T> invoker = new InjvmInvoker<>(type, consumerUrl, consumerUrl.getServiceKey(), exporter);
159+
result.add(invoker);
160+
}
161+
}
162+
}
163+
}
164+
165+
return result;
166+
}
119167
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.protocol.injvm;
18+
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
public class Hello1ServiceImpl implements HelloService {
24+
25+
@Override
26+
public List<String> hellos() {
27+
List<String> res = new ArrayList<>();
28+
res.add("consumer-hello-1");
29+
return res;
30+
}
31+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dubbo.rpc.protocol.injvm;
19+
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
24+
25+
public class Hello2ServiceImpl implements HelloService {
26+
27+
@Override
28+
public List<String> hellos() {
29+
List<String> res = new ArrayList<>();
30+
res.add("consumer-hello-2");
31+
return res;
32+
}
33+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dubbo.rpc.protocol.injvm;
19+
20+
import java.util.List;
21+
22+
public interface HelloService {
23+
List<String> hellos();
24+
}

dubbo-rpc/dubbo-rpc-injvm/src/test/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocolTest.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,16 @@
2929
import org.junit.jupiter.api.Test;
3030

3131
import java.util.ArrayList;
32-
import java.util.HashMap;
3332
import java.util.List;
3433

35-
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
36-
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
37-
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
34+
import static org.apache.dubbo.common.constants.CommonConstants.*;
3835
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
3936
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
4037
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
4138
import static org.apache.dubbo.rpc.Constants.SCOPE_KEY;
4239
import static org.apache.dubbo.rpc.Constants.SCOPE_LOCAL;
4340
import static org.apache.dubbo.rpc.Constants.SCOPE_REMOTE;
41+
import static org.apache.dubbo.rpc.Constants.MERGER_KEY;
4442
import static org.junit.jupiter.api.Assertions.assertEquals;
4543
import static org.junit.jupiter.api.Assertions.assertFalse;
4644
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -76,7 +74,7 @@ public void testLocalProtocol() throws Exception {
7674
assertEquals(service.getSize(new String[]{"", "", ""}), 3);
7775
service.invoke("injvm://127.0.0.1/TestService", "invoke");
7876

79-
InjvmInvoker<?> injvmInvoker = new InjvmInvoker<>(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, new HashMap<>());
77+
InjvmInvoker<?> injvmInvoker = new InjvmInvoker<>(DemoService.class, URL.valueOf("injvm://127.0.0.1/TestService"), null, null);
8078
assertFalse(injvmInvoker.isAvailable());
8179

8280
}
@@ -137,4 +135,36 @@ public void testLocalProtocolAsync() throws Exception {
137135
assertNull(service.getAsyncResult());
138136
}
139137

138+
@Test
139+
public void testLocalProtocolForMergeResult() throws Exception {
140+
HelloService helloService1 = new Hello1ServiceImpl();
141+
URL url = URL.valueOf("injvm://127.0.0.1/HelloService")
142+
.addParameter(INTERFACE_KEY, HelloService.class.getName())
143+
.addParameter(APPLICATION_KEY, "consumer")
144+
.addParameter(GROUP_KEY, "g1");
145+
Invoker<?> invoker1 = proxy.getInvoker(helloService1, HelloService.class, url);
146+
assertTrue(invoker1.isAvailable());
147+
Exporter<?> exporter1 = protocol.export(invoker1);
148+
exporters.add(exporter1);
149+
150+
URL url2 = URL.valueOf("injvm://127.0.0.1/HelloService")
151+
.addParameter(INTERFACE_KEY, HelloService.class.getName())
152+
.addParameter(APPLICATION_KEY, "consumer")
153+
.addParameter(GROUP_KEY, "g2");
154+
HelloService helloService2 = new Hello2ServiceImpl();
155+
Invoker<?> invoker2 = proxy.getInvoker(helloService2, HelloService.class, url2);
156+
assertTrue(invoker2.isAvailable());
157+
Exporter<?> exporter2 = protocol.export(invoker2);
158+
exporters.add(exporter2);
159+
160+
161+
URL referUrl = URL.valueOf("injvm://127.0.0.1/HelloService")
162+
.addParameter(INTERFACE_KEY, HelloService.class.getName())
163+
.addParameter(APPLICATION_KEY, "consumer")
164+
.addParameter(GROUP_KEY, "*")
165+
.addParameter(MERGER_KEY, "list");
166+
List<String> list = proxy.getProxy(protocol.refer(HelloService.class, referUrl)).hellos();
167+
assertEquals(2, list.size());
168+
}
169+
140170
}

0 commit comments

Comments
 (0)