Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* The type Redis registry service.
Expand Down Expand Up @@ -155,23 +155,36 @@ static RedisRegistryServiceImpl getInstance() {
public void register(ServiceInstance instance) {
InetSocketAddress address = instance.getAddress();
NetUtil.validAddress(address);
doRegisterOrExpire(address, true);
// 1) set alive key to ensure key exists
doRegisterOrExpire(address);
// 2) write metadata once and set ttl; subsequent heartbeats will refresh ttl only
String serverAddr = NetUtil.toStringAddress(address);
String metaKey = getRedisRegistryMetaKey(serverAddr);
try (Jedis jedis = jedisPool.getResource();
Pipeline pipelined = jedis.pipelined()) {
if (instance.getMetadata() != null && !instance.getMetadata().isEmpty()) {
for (Map.Entry<String, Object> e : instance.getMetadata().entrySet()) {
String value = e.getValue() == null ? "" : String.valueOf(e.getValue());
pipelined.hset(metaKey, e.getKey(), value);
}
}
// ensure metadata key ttl
pipelined.expire(metaKey, (int) KEY_TTL);
// 3) publish register after metadata prepared
pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER);
pipelined.sync();
}
RegistryHeartBeats.addHeartBeat(REGISTRY_TYPE, address, KEY_REFRESH_PERIOD, this::doRegisterOrExpire);
}

private void doRegisterOrExpire(InetSocketAddress address) {
doRegisterOrExpire(address, false);
}

private void doRegisterOrExpire(InetSocketAddress address, boolean publish) {
String serverAddr = NetUtil.toStringAddress(address);
String key = getRedisRegistryKey() + "_" + serverAddr; // key = registry.redis.${cluster}_ip:port
try (Jedis jedis = jedisPool.getResource();
Pipeline pipelined = jedis.pipelined()) {
pipelined.setex(key, KEY_TTL, ManagementFactory.getRuntimeMXBean().getName());
if (publish) {
pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER);
}
// refresh metadata ttl as well
pipelined.expire(getRedisRegistryMetaKey(serverAddr), (int) KEY_TTL);
pipelined.sync();
}
}
Expand All @@ -185,6 +198,9 @@ public void unregister(ServiceInstance instance) {
Pipeline pipelined = jedis.pipelined()) {
pipelined.hdel(getRedisRegistryKey(), serverAddr);
pipelined.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.UN_REGISTER);
// remove ephemeral key and metadata hash proactively
pipelined.del(getRedisRegistryKey() + "_" + serverAddr);
pipelined.del(getRedisRegistryMetaKey(serverAddr));
pipelined.sync();
}
}
Expand Down Expand Up @@ -252,9 +268,21 @@ List<ServiceInstance> lookupByCluster(String clusterName) {
String eventType = msgr[1];
switch (eventType) {
case RedisListener.REGISTER:
CollectionUtils.computeIfAbsent(
CLUSTER_INSTANCE_MAP, clusterName, value -> ConcurrentHashMap.newKeySet(2))
.add(new ServiceInstance(NetUtil.toInetSocketAddress(serverAddr)));
{
Map<String, String> meta = null;
try (Jedis jedis = jedisPool.getResource()) {
meta = jedis.hgetAll(getRedisRegistryMetaKey(serverAddr));
}
InetSocketAddress addr = NetUtil.toInetSocketAddress(serverAddr);
ServiceInstance instance = meta == null || meta.isEmpty()
? new ServiceInstance(addr)
: ServiceInstance.fromStringMap(addr, meta);
Set<ServiceInstance> set = CollectionUtils.computeIfAbsent(
CLUSTER_INSTANCE_MAP, clusterName, value -> ConcurrentHashMap.newKeySet(2));
// replace older entry with same address to avoid duplicates with/without metadata
set.removeIf(si -> si.getAddress().equals(addr));
set.add(instance);
}
break;
case RedisListener.UN_REGISTER:
removeServerAddressByPushEmptyProtection(clusterName, serverAddr);
Expand Down Expand Up @@ -365,26 +393,30 @@ private void updateClusterAddressMap(Jedis jedis, String redisRegistryKey, Strin
scanParams.count(10);
scanParams.match(redisRegistryKey + "_*");
String cursor = ScanParams.SCAN_POINTER_START;
Set<InetSocketAddress> newAddressSet = ConcurrentHashMap.newKeySet(2);
Set<ServiceInstance> currentInstances = ConcurrentHashMap.newKeySet(2);
do {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
cursor = scanResult.getCursor();
List<String> instances = scanResult.getResult();
if (instances != null && !instances.isEmpty()) {
// key = registry.redis.${cluster}_ip:port
Set<InetSocketAddress> part = instances.stream()
.map(key -> {
String[] split = key.split("_");
return NetUtil.toInetSocketAddress(split[1]);
})
.collect(Collectors.toSet());

newAddressSet.addAll(part);
for (String key : instances) {
String[] split = key.split("_");
if (split.length < 2) {
continue;
}
String serverAddr = split[1];
InetSocketAddress address = NetUtil.toInetSocketAddress(serverAddr);
Map<String, String> meta = jedis.hgetAll(getRedisRegistryMetaKey(serverAddr));
if (meta == null || meta.isEmpty()) {
currentInstances.add(new ServiceInstance(address));
} else {
currentInstances.add(ServiceInstance.fromStringMap(address, meta));
}
}
}
} while (!cursor.equals(ScanParams.SCAN_POINTER_START));

Set<ServiceInstance> currentInstances = ServiceInstance.convertToServiceInstanceSet(newAddressSet);

if (CollectionUtils.isNotEmpty(currentInstances)
&& !currentInstances.equals(CLUSTER_INSTANCE_MAP.get(clusterName))) {
CLUSTER_INSTANCE_MAP.put(clusterName, currentInstances);
Expand All @@ -395,6 +427,11 @@ private String getRedisRegistryKey() {
return REDIS_FILEKEY_PREFIX + clusterName;
}

private String getRedisRegistryMetaKey(String serverAddr) {
// meta key example: registry.redis.${cluster}.meta_ip:port
return REDIS_FILEKEY_PREFIX + clusterName + ".meta_" + serverAddr;
}

private String getRedisAddrFileKey() {
return REDIS_FILEKEY_PREFIX + PRO_SERVER_ADDR_KEY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -66,15 +69,24 @@ public static void init() throws IOException {

@Test
@Order(1)
public void testFlow() {
ServiceInstance serviceInstance = new ServiceInstance(new InetSocketAddress(NetUtil.getLocalIp(), 8091));
public void testRegisterWithMetadataAndLookup() {
Map<String, Object> meta = new HashMap<>();
meta.put("zone", "A");
meta.put("version", "v1");
ServiceInstance serviceInstance = new ServiceInstance(new InetSocketAddress(NetUtil.getLocalIp(), 8092), meta);
redisRegistryService.register(serviceInstance);

Assertions.assertTrue(redisRegistryService.lookup("default_tx_group").size() > 0);
List<ServiceInstance> instances = redisRegistryService.lookup("default_tx_group");
ServiceInstance target = instances.stream()
.filter(si -> si.getAddress().getPort() == 8092)
.findFirst()
.orElse(null);
Assertions.assertNotNull(target);
Assertions.assertNotNull(target.getMetadata());
Assertions.assertEquals("A", target.getMetadata().get("zone"));
Assertions.assertEquals("v1", target.getMetadata().get("version"));

redisRegistryService.unregister(serviceInstance);

Assertions.assertTrue(redisRegistryService.lookup("default_tx_group").size() > 0);
}

@Test
Expand Down