diff --git a/clustered/server/ehcache-entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java b/clustered/server/ehcache-entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java index a3cb2a7aea..9419790f5d 100644 --- a/clustered/server/ehcache-entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java +++ b/clustered/server/ehcache-entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java @@ -603,21 +603,41 @@ private void invalidateAll(ClientDescriptor originatingClientDescriptor) { clientsToInvalidate.remove(originatingClientDescriptor); } - InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate); - clientsWaitingForInvalidation.put(invalidationId, invalidationHolder); - LOGGER.debug("SERVER: requesting {} client(s) invalidation of all in cache {} (ID {})", clientsToInvalidate.size(), storeIdentifier, invalidationId); - for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) { - LOGGER.debug("SERVER: asking client {} to invalidate all from cache {} (ID {})", clientDescriptorThatHasToInvalidate, storeIdentifier, invalidationId); - try { - clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, clientInvalidateAll(invalidationId)); - } catch (MessageCodecException mce) { - throw new AssertionError("Codec error", mce); + + if (clientsToInvalidate.isEmpty()) { + invalidateAllComplete(originatingClientDescriptor, invalidationId); + } else { + InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate); + clientsWaitingForInvalidation.put(invalidationId, invalidationHolder); + for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) { + LOGGER.debug("SERVER: asking client {} to invalidate all from cache {} (ID {})", clientDescriptorThatHasToInvalidate, storeIdentifier, invalidationId); + try { + clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, clientInvalidateAll(invalidationId)); + } catch (MessageCodecException mce) { + throw new AssertionError("Codec error", mce); + } } } + } - if (clientsToInvalidate.isEmpty()) { - clientInvalidated(invalidationHolder.clientDescriptorWaitingForInvalidation, invalidationId); + private void invalidateAllComplete(ClientDescriptor initiator, int invalidationId) { + try { + if (isStrong()) { + if (initiator != null) { + clientCommunicator.sendNoResponse(initiator, allInvalidationDone()); + LOGGER.debug("SERVER: notifying originating client that all other clients invalidated all in cache {} from {} (ID {})", storeIdentifier, initiator, invalidationId); + } + } else { + entityMessenger.messageSelf(new ClearInvalidationCompleteMessage()); + + InvalidationTracker invalidationTracker = stateService.getInvalidationTracker(storeIdentifier); + if (invalidationTracker != null) { + invalidationTracker.setClearInProgress(false); + } + } + } catch (MessageCodecException mce) { + throw new AssertionError("Codec error", mce); } } @@ -625,43 +645,18 @@ private void clientInvalidated(ClientDescriptor clientDescriptor, int invalidati InvalidationHolder invalidationHolder = clientsWaitingForInvalidation.get(invalidationId); if (invalidationHolder == null) { // Happens when client is re-sending/sending invalidations for which server has lost track since fail-over happened. - LOGGER.debug("Ignoring invalidation from client {} " + clientDescriptor); + LOGGER.debug("Ignoring invalidation from client {} ", clientDescriptor); return; } invalidationHolder.clientsHavingToInvalidate.remove(clientDescriptor); if (invalidationHolder.clientsHavingToInvalidate.isEmpty()) { if (clientsWaitingForInvalidation.remove(invalidationId) != null) { - try { - Long key = invalidationHolder.key; - if (key == null) { - if (isStrong()) { - clientCommunicator.sendNoResponse(invalidationHolder.clientDescriptorWaitingForInvalidation, allInvalidationDone()); - LOGGER.debug("SERVER: notifying originating client that all other clients invalidated all in cache {} from {} (ID {})", storeIdentifier, clientDescriptor, invalidationId); - } else { - entityMessenger.messageSelf(new ClearInvalidationCompleteMessage()); - - InvalidationTracker invalidationTracker = stateService.getInvalidationTracker(storeIdentifier); - if (invalidationTracker != null) { - invalidationTracker.setClearInProgress(false); - } - - } - } else { - if (isStrong()) { - clientCommunicator.sendNoResponse(invalidationHolder.clientDescriptorWaitingForInvalidation, hashInvalidationDone(key)); - LOGGER.debug("SERVER: notifying originating client that all other clients invalidated key {} in cache {} from {} (ID {})", key, storeIdentifier, clientDescriptor, invalidationId); - } else { - entityMessenger.messageSelf(new InvalidationCompleteMessage(key)); - - InvalidationTracker invalidationTracker = stateService.getInvalidationTracker(storeIdentifier); - if (invalidationTracker != null) { - invalidationTracker.untrackHashInvalidation(key); - } - } - } - } catch (MessageCodecException mce) { - throw new AssertionError("Codec error", mce); + Long key = invalidationHolder.key; + if (key == null) { + invalidateAllComplete(invalidationHolder.clientDescriptorWaitingForInvalidation, invalidationId); + } else { + invalidateComplete(invalidationHolder.clientDescriptorWaitingForInvalidation, key, invalidationId); } } } @@ -676,21 +671,42 @@ private void invalidateHashForClient(ClientDescriptor originatingClientDescripto clientsToInvalidate.remove(originatingClientDescriptor); } - InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate, key); - clientsWaitingForInvalidation.put(invalidationId, invalidationHolder); - LOGGER.debug("SERVER: requesting {} client(s) invalidation of hash {} in cache {} (ID {})", clientsToInvalidate.size(), key, storeIdentifier, invalidationId); - for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) { - LOGGER.debug("SERVER: asking client {} to invalidate hash {} from cache {} (ID {})", clientDescriptorThatHasToInvalidate, key, storeIdentifier, invalidationId); - try { - clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, clientInvalidateHash(key, invalidationId)); - } catch (MessageCodecException mce) { - throw new AssertionError("Codec error", mce); + + if (clientsToInvalidate.isEmpty()) { + invalidateComplete(originatingClientDescriptor, key, invalidationId); + } else { + InvalidationHolder invalidationHolder = new InvalidationHolder(originatingClientDescriptor, clientsToInvalidate, key); + clientsWaitingForInvalidation.put(invalidationId, invalidationHolder); + + for (ClientDescriptor clientDescriptorThatHasToInvalidate : clientsToInvalidate) { + LOGGER.debug("SERVER: asking client {} to invalidate hash {} from cache {} (ID {})", clientDescriptorThatHasToInvalidate, key, storeIdentifier, invalidationId); + try { + clientCommunicator.sendNoResponse(clientDescriptorThatHasToInvalidate, clientInvalidateHash(key, invalidationId)); + } catch (MessageCodecException mce) { + throw new AssertionError("Codec error", mce); + } } } + } - if (clientsToInvalidate.isEmpty()) { - clientInvalidated(invalidationHolder.clientDescriptorWaitingForInvalidation, invalidationId); + private void invalidateComplete(ClientDescriptor initiator, long key, int invalidationId) { + try { + if (isStrong()) { + if (initiator != null) { + clientCommunicator.sendNoResponse(initiator, hashInvalidationDone(key)); + LOGGER.debug("SERVER: notifying originating client that all other clients invalidated key {} in cache {} from {} (ID {})", key, storeIdentifier, initiator, invalidationId); + } + } else { + entityMessenger.messageSelf(new InvalidationCompleteMessage(key)); + + InvalidationTracker invalidationTracker = stateService.getInvalidationTracker(storeIdentifier); + if (invalidationTracker != null) { + invalidationTracker.untrackHashInvalidation(key); + } + } + } catch (MessageCodecException mce) { + throw new AssertionError("Codec error", mce); } } diff --git a/clustered/server/ehcache-entity/src/test/java/org/ehcache/clustered/server/store/ClusterTierActiveEntityTest.java b/clustered/server/ehcache-entity/src/test/java/org/ehcache/clustered/server/store/ClusterTierActiveEntityTest.java index 5362e0e488..8268782907 100644 --- a/clustered/server/ehcache-entity/src/test/java/org/ehcache/clustered/server/store/ClusterTierActiveEntityTest.java +++ b/clustered/server/ehcache-entity/src/test/java/org/ehcache/clustered/server/store/ClusterTierActiveEntityTest.java @@ -958,6 +958,18 @@ public void testLoadExistingRecoversInflightInvalidationsForEventualCache() thro verify(clientCommunicator, times(10)).sendNoResponse(ArgumentMatchers.eq(client), ArgumentMatchers.isA(EhcacheEntityResponse.ClientInvalidateHash.class)); } + @Test + public void testInvalidationHandlingOnReconnectWindowTimeoutClosure() throws Exception { + ClusterTierActiveEntity activeEntity = new ClusterTierActiveEntity(defaultRegistry, defaultConfiguration, DEFAULT_MAPPER, SYNC_GETS_EXECUTOR); + EhcacheStateServiceImpl ehcacheStateService = defaultRegistry.getStoreManagerService(); + ehcacheStateService.createStore(defaultStoreName, defaultStoreConfiguration, false); //Passive would have done this before failover + + InvalidationTracker invalidationTracker = ehcacheStateService.getInvalidationTracker(defaultStoreName); + invalidationTracker.trackHashInvalidation(1L); + + activeEntity.startReconnect().close(); + } + @Test @SuppressWarnings("unchecked") public void testReplicationMessageAndOriginalServerStoreOpMessageHasSameConcurrency() throws Exception {