Skip to content

Commit 22fde53

Browse files
author
MDH
committed
Fix ehcache reconnect
1 parent 900433f commit 22fde53

20 files changed

+757
-532
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright Terracotta, Inc.
3+
* Copyright IBM Corp. 2024, 2025
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.ehcache.clustered.client.internal.reconnect;
18+
19+
import org.ehcache.CachePersistenceException;
20+
import org.ehcache.clustered.client.config.Timeouts;
21+
import org.ehcache.clustered.client.internal.service.ClusterTierException;
22+
import org.ehcache.clustered.client.internal.store.ClusterTierClientEntity;
23+
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
24+
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
25+
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
26+
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;
27+
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
28+
29+
import java.util.concurrent.TimeoutException;
30+
31+
public class FailedReconnectClusterTierClientEntity implements ClusterTierClientEntity {
32+
private final String cacheId;
33+
private final CachePersistenceException failure;
34+
35+
public FailedReconnectClusterTierClientEntity(String cacheId, CachePersistenceException failure) {
36+
this.cacheId = cacheId;
37+
this.failure = failure;
38+
}
39+
40+
public String getCacheId() {
41+
return cacheId;
42+
}
43+
44+
@Override
45+
public Timeouts getTimeouts() {
46+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
47+
}
48+
49+
@Override
50+
public boolean isConnected() {
51+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
52+
}
53+
54+
@Override
55+
public void validate(ServerStoreConfiguration clientStoreConfiguration) throws ClusterTierException, TimeoutException {
56+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
57+
}
58+
59+
@Override
60+
public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
61+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
62+
}
63+
64+
@Override
65+
public void invokeAndWaitForReceive(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
66+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
67+
}
68+
69+
@Override
70+
public EhcacheEntityResponse invokeAndWaitForComplete(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
71+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
72+
}
73+
74+
@Override
75+
public EhcacheEntityResponse invokeAndWaitForRetired(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
76+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
77+
}
78+
79+
@Override
80+
public EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMessage message, boolean track) throws ClusterException, TimeoutException {
81+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
82+
}
83+
84+
@Override
85+
public <T extends EhcacheEntityResponse> void addResponseListener(Class<T> responseType, ResponseListener<T> responseListener) {
86+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
87+
}
88+
89+
@Override
90+
public void addDisconnectionListener(DisconnectionListener disconnectionListener) {
91+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
92+
}
93+
94+
@Override
95+
public void addReconnectListener(ReconnectListener reconnectListener) {
96+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
97+
}
98+
99+
@Override
100+
public void enableEvents(boolean enable) throws ClusterException, TimeoutException {
101+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
102+
}
103+
104+
@Override
105+
public void close() {
106+
throw new RuntimeException("Cache " + getCacheId() + " failed reconnecting to cluster", failure);
107+
}
108+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Copyright Terracotta, Inc.
3+
* Copyright IBM Corp. 2024, 2025
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.ehcache.clustered.client.internal.reconnect;
18+
19+
import org.ehcache.clustered.client.config.Timeouts;
20+
import org.ehcache.clustered.client.internal.service.ClusterTierException;
21+
import org.ehcache.clustered.client.internal.service.ClusterTierValidationException;
22+
import org.ehcache.clustered.client.internal.service.InternalClusterTierValidationException;
23+
import org.ehcache.clustered.client.internal.store.ClusterTierClientEntity;
24+
import org.ehcache.clustered.client.internal.store.ReconnectInProgressException;
25+
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
26+
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
27+
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
28+
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;
29+
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
import org.terracotta.exception.ConnectionClosedException;
33+
import org.terracotta.exception.ConnectionShutdownException;
34+
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.CopyOnWriteArrayList;
40+
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.TimeoutException;
42+
import java.util.concurrent.atomic.AtomicReference;
43+
44+
import static org.ehcache.core.util.ExceptionUtil.containsCause;
45+
46+
public class ReconnectableClusterTierClientEntity implements ClusterTierClientEntity {
47+
private static final Logger LOGGER = LoggerFactory.getLogger(ReconnectableClusterTierClientEntity.class);
48+
49+
private final AtomicReference<ClusterTierClientEntity> delegateRef = new AtomicReference<>();
50+
private final Runnable reconnectTask;
51+
52+
private final Map<Class<? extends EhcacheEntityResponse>, List<ResponseListener<? extends EhcacheEntityResponse>>> responseListeners =
53+
new ConcurrentHashMap<>();
54+
private final List<DisconnectionListener> disconnectionListeners = new CopyOnWriteArrayList<>();
55+
private final List<ReconnectListener> reconnectListeners = new CopyOnWriteArrayList<>();
56+
private final ExecutorService asyncExecutor;
57+
private volatile boolean enableEventing = false;
58+
59+
public ReconnectableClusterTierClientEntity(ClusterTierClientEntity clientEntity, Runnable reconnectTask, ExecutorService asyncExecutor) {
60+
delegateRef.set(clientEntity);
61+
this.reconnectTask = reconnectTask;
62+
this.asyncExecutor = asyncExecutor;
63+
}
64+
65+
private ClusterTierClientEntity delegate() {
66+
return delegateRef.get();
67+
}
68+
69+
@SuppressWarnings("unchecked")
70+
public void setDelegateRef(ClusterTierClientEntity clientEntity) {
71+
responseListeners.forEach((k, v) -> {
72+
v.forEach(resp -> clientEntity.addResponseListener(k, (ClusterTierClientEntity.ResponseListener)resp));
73+
});
74+
disconnectionListeners.forEach(clientEntity::addDisconnectionListener);
75+
reconnectListeners.forEach(clientEntity::addReconnectListener);
76+
delegateRef.set(clientEntity);
77+
}
78+
79+
public boolean enableEventing() {
80+
return enableEventing;
81+
}
82+
83+
@Override
84+
public Timeouts getTimeouts() {
85+
return delegate().getTimeouts();
86+
}
87+
88+
@Override
89+
public boolean isConnected() {
90+
return delegate().isConnected();
91+
}
92+
93+
@Override
94+
public void validate(ServerStoreConfiguration clientStoreConfiguration) throws ClusterTierException, TimeoutException {
95+
try {
96+
onReconnect(clientEntity -> {
97+
try {
98+
clientEntity.validate(clientStoreConfiguration);
99+
} catch (ClusterTierException e) {
100+
throw new InternalClusterTierValidationException(e.getMessage(), e.getCause());
101+
}
102+
return null;
103+
});
104+
} catch (InternalClusterTierValidationException e) {
105+
throw new ClusterTierValidationException(e.getMessage(), e.getCause());
106+
} catch (ClusterException e) {
107+
throw new RuntimeException(e);
108+
}
109+
}
110+
111+
@Override
112+
public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
113+
onReconnect(clientEntity -> {
114+
clientEntity.invokeAndWaitForSend(message, track);
115+
return null;
116+
});
117+
}
118+
119+
@Override
120+
public void invokeAndWaitForReceive(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
121+
onReconnect(clientEntity -> {
122+
clientEntity.invokeAndWaitForReceive(message, track);
123+
return null;
124+
});
125+
}
126+
127+
@Override
128+
public EhcacheEntityResponse invokeAndWaitForComplete(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
129+
return onReconnect(clientEntity -> clientEntity.invokeAndWaitForComplete(message, track));
130+
}
131+
132+
@Override
133+
public EhcacheEntityResponse invokeAndWaitForRetired(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
134+
return onReconnect(clientEntity -> clientEntity.invokeAndWaitForRetired(message, track));
135+
}
136+
137+
@Override
138+
public EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMessage message, boolean track) throws ClusterException, TimeoutException {
139+
return onReconnect(clientEntity -> clientEntity.invokeStateRepositoryOperation(message, track));
140+
}
141+
142+
@Override
143+
public <T extends EhcacheEntityResponse> void addResponseListener(Class<T> responseType, ResponseListener<T> responseListener) {
144+
delegate().addResponseListener(responseType, responseListener);
145+
146+
responseListeners.compute(responseType, (k, v) -> {
147+
if(v == null) {
148+
v = new CopyOnWriteArrayList<>();
149+
}
150+
v.add(responseListener);
151+
return v;
152+
});
153+
}
154+
155+
@Override
156+
public void addDisconnectionListener(DisconnectionListener disconnectionListener) {
157+
delegate().addDisconnectionListener(disconnectionListener);
158+
disconnectionListeners.add(disconnectionListener);
159+
}
160+
161+
@Override
162+
public void addReconnectListener(ReconnectListener reconnectListener) {
163+
delegate().addReconnectListener(reconnectListener);
164+
reconnectListeners.add(reconnectListener);
165+
}
166+
167+
@Override
168+
public void enableEvents(boolean enable) throws ClusterException, TimeoutException {
169+
onReconnect(clientEntity -> {
170+
clientEntity.enableEvents(enable);
171+
return null;
172+
});
173+
enableEventing = enable;
174+
}
175+
176+
@Override
177+
public void close() {
178+
try {
179+
delegate().close();
180+
} catch (Throwable t) {
181+
if (containsCause(t, ConnectionClosedException.class) || containsCause(t, ConnectionShutdownException.class)) {
182+
LOGGER.debug("Store was already closed, since connection was closed");
183+
} else {
184+
throw t;
185+
}
186+
}
187+
}
188+
189+
private <T> T onReconnect(TimeoutAndClusterExceptionFunction<ClusterTierClientEntity, T> function) throws TimeoutException, ClusterException {
190+
ClusterTierClientEntity cl = delegate();
191+
try {
192+
return function.apply(cl);
193+
} catch (Exception sspe) {
194+
if (containsCause(sspe, ConnectionClosedException.class)) {
195+
if (delegateRef.compareAndSet(cl, new ReconnectInProgressClusterTierClientEntity())) {
196+
CompletableFuture.runAsync(reconnectTask, asyncExecutor);
197+
}
198+
return onReconnect(function);
199+
} else {
200+
throw sspe;
201+
}
202+
}
203+
}
204+
205+
@FunctionalInterface
206+
private interface TimeoutAndClusterExceptionFunction<U, V> {
207+
V apply(U u) throws TimeoutException, ClusterException;
208+
}
209+
210+
private static class ReconnectInProgressClusterTierClientEntity implements ClusterTierClientEntity {
211+
@Override
212+
public Timeouts getTimeouts() {
213+
throw new ReconnectInProgressException();
214+
}
215+
216+
@Override
217+
public boolean isConnected() {
218+
throw new ReconnectInProgressException();
219+
}
220+
221+
@Override
222+
public void validate(ServerStoreConfiguration clientStoreConfiguration) throws ClusterTierException, TimeoutException {
223+
throw new ReconnectInProgressException();
224+
}
225+
226+
@Override
227+
public void invokeAndWaitForSend(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
228+
throw new ReconnectInProgressException();
229+
}
230+
231+
@Override
232+
public void invokeAndWaitForReceive(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
233+
throw new ReconnectInProgressException();
234+
}
235+
236+
@Override
237+
public EhcacheEntityResponse invokeAndWaitForComplete(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
238+
throw new ReconnectInProgressException();
239+
}
240+
241+
@Override
242+
public EhcacheEntityResponse invokeAndWaitForRetired(EhcacheOperationMessage message, boolean track) throws ClusterException, TimeoutException {
243+
throw new ReconnectInProgressException();
244+
}
245+
246+
@Override
247+
public EhcacheEntityResponse invokeStateRepositoryOperation(StateRepositoryOpMessage message, boolean track) throws ClusterException, TimeoutException {
248+
throw new ReconnectInProgressException();
249+
}
250+
251+
@Override
252+
public <T extends EhcacheEntityResponse> void addResponseListener(Class<T> responseType, ResponseListener<T> responseListener) {
253+
throw new ReconnectInProgressException();
254+
}
255+
256+
@Override
257+
public void addDisconnectionListener(DisconnectionListener disconnectionListener) {
258+
throw new ReconnectInProgressException();
259+
}
260+
261+
@Override
262+
public void addReconnectListener(ReconnectListener reconnectListener) {
263+
throw new ReconnectInProgressException();
264+
}
265+
266+
@Override
267+
public void enableEvents(boolean enable) throws ClusterException, TimeoutException {
268+
throw new ReconnectInProgressException();
269+
}
270+
271+
@Override
272+
public void close() {
273+
throw new ReconnectInProgressException();
274+
}
275+
}
276+
}

0 commit comments

Comments
 (0)