diff --git a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java index 9fd93b2e065..bdeed8d0ac9 100644 --- a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java +++ b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java @@ -43,13 +43,13 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * The type Redis registry service. @@ -155,23 +155,36 @@ static RedisRegistryServiceImpl getInstance() { public void register(ServiceInstance instance) { InetSocketAddress address = instance.getAddress(); NetUtil.validAddress(address); - doRegisterOrExpire(address, true); + // 1) set alive key to ensure key exists + doRegisterOrExpire(address); + // 2) write metadata once and set ttl; subsequent heartbeats will refresh ttl only + String serverAddr = NetUtil.toStringAddress(address); + String metaKey = getRedisRegistryMetaKey(serverAddr); + try (Jedis jedis = jedisPool.getResource(); + Pipeline pipelined = jedis.pipelined()) { + if (instance.getMetadata() != null && !instance.getMetadata().isEmpty()) { + for (Map.Entry e : instance.getMetadata().entrySet()) { + String value = e.getValue() == null ? "" : String.valueOf(e.getValue()); + pipelined.hset(metaKey, e.getKey(), value); + } + } + // ensure metadata key ttl + pipelined.expire(metaKey, (int) KEY_TTL); + // 3) publish register after metadata prepared + pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER); + pipelined.sync(); + } RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, address, KEY_REFRESH_PERIOD, this::doRegisterOrExpire); } private void doRegisterOrExpire(InetSocketAddress address) { - doRegisterOrExpire(address, false); - } - - private void doRegisterOrExpire(InetSocketAddress address, boolean publish) { String serverAddr = NetUtil.toStringAddress(address); String key = getRedisRegistryKey() + "_" + serverAddr; // key = registry.redis.${cluster}_ip:port try (Jedis jedis = jedisPool.getResource(); Pipeline pipelined = jedis.pipelined()) { pipelined.setex(key, KEY_TTL, ManagementFactory.getRuntimeMXBean().getName()); - if (publish) { - pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER); - } + // refresh metadata ttl as well + pipelined.expire(getRedisRegistryMetaKey(serverAddr), (int) KEY_TTL); pipelined.sync(); } } @@ -185,6 +198,9 @@ public void unregister(ServiceInstance instance) { Pipeline pipelined = jedis.pipelined()) { pipelined.hdel(getRedisRegistryKey(), serverAddr); pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.UN_REGISTER); + // remove ephemeral key and metadata hash proactively + pipelined.del(getRedisRegistryKey() + "_" + serverAddr); + pipelined.del(getRedisRegistryMetaKey(serverAddr)); pipelined.sync(); } } @@ -252,9 +268,21 @@ List lookupByCluster(String clusterName) { String eventType = msgr[1]; switch (eventType) { case RedisListener.REGISTER: - CollectionUtils.computeIfAbsent( - CLUSTER_INSTANCE_MAP, clusterName, value -> ConcurrentHashMap.newKeySet(2)) - .add(new ServiceInstance(NetUtil.toInetSocketAddress(serverAddr))); + { + Map meta = null; + try (Jedis jedis = jedisPool.getResource()) { + meta = jedis.hgetAll(getRedisRegistryMetaKey(serverAddr)); + } + InetSocketAddress addr = NetUtil.toInetSocketAddress(serverAddr); + ServiceInstance instance = meta == null || meta.isEmpty() + ? new ServiceInstance(addr) + : ServiceInstance.fromStringMap(addr, meta); + Set set = CollectionUtils.computeIfAbsent( + CLUSTER_INSTANCE_MAP, clusterName, value -> ConcurrentHashMap.newKeySet(2)); + // replace older entry with same address to avoid duplicates with/without metadata + set.removeIf(si -> si.getAddress().equals(addr)); + set.add(instance); + } break; case RedisListener.UN_REGISTER: removeServerAddressByPushEmptyProtection(clusterName, serverAddr); @@ -365,26 +393,30 @@ private void updateClusterAddressMap(Jedis jedis, String redisRegistryKey, Strin scanParams.count(10); scanParams.match(redisRegistryKey + "_*"); String cursor = ScanParams.SCAN_POINTER_START; - Set newAddressSet = ConcurrentHashMap.newKeySet(2); + Set currentInstances = ConcurrentHashMap.newKeySet(2); do { ScanResult scanResult = jedis.scan(cursor, scanParams); cursor = scanResult.getCursor(); List instances = scanResult.getResult(); if (instances != null && !instances.isEmpty()) { // key = registry.redis.${cluster}_ip:port - Set part = instances.stream() - .map(key -> { - String[] split = key.split("_"); - return NetUtil.toInetSocketAddress(split[1]); - }) - .collect(Collectors.toSet()); - - newAddressSet.addAll(part); + for (String key : instances) { + String[] split = key.split("_"); + if (split.length < 2) { + continue; + } + String serverAddr = split[1]; + InetSocketAddress address = NetUtil.toInetSocketAddress(serverAddr); + Map meta = jedis.hgetAll(getRedisRegistryMetaKey(serverAddr)); + if (meta == null || meta.isEmpty()) { + currentInstances.add(new ServiceInstance(address)); + } else { + currentInstances.add(ServiceInstance.fromStringMap(address, meta)); + } + } } } while (!cursor.equals(ScanParams.SCAN_POINTER_START)); - Set currentInstances = ServiceInstance.convertToServiceInstanceSet(newAddressSet); - if (CollectionUtils.isNotEmpty(currentInstances) && !currentInstances.equals(CLUSTER_INSTANCE_MAP.get(clusterName))) { CLUSTER_INSTANCE_MAP.put(clusterName, currentInstances); @@ -395,6 +427,11 @@ private String getRedisRegistryKey() { return REDIS_FILEKEY_PREFIX + clusterName; } + private String getRedisRegistryMetaKey(String serverAddr) { + // meta key example: registry.redis.${cluster}.meta_ip:port + return REDIS_FILEKEY_PREFIX + clusterName + ".meta_" + serverAddr; + } + private String getRedisAddrFileKey() { return REDIS_FILEKEY_PREFIX + PRO_SERVER_ADDR_KEY; } diff --git a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java index d8738f2d55d..b3c41fd8b19 100644 --- a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java +++ b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java @@ -35,6 +35,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -66,15 +69,24 @@ public static void init() throws IOException { @Test @Order(1) - public void testFlow() { - ServiceInstance serviceInstance = new ServiceInstance(new InetSocketAddress(NetUtil.getLocalIp(), 8091)); + public void testRegisterWithMetadataAndLookup() { + Map meta = new HashMap<>(); + meta.put("zone", "A"); + meta.put("version", "v1"); + ServiceInstance serviceInstance = new ServiceInstance(new InetSocketAddress(NetUtil.getLocalIp(), 8092), meta); redisRegistryService.register(serviceInstance); - Assertions.assertTrue(redisRegistryService.lookup("default_tx_group").size() > 0); + List instances = redisRegistryService.lookup("default_tx_group"); + ServiceInstance target = instances.stream() + .filter(si -> si.getAddress().getPort() == 8092) + .findFirst() + .orElse(null); + Assertions.assertNotNull(target); + Assertions.assertNotNull(target.getMetadata()); + Assertions.assertEquals("A", target.getMetadata().get("zone")); + Assertions.assertEquals("v1", target.getMetadata().get("version")); redisRegistryService.unregister(serviceInstance); - - Assertions.assertTrue(redisRegistryService.lookup("default_tx_group").size() > 0); } @Test