Skip to content

Commit 24ec795

Browse files
authored
feat: Add instanceId to distributed lock keys for multi-instance isolation (#40966)
1 parent ad36f76 commit 24ec795

File tree

8 files changed

+129
-31
lines changed

8 files changed

+129
-31
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.appsmith.server.configurations;
2+
3+
import com.appsmith.caching.components.InstanceIdProvider;
4+
import com.appsmith.server.services.ConfigService;
5+
import lombok.RequiredArgsConstructor;
6+
import org.springframework.stereotype.Component;
7+
import reactor.core.publisher.Mono;
8+
9+
@Component
10+
@RequiredArgsConstructor
11+
public class InstanceIdProviderImpl implements InstanceIdProvider {
12+
13+
private final ConfigService configService;
14+
15+
@Override
16+
public Mono<String> getInstanceId() {
17+
return configService.getInstanceId();
18+
}
19+
}

app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package com.appsmith.server.aspect;
22

3+
import com.appsmith.caching.components.InstanceIdProvider;
34
import org.junit.jupiter.api.Test;
45
import org.springframework.beans.factory.annotation.Autowired;
56
import org.springframework.boot.test.context.SpringBootTest;
7+
import org.springframework.boot.test.mock.mockito.MockBean;
68
import org.springframework.data.redis.core.ReactiveRedisOperations;
79
import reactor.core.publisher.Mono;
810
import reactor.test.StepVerifier;
@@ -16,6 +18,7 @@
1618
import static org.junit.jupiter.api.Assertions.assertNull;
1719
import static org.junit.jupiter.api.Assertions.assertThrows;
1820
import static org.junit.jupiter.api.Assertions.assertTrue;
21+
import static org.mockito.Mockito.when;
1922

2023
@SpringBootTest
2124
class DistributedLockAspectTest {
@@ -26,45 +29,61 @@ class DistributedLockAspectTest {
2629
@Autowired
2730
private ReactiveRedisOperations<String, String> redisOperations;
2831

29-
private static final String LOCK_PREFIX = "lock:";
32+
@MockBean
33+
private InstanceIdProvider instanceIdProvider;
34+
35+
private static final String LOCK_PREFIX = "lock";
36+
private static final String TEST_INSTANCE_ID = "test-instance-123";
37+
38+
private String getLockKey(String key) {
39+
return LOCK_PREFIX + ":" + TEST_INSTANCE_ID + ":" + key;
40+
}
3041

3142
@Test
3243
void testMonoOperation() {
44+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
45+
3346
StepVerifier.create(testLockService.monoOperation())
3447
.expectNext("mono-success")
3548
.verifyComplete();
3649

3750
// Verify lock is released
38-
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "mono-test"))
51+
StepVerifier.create(redisOperations.hasKey(getLockKey("mono-test")))
3952
.expectNext(false)
4053
.verifyComplete();
4154
}
4255

4356
@Test
4457
void testFluxOperation() {
58+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
59+
4560
StepVerifier.create(testLockService.fluxOperation().collectList())
4661
.expectNext(List.of("flux-success-1", "flux-success-2"))
4762
.verifyComplete();
4863

4964
// Verify lock is released
50-
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "flux-test"))
65+
StepVerifier.create(redisOperations.hasKey(getLockKey("flux-test")))
5166
.expectNext(false)
5267
.verifyComplete();
5368
}
5469

5570
@Test
5671
void testBlockingOperation() {
72+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
73+
5774
String result = testLockService.blockingOperation();
5875
assertEquals("blocking-success", result);
5976

6077
// Verify lock is released
61-
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "blocking-test"))
78+
StepVerifier.create(redisOperations.hasKey(getLockKey("blocking-test")))
6279
.expectNext(false)
6380
.verifyComplete();
6481
}
6582

6683
@Test
6784
void testConcurrentAccess() throws InterruptedException {
85+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
86+
6887
AtomicReference<String> thread1Result = new AtomicReference<>();
6988
AtomicReference<String> thread2Result = new AtomicReference<>();
7089
CountDownLatch thread1Started = new CountDownLatch(1);
@@ -106,13 +125,15 @@ void testConcurrentAccess() throws InterruptedException {
106125

107126
@Test
108127
void testPersistentLock() {
128+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
129+
109130
// First operation acquires lock and doesn't release it
110131
StepVerifier.create(testLockService.operationWithPersistentLock())
111132
.expectNext("success")
112133
.verifyComplete();
113134

114135
// Verify lock still exists after operation completes
115-
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "persistent-lock"))
136+
StepVerifier.create(redisOperations.hasKey(getLockKey("persistent-lock")))
116137
.expectNext(true)
117138
.verifyComplete();
118139

@@ -121,20 +142,22 @@ void testPersistentLock() {
121142
.verifyComplete(); // Completes empty because lock is still held
122143

123144
// Cleanup: Release lock for other tests
124-
StepVerifier.create(testLockService.releaseLock("persistent-lock", redisOperations))
145+
StepVerifier.create(testLockService.releaseLock("persistent-lock", redisOperations, TEST_INSTANCE_ID))
125146
.expectNext(1L)
126147
.verifyComplete();
127148
}
128149

129150
@Test
130151
void testPersistentLockExpiration() {
152+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
153+
131154
// Execute operation with short-lived lock
132155
StepVerifier.create(Mono.just(testLockService.operationWithShortLivedLock()))
133156
.expectNext("success")
134157
.verifyComplete();
135158

136159
// Verify lock exists immediately after
137-
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock"))
160+
StepVerifier.create(redisOperations.hasKey(getLockKey("short-lived-lock")))
138161
.expectNext(true)
139162
.verifyComplete();
140163

@@ -146,50 +169,56 @@ void testPersistentLockExpiration() {
146169
}
147170

148171
// Verify lock has expired
149-
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock"))
172+
StepVerifier.create(redisOperations.hasKey(getLockKey("short-lived-lock")))
150173
.expectNext(false)
151174
.verifyComplete();
152175
}
153176

154177
@Test
155178
void testLockReleasedOnBlockingError() {
179+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
180+
156181
// Execute operation that throws error
157182
assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError());
158183

159184
// Verify lock is released despite shouldReleaseLock = false
160-
StepVerifier.create(redisOperations.hasKey("lock:error-lock"))
185+
StepVerifier.create(redisOperations.hasKey(getLockKey("error-lock")))
161186
.expectNext(false)
162187
.verifyComplete();
163188
}
164189

165190
@Test
166191
void testLockReleasedOnReactiveError() {
192+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
193+
167194
// Execute operation that returns Mono.error
168195
StepVerifier.create(testLockService.reactiveMethodWithError())
169196
.expectError(RuntimeException.class)
170197
.verify();
171198

172199
// Verify lock is released despite shouldReleaseLock = false
173-
StepVerifier.create(redisOperations.hasKey("lock:error-lock"))
200+
StepVerifier.create(redisOperations.hasKey(getLockKey("error-lock")))
174201
.expectNext(false)
175202
.verifyComplete();
176203
}
177204

178205
@Test
179206
void testLockReleasedOnErrorAllowsSubsequentExecution() {
207+
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
208+
180209
// First call throws error
181210
assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError());
182211

183212
// Verify we can acquire the same lock immediately after error
184213
AtomicBoolean lockAcquired = new AtomicBoolean(false);
185-
StepVerifier.create(redisOperations.opsForValue().setIfAbsent("lock:error-lock", "test-value"))
214+
StepVerifier.create(redisOperations.opsForValue().setIfAbsent(getLockKey("error-lock"), "test-value"))
186215
.consumeNextWith(result -> lockAcquired.set(result))
187216
.verifyComplete();
188217

189218
// Should be able to acquire lock after error
190219
assertTrue(lockAcquired.get());
191220

192221
// Cleanup
193-
redisOperations.delete("lock:error-lock").block();
222+
redisOperations.delete(getLockKey("error-lock")).block();
194223
}
195224
}

app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ public String operationWithShortLivedLock() {
4242
}
4343

4444
// Method to manually release the lock (for testing cleanup)
45-
public Mono<Long> releaseLock(String lockKey, ReactiveRedisOperations<String, String> redisOperations) {
46-
return redisOperations.delete("lock:" + lockKey);
45+
public Mono<Long> releaseLock(
46+
String lockKey, ReactiveRedisOperations<String, String> redisOperations, String instanceId) {
47+
return redisOperations.delete("lock:" + instanceId + ":" + lockKey);
4748
}
4849

4950
@DistributedLock(key = "error-lock", shouldReleaseLock = false)

app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.appsmith.caching.aspects;
22

33
import com.appsmith.caching.annotations.DistributedLock;
4+
import com.appsmith.caching.components.InstanceIdProvider;
45
import lombok.extern.slf4j.Slf4j;
56
import org.aspectj.lang.ProceedingJoinPoint;
67
import org.aspectj.lang.annotation.Around;
@@ -23,11 +24,14 @@
2324
public class DistributedLockAspect {
2425

2526
private final ReactiveRedisOperations<String, String> redisOperations;
27+
private final InstanceIdProvider instanceIdProvider;
2628

27-
private static final String LOCK_PREFIX = "lock:";
29+
private static final String LOCK_PREFIX = "lock";
2830

29-
public DistributedLockAspect(ReactiveRedisOperations<String, String> redisOperations) {
31+
public DistributedLockAspect(
32+
ReactiveRedisOperations<String, String> redisOperations, InstanceIdProvider instanceIdProvider) {
3033
this.redisOperations = redisOperations;
34+
this.instanceIdProvider = instanceIdProvider;
3135
}
3236

3337
// Method to acquire a distributed lock before executing the annotated method.
@@ -57,12 +61,14 @@ private static class LockDetails {
5761
}
5862
}
5963

60-
private LockDetails createLockDetails(DistributedLock lock) {
61-
String lockKey = LOCK_PREFIX + lock.key();
62-
long ttl = lock.ttl();
63-
String value =
64-
"locked until " + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString();
65-
return new LockDetails(lockKey, value, Duration.ofSeconds(ttl));
64+
private Mono<LockDetails> createLockDetails(DistributedLock lock) {
65+
return instanceIdProvider.getInstanceId().defaultIfEmpty("unknown").map(instanceId -> {
66+
String lockKey = LOCK_PREFIX + ":" + instanceId + ":" + lock.key();
67+
long ttl = lock.ttl();
68+
String value = "locked until "
69+
+ Instant.now().plus(ttl, ChronoUnit.SECONDS).toString();
70+
return new LockDetails(lockKey, value, Duration.ofSeconds(ttl));
71+
});
6672
}
6773

6874
private void releaseLock(String lockKey) {
@@ -80,9 +86,7 @@ private void releaseLock(String lockKey) {
8086
}
8187

8288
private Object handleMono(ProceedingJoinPoint joinPoint, DistributedLock lock) {
83-
LockDetails lockDetails = createLockDetails(lock);
84-
85-
return redisOperations
89+
return createLockDetails(lock).flatMap(lockDetails -> redisOperations
8690
.opsForValue()
8791
.setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration)
8892
.flatMap(acquired -> {
@@ -103,13 +107,11 @@ private Object handleMono(ProceedingJoinPoint joinPoint, DistributedLock lock) {
103107
}
104108
log.info("Lock already acquired for: {}", lockDetails.key);
105109
return Mono.empty();
106-
});
110+
}));
107111
}
108112

109113
private Object handleFlux(ProceedingJoinPoint joinPoint, DistributedLock lock) {
110-
LockDetails lockDetails = createLockDetails(lock);
111-
112-
return redisOperations
114+
return createLockDetails(lock).flatMapMany(lockDetails -> redisOperations
113115
.opsForValue()
114116
.setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration)
115117
.flatMapMany(acquired -> {
@@ -130,11 +132,11 @@ private Object handleFlux(ProceedingJoinPoint joinPoint, DistributedLock lock) {
130132
}
131133
log.info("Lock already acquired for: {}", lockDetails.key);
132134
return Flux.empty();
133-
});
135+
}));
134136
}
135137

136138
private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock lock) throws Throwable {
137-
LockDetails lockDetails = createLockDetails(lock);
139+
LockDetails lockDetails = createLockDetails(lock).block();
138140

139141
Boolean acquired = null;
140142
try {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.appsmith.caching.components;
2+
3+
import reactor.core.publisher.Mono;
4+
5+
/**
6+
* Interface to provide instanceId for distributed lock keys.
7+
* This allows the reactive-caching module to get instanceId without depending on higher-level modules.
8+
*/
9+
public interface InstanceIdProvider {
10+
11+
/**
12+
* Get the instance ID for this Appsmith instance
13+
* @return Mono containing the instance ID
14+
*/
15+
Mono<String> getInstanceId();
16+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.appsmith.testcaching;
2+
3+
import com.appsmith.caching.components.InstanceIdProvider;
4+
import org.springframework.boot.test.context.TestConfiguration;
5+
import org.springframework.context.annotation.Bean;
6+
7+
@TestConfiguration
8+
public class TestConfig {
9+
10+
@Bean
11+
public InstanceIdProvider instanceIdProvider() {
12+
return new TestInstanceIdProvider();
13+
}
14+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.appsmith.testcaching;
2+
3+
import com.appsmith.caching.components.InstanceIdProvider;
4+
import reactor.core.publisher.Mono;
5+
6+
public class TestInstanceIdProvider implements InstanceIdProvider {
7+
8+
private static final String TEST_INSTANCE_ID = "test-instance-123";
9+
10+
@Override
11+
public Mono<String> getInstanceId() {
12+
return Mono.just(TEST_INSTANCE_ID);
13+
}
14+
}

app/server/reactive-caching/src/test/java/com/appsmith/testcaching/test/TestCachingMethods.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.appsmith.testcaching.test;
22

33
import com.appsmith.caching.components.CacheManager;
4+
import com.appsmith.testcaching.TestConfig;
45
import com.appsmith.testcaching.model.ArgumentModel;
56
import com.appsmith.testcaching.model.TestModel;
67
import com.appsmith.testcaching.service.CacheTestService;
@@ -10,6 +11,7 @@
1011
import org.junit.jupiter.api.TestInstance;
1112
import org.springframework.beans.factory.annotation.Autowired;
1213
import org.springframework.boot.test.context.SpringBootTest;
14+
import org.springframework.test.context.ContextConfiguration;
1315

1416
import java.util.List;
1517

@@ -20,6 +22,7 @@
2022
@SpringBootTest
2123
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
2224
@Slf4j
25+
@ContextConfiguration(classes = TestConfig.class)
2326
public class TestCachingMethods {
2427

2528
@Autowired

0 commit comments

Comments
 (0)