Skip to content
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ecaf257
feat: add check box
YoWuwuuuw Jun 24, 2025
6d033b7
Merge remote-tracking branch 'origin/2.x' into 2.x
YoWuwuuuw Jul 8, 2025
4446570
feat: Data format for supporting metadata
YoWuwuuuw Jul 10, 2025
731b6bd
test: add test
YoWuwuuuw Jul 10, 2025
49c54db
feat: support server-metadata access to RegistryService
YoWuwuuuw Jul 10, 2025
e1ad806
opt: solve conflicts and opt discovery-namingserver code
YoWuwuuuw Jul 10, 2025
66702a9
fix: fix some bug and improve code readability
YoWuwuuuw Jul 10, 2025
3953c32
test: fix test for discovery-namingserver
YoWuwuuuw Jul 11, 2025
81465da
feat: enhance discovery-loadbalance to support metadata
YoWuwuuuw Jul 11, 2025
8d015b5
feat: enhance discovery-loadbalance to support metadata
YoWuwuuuw Jul 11, 2025
bab788c
feat: support WeightedRandomLoadBalance
YoWuwuuuw Jul 12, 2025
a3d50dc
test: fix test
YoWuwuuuw Jul 12, 2025
d1c4f12
test: fix test
YoWuwuuuw Jul 12, 2025
775c7b1
Merge branch '2.x' into gsoc-metadata-support
YoWuwuuuw Jul 12, 2025
9f4439d
doc: improve Java doc and improve readability
YoWuwuuuw Jul 12, 2025
8ae37ae
opt: apply spotless
YoWuwuuuw Jul 12, 2025
27d1572
opt: rabbit review
YoWuwuuuw Jul 13, 2025
24ba004
test: fix test
YoWuwuuuw Jul 13, 2025
e562774
add changes
YoWuwuuuw Jul 13, 2025
61b95e1
test: add test for ServiceInstance
YoWuwuuuw Jul 13, 2025
1c9385d
test: fix ci
YoWuwuuuw Jul 13, 2025
1b29d94
test: add test for discovery-namingserver
YoWuwuuuw Jul 13, 2025
7cfd97c
test: fix ci
YoWuwuuuw Jul 13, 2025
58806b8
test:refactor test
YoWuwuuuw Jul 16, 2025
7af2622
feat: add routing support
YoWuwuuuw Jul 20, 2025
90eb1cb
Merge branch '2.x' into gsoc-routing-support-mock
YoWuwuuuw Jul 20, 2025
3b3116d
opt: fix ci, ai cr
YoWuwuuuw Jul 24, 2025
d05e99a
Merge remote-tracking branch 'origin/gsoc-routing-support-mock' into …
YoWuwuuuw Jul 24, 2025
6772afa
fix: pmd check
YoWuwuuuw Jul 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Add changes here for all PR submitted to the 2.x branch.

### feature:

- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] support XXX
- [[#7533](https://github.com/apache/incubator-seata/pull/7533)] support metadata-based registration and discovery, support namingserver type


### bugfix:
Expand Down
3 changes: 1 addition & 2 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
<!-- 请根据PR的类型添加 `变更记录` 到以下对应位置(feature/bugfix/optimize/test) 下 -->

### feature:

- [[#PR_NO](https://github.com/seata/seata/pull/PR_NO)] 支持 XXX
- [[#7533](https://github.com/apache/incubator-seata/pull/7533)] 支持元数据注册与发现能力并支持namingserver类型


### bugfix:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.metadata;

import org.apache.seata.common.util.NetUtil;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* entity for packaging inetSocketAddress and metadata for loadBalance
*/
public class ServiceInstance {
private InetSocketAddress address;
private Map<String, Object> metadata;

public ServiceInstance(InetSocketAddress address, Map<String, Object> metadata) {
this.address = address;
this.metadata = metadata;
}

public ServiceInstance(Instance instance) {
this.address = new InetSocketAddress(
instance.getTransaction().getHost(), instance.getTransaction().getPort());
this.metadata = instance.getMetadata();
}

public ServiceInstance(InetSocketAddress address) {
this.address = address;
}

public InetSocketAddress getAddress() {
return address;
}

public void setAddress(InetSocketAddress address) {
this.address = address;
}

public Map<String, Object> getMetadata() {
return metadata;
}

public void setMetadata(Map<String, Object> metadata) {
this.metadata = metadata;
}

/**
* Converts a list of InetSocketAddress to a list of ServiceInstance.
* @param addresses list of InetSocketAddress
* @return list of ServiceInstance
*/
public static List<ServiceInstance> convertToServiceInstanceSet(List<InetSocketAddress> addresses) {
List<ServiceInstance> serviceInstances = new ArrayList<>();
if (addresses != null && !addresses.isEmpty()) {
for (InetSocketAddress address : addresses) {
NetUtil.validAddress(address);
serviceInstances.add(new ServiceInstance(address, null));
}
}
return serviceInstances;
}

/**
* Converts a set of InetSocketAddress to a set of ServiceInstance in RedisRegistryServiceImpl.
* @param addresses set of InetSocketAddress
* @return set of ServiceInstance
*/
public static Set<ServiceInstance> convertToServiceInstanceSet(Set<InetSocketAddress> addresses) {
Set<ServiceInstance> serviceInstances = new HashSet<>();
if (addresses != null && !addresses.isEmpty()) {
for (InetSocketAddress address : addresses) {
NetUtil.validAddress(address);
serviceInstances.add(new ServiceInstance(address, null));
}
}
return serviceInstances;
}

/**
* Creates a ServiceInstance from an InetSocketAddress and a Map<String, String> of metadata.
* @param address the InetSocketAddress
* @param stringMap the map of string metadata
* @return a new ServiceInstance
*/
public static ServiceInstance fromStringMap(InetSocketAddress address, Map<String, String> stringMap) {
Map<String, Object> metadata = new HashMap<>();
if (stringMap != null) {
metadata.putAll(stringMap);
}
return new ServiceInstance(address, metadata);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ServiceInstance that = (ServiceInstance) o;
return Objects.equals(address, that.address) && Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(address, metadata);
}

@Override
public String toString() {
return "ServiceInstance{" + "address=" + address + ", metadata=" + metadata + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.metadata;

import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ServiceInstanceTest {

private final InetSocketAddress address1 = new InetSocketAddress("127.0.0.1", 8091);
private final InetSocketAddress address2 = new InetSocketAddress("127.0.0.1", 8092);

@Test
public void testConstructorAndGetters() {
Map<String, Object> metadata = new HashMap<>();
metadata.put("key1", "value1");

ServiceInstance instance1 = new ServiceInstance(address1, metadata);

assertEquals(address1, instance1.getAddress());
assertEquals(metadata, instance1.getMetadata());

Instance instance = Instance.getInstance();
instance.setTransaction(new Node.Endpoint("127.0.0.1", 8093));

ServiceInstance instance2 = new ServiceInstance(Instance.getInstance());

assertEquals(
Instance.getInstance().getTransaction().getHost(),
instance2.getAddress().getAddress().getHostAddress());
assertEquals(
Instance.getInstance().getTransaction().getPort(),
instance2.getAddress().getPort());

instance.setTransaction(null); // clean up after test
}

@Test
public void testConvertToServiceInstanceList() {
List<InetSocketAddress> addresses = new ArrayList<>();
addresses.add(address1);
addresses.add(address2);

List<ServiceInstance> serviceInstances = ServiceInstance.convertToServiceInstanceSet(addresses);

assertEquals(2, serviceInstances.size());
assertEquals(address1, serviceInstances.get(0).getAddress());
assertEquals(address2, serviceInstances.get(1).getAddress());
}

@Test
public void testConvertToServiceInstanceSet() {
Set<InetSocketAddress> addresses = new HashSet<>();
addresses.add(address1);
addresses.add(address2);

Set<ServiceInstance> serviceInstances = ServiceInstance.convertToServiceInstanceSet(addresses);

assertEquals(2, serviceInstances.size());
}

@Test
public void testSetAddressAndSetMetadata() {
ServiceInstance instance = new ServiceInstance(address1);
instance.setAddress(address2);
instance.setMetadata(new HashMap<>());

assertEquals(address2, instance.getAddress());
assertNotNull(instance.getMetadata());
}

@Test
public void testFromStringMap() {
Map<String, String> stringMap = new HashMap<>();
stringMap.put("stringKey", "stringValue");

ServiceInstance instance = ServiceInstance.fromStringMap(address1, stringMap);

assertEquals(address1, instance.getAddress());
assertNotNull(instance.getMetadata());
assertEquals("stringValue", instance.getMetadata().get("stringKey"));
}

@Test
public void testEqualsAndToString() {
ServiceInstance instance1 = new ServiceInstance(address1);
ServiceInstance instance2 = new ServiceInstance(address1);
ServiceInstance instance3 = new ServiceInstance(address2);

assertTrue(instance1.equals(instance2));
assertTrue(instance1.equals(instance1));
assertFalse(instance1.equals(instance3));
assertFalse(instance1.equals("string"));

assertTrue(instance1.toString().contains("8091"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.metadata.ServiceInstance;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.NetUtil;
Expand All @@ -48,6 +49,7 @@
import org.apache.seata.core.rpc.processor.RemotingProcessor;
import org.apache.seata.discovery.loadbalance.LoadBalanceFactory;
import org.apache.seata.discovery.registry.RegistryFactory;
import org.apache.seata.discovery.routing.RoutingManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -287,9 +289,13 @@ protected String loadBalance(String transactionServiceGroup, Object msg) {
InetSocketAddress address = null;
try {
@SuppressWarnings("unchecked")
List<InetSocketAddress> inetSocketAddressList =
List<ServiceInstance> serviceInstances =
RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);
address = this.doSelect(inetSocketAddressList, msg);

// Apply routing filter
serviceInstances = applyRoutingFilter(serviceInstances, transactionServiceGroup, msg);

address = this.doSelect(serviceInstances, msg);
} catch (Exception ex) {
LOGGER.error("Select the address failed: {}", ex.getMessage());
}
Expand All @@ -299,12 +305,38 @@ protected String loadBalance(String transactionServiceGroup, Object msg) {
return NetUtil.toStringAddress(address);
}

protected InetSocketAddress doSelect(List<InetSocketAddress> list, Object msg) throws Exception {
/**
* Apply routing filter
* @param serviceInstances original service instances list
* @param transactionServiceGroup transaction service group
* @param msg message object
* @return filtered service instances list
*/
private List<ServiceInstance> applyRoutingFilter(
List<ServiceInstance> serviceInstances, String transactionServiceGroup, Object msg) {
try {
// Get routing manager
RoutingManager routingManager = RoutingManager.getInstance();

// Get transaction ID
String xid = getXid(msg);

// Execute routing filter
return routingManager.filter(serviceInstances);
} catch (Exception e) {
LOGGER.warn("Routing filter failed, using original service instances: {}", e.getMessage());
return serviceInstances;
}
}

protected InetSocketAddress doSelect(List<ServiceInstance> list, Object msg) throws Exception {
if (CollectionUtils.isNotEmpty(list)) {
if (list.size() > 1) {
return LoadBalanceFactory.getInstance().select(list, getXid(msg));
return LoadBalanceFactory.getInstance()
.select(list, getXid(msg))
.getAddress();
} else {
return list.get(0);
return list.get(0).getAddress();
}
}
return null;
Expand Down
Loading
Loading