From 1f1e1f6114bcc4407d3e6475ffcda5561adcd616 Mon Sep 17 00:00:00 2001 From: David Schwilk Date: Thu, 29 Sep 2022 08:02:15 +0200 Subject: [PATCH 1/3] Adds a ClientReconnectingException which is thrown if the client is attempting a reconnect while a message should be sent. Prior the sender of a message only got clues about dropped messages due to reconnect in the clients logs, but couldn't handle this case in code. The ClientReconnectingException can be handled by the sender and an optional buffering/ retrying strategy can be implemented. In methods returning a CompletionStage the stage will complete exceptionally conatining the error, in other methods the exception is thrown and has to be catched for custom handling. Co-authored-by: Kalin Kostashki Signed-off-by: David Schwilk --- .../org/eclipse/ditto/client/DittoClient.java | 4 +- .../ditto/client/internal/AbstractHandle.java | 56 ++++++++---- .../client/internal/CommonManagementImpl.java | 5 +- .../client/internal/DefaultDittoClient.java | 1 - .../client/live/LiveCommandProcessor.java | 1 + .../client/live/events/EventEmitter.java | 1 + .../ditto/client/live/internal/LiveImpl.java | 91 +++++++++++-------- .../client/live/messages/MessageSender.java | 6 ++ .../ClientReconnectingException.java | 39 ++++++++ .../client/management/CommonManagement.java | 64 ++++++++++++- .../client/management/FeatureHandle.java | 15 ++- .../ditto/client/management/ThingHandle.java | 35 ++++++- .../client/messaging/MessagingProvider.java | 44 ++++++--- .../internal/WebSocketMessagingProvider.java | 7 +- .../ditto/client/policies/Policies.java | 32 ++++++- .../org/eclipse/ditto/client/twin/Twin.java | 8 +- .../ditto/client/twin/internal/TwinImpl.java | 55 ++++++----- 17 files changed, 349 insertions(+), 115 deletions(-) create mode 100644 java/src/main/java/org/eclipse/ditto/client/management/ClientReconnectingException.java diff --git a/java/src/main/java/org/eclipse/ditto/client/DittoClient.java b/java/src/main/java/org/eclipse/ditto/client/DittoClient.java index a3672ee8..32fe576d 100755 --- a/java/src/main/java/org/eclipse/ditto/client/DittoClient.java +++ b/java/src/main/java/org/eclipse/ditto/client/DittoClient.java @@ -52,7 +52,9 @@ public interface DittoClient { * Directly sends a Ditto Protocol {@link Adaptable} message to the established Ditto backend connection. * * @param dittoProtocolAdaptable the adaptable to send - * @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} + * @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} or + * which failed with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client is + * in a reconnecting state. * @throws IllegalStateException when no twin/live connection was configured for this client */ CompletionStage sendDittoProtocol(Adaptable dittoProtocolAdaptable); diff --git a/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java b/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java index 68a89bf8..9d6621d1 100644 --- a/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java +++ b/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.function.Predicate; @@ -40,6 +41,7 @@ import org.eclipse.ditto.client.ack.internal.AcknowledgementRequestsValidator; import org.eclipse.ditto.client.internal.bus.Classification; import org.eclipse.ditto.client.management.AcknowledgementsFailedException; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.client.messaging.MessagingProvider; import org.eclipse.ditto.json.JsonField; import org.eclipse.ditto.json.JsonObject; @@ -138,6 +140,8 @@ protected Signal signalFromAdaptable(final Adaptable adaptable) { * @param type of the result. * @return future of the result if the expected response arrives or a failed future on error. * Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention. + * If the client is reconnecting while this method is called the future fails with a a + * {@link ClientReconnectingException}. */ protected , S extends PolicyCommandResponse, R> CompletionStage askPolicyCommand( final T command, @@ -159,11 +163,14 @@ protected , S extends PolicyCommandResponse, R> Co * @param type of the result. * @return future of the result if the expected response arrives or a failed future on error. * Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention. + * If the client is reconnecting while this method is called the future fails with a a + * {@link ClientReconnectingException}. */ protected , S extends CommandResponse, R> CompletionStage askThingCommand( final T command, final Class expectedResponse, final Function onSuccess) { + final ThingCommand commandWithChannel = validateAckRequests(setChannel(command, channel)); return sendSignalAndExpectResponse(commandWithChannel, expectedResponse, onSuccess, ErrorResponse.class, ErrorResponse::getDittoRuntimeException); @@ -180,7 +187,8 @@ protected , S extends CommandResponse, R> Completio * @param type of the expected success response. * @param type of the expected error response. * @param type of the result. - * @return future of the result. + * @return future of the result. The future can be exceptional with a {@link ClientReconnectingException} if the + * client is reconnecting while this method is called. */ protected CompletionStage sendSignalAndExpectResponse(final Signal signal, final Class expectedResponseClass, @@ -188,25 +196,33 @@ protected CompletionStage sendSignalAndExpectResponse(final Signal< final Class expectedErrorResponseClass, final Function onError) { - final CompletionStage responseFuture = messagingProvider.getAdaptableBus() - .subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout()); - - messagingProvider.emit(signalToJsonString(signal)); - return responseFuture.thenApply(responseAdaptable -> { - final Signal response = signalFromAdaptable(responseAdaptable); - if (expectedErrorResponseClass.isInstance(response)) { - // extracted runtime exception will be wrapped in CompletionException. - throw onError.apply(expectedErrorResponseClass.cast(response)); - } else if (response instanceof Acknowledgements) { - final CommandResponse commandResponse = - extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response); - return onSuccess.apply(expectedResponseClass.cast(commandResponse)); - } else if (expectedResponseClass.isInstance(response)) { - return onSuccess.apply(expectedResponseClass.cast(response)); - } else { - throw new ClassCastException("Expect " + expectedResponseClass.getSimpleName() + ", got: " + response); - } - }); + try { + final CompletionStage responseFuture = messagingProvider.getAdaptableBus() + .subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout()); + + messagingProvider.emit(signalToJsonString(signal)); + return responseFuture.thenApply(responseAdaptable -> { + final Signal response = signalFromAdaptable(responseAdaptable); + if (expectedErrorResponseClass.isInstance(response)) { + // extracted runtime exception will be wrapped in CompletionException. + throw onError.apply(expectedErrorResponseClass.cast(response)); + } else if (response instanceof Acknowledgements) { + final CommandResponse commandResponse = + extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response); + return onSuccess.apply(expectedResponseClass.cast(commandResponse)); + } else if (expectedResponseClass.isInstance(response)) { + return onSuccess.apply(expectedResponseClass.cast(response)); + } else { + throw new ClassCastException( + "Expect " + expectedResponseClass.getSimpleName() + ", got: " + response); + } + }); + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); + } + } /** diff --git a/java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java b/java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java index ad398f1f..e0a61e95 100755 --- a/java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java +++ b/java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java @@ -163,7 +163,8 @@ public CompletionStage startConsumption(final Option... consumptionOpti * Starts the consumption of twin events / messages / live events and commands. * * @param consumptionConfig the configuration Map to apply for the consumption. - * @return a CompletionStage that terminates when the start operation was successful. + * @return a CompletionStage that terminates when the start operation was successful or fails if the client is in + * a reconnecting state */ protected abstract CompletionStage doStartConsumption(Map consumptionConfig); @@ -648,6 +649,7 @@ public void registerForThingChanges(final String registrationId, final Consumer< * @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement * or not. * @return the subscription ID. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting. */ protected AdaptableBus.SubscriptionId subscribe( @Nullable final AdaptableBus.SubscriptionId previousSubscriptionId, @@ -730,6 +732,7 @@ private static String appendCorrelationIdParameter(final String protocolCommand, * @param protocolCommandAck the expected acknowledgement. * @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement * or not. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting. */ protected void unsubscribe(@Nullable final AdaptableBus.SubscriptionId subscriptionId, final String protocolCommand, diff --git a/java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java b/java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java index b22b0a76..ed0ba3dc 100644 --- a/java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java +++ b/java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java @@ -234,7 +234,6 @@ private static PoliciesImpl configurePolicyClient(final MessagingProvider messag final String busName = TopicPath.Channel.NONE.getName(); final PointerBus bus = BusFactory.createPointerBus(busName, messagingProvider.getExecutorService()); init(bus, messagingProvider); - final MessagingConfiguration messagingConfiguration = messagingProvider.getMessagingConfiguration(); final OutgoingMessageFactory messageFactory = getOutgoingMessageFactoryForPolicies(messagingProvider); return PoliciesImpl.newInstance(messagingProvider, messageFactory, bus); } diff --git a/java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java b/java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java index f1f1c083..752d26aa 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java @@ -41,6 +41,7 @@ public interface LiveCommandProcessor { * Publish a signal. * * @param signal the signal to publish. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting state. */ void publishLiveSignal(Signal signal); diff --git a/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java b/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java index 4bb940cf..faea35a4 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java @@ -30,6 +30,7 @@ public interface EventEmitter { * * @param eventFunction Function providing a EventFactory and requiring a Event as result. * @throws NullPointerException if {@code eventFunction} is {@code null}. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting. */ void emitEvent(Function> eventFunction); diff --git a/java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java b/java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java index d91918db..0fea6ca1 100644 --- a/java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java @@ -50,6 +50,7 @@ import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry; import org.eclipse.ditto.client.live.messages.PendingMessage; import org.eclipse.ditto.client.live.messages.RepliableMessage; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.client.messaging.MessagingProvider; import org.eclipse.ditto.json.JsonKey; import org.eclipse.ditto.messages.model.KnownMessageSubjects; @@ -158,44 +159,58 @@ protected CompletionStage doStartConsumption(final Map con CompletableFuture.allOf(completableFutureEvents, completableFutureMessages, completableFutureLiveCommands); - // register message handler which handles live events: - subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT, (streamingType, previousSubscriptionId) -> { - final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); - messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); - return subscribe(previousSubscriptionId, - streamingType, - subscriptionMessage, - streamingType.startAck(), - completableFutureEvents - ); - }); - - // register message handler which handles incoming messages: - subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE, (streamingType, previousSubscriptionId) -> { - final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); - messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); - return subscribeAndPublishMessage(previousSubscriptionId, - streamingType, - subscriptionMessage, - streamingType.startAck(), - completableFutureMessages, - adaptable -> bus -> bus.notify(getPointerBusKey(adaptable), adaptableAsLiveMessage(adaptable))); - }); - - // register message handler which handles live commands: - subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND, (streamingType, previousSubscriptionId) -> { - final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); - messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); - - return subscribeAndPublishMessage(previousSubscriptionId, - streamingType, - subscriptionMessage, - streamingType.startAck(), - completableFutureLiveCommands, - adaptable -> bus -> bus.getExecutor().submit(() -> handleLiveCommandOrResponse(adaptable)) - ); - }); - return completableFutureCombined; + try { + // register message handler which handles live events: + subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT, + (streamingType, previousSubscriptionId) -> { + final String subscriptionMessage = + buildProtocolCommand(streamingType.start(), consumptionConfig); + messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); + return subscribe(previousSubscriptionId, + streamingType, + subscriptionMessage, + streamingType.startAck(), + completableFutureEvents + ); + }); + + // register message handler which handles incoming messages: + subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE, + (streamingType, previousSubscriptionId) -> { + final String subscriptionMessage = + buildProtocolCommand(streamingType.start(), consumptionConfig); + messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); + return subscribeAndPublishMessage(previousSubscriptionId, + streamingType, + subscriptionMessage, + streamingType.startAck(), + completableFutureMessages, + adaptable -> bus -> bus.notify(getPointerBusKey(adaptable), + adaptableAsLiveMessage(adaptable))); + }); + + // register message handler which handles live commands: + subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND, + (streamingType, previousSubscriptionId) -> { + final String subscriptionMessage = + buildProtocolCommand(streamingType.start(), consumptionConfig); + messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); + + return subscribeAndPublishMessage(previousSubscriptionId, + streamingType, + subscriptionMessage, + streamingType.startAck(), + completableFutureLiveCommands, + adaptable -> bus -> bus.getExecutor() + .submit(() -> handleLiveCommandOrResponse(adaptable)) + ); + }); + return completableFutureCombined; + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); + } } /* diff --git a/java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java b/java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java index 986b329b..9dc2f6c5 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java @@ -228,6 +228,8 @@ interface MessageSendable { * by its potential targets.

* * @throws IllegalStateException if the {@code Message} to be sent is in an invalid state. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting + * state. */ void send(); @@ -238,6 +240,8 @@ interface MessageSendable { * @param responseConsumer the Consumer which should be notified with the response ot the Throwable in case of * an error. * @throws IllegalStateException if the {@code Message} to be sent is in an invalid state. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting + * state. */ default void send(final BiConsumer, Throwable> responseConsumer) { send(ByteBuffer.class, responseConsumer); @@ -253,6 +257,8 @@ default void send(final BiConsumer, Throwable> responseConsu * an error. * @param the type of the response message's payload. * @throws IllegalStateException if the {@code Message} to be sent is in an invalid state. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting + * state. * @since 1.0.0 */ void send(Class responseType, BiConsumer, Throwable> responseConsumer); diff --git a/java/src/main/java/org/eclipse/ditto/client/management/ClientReconnectingException.java b/java/src/main/java/org/eclipse/ditto/client/management/ClientReconnectingException.java new file mode 100644 index 00000000..bd538684 --- /dev/null +++ b/java/src/main/java/org/eclipse/ditto/client/management/ClientReconnectingException.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.client.management; + +import javax.annotation.concurrent.Immutable; + +/** + * This exception is thrown in the Ditto client if the client is in a reconnecting state and thus can't send messages + * to the backend. + * + * @since 3.0.0 + */ +@Immutable +public class ClientReconnectingException extends RuntimeException { + + private static final long serialVersionUID = -4578923424099138760L; + + private static final String MESSAGE = "Message could not be sent, because the client is currently " + + "reconnecting."; + + private ClientReconnectingException() { + super(MESSAGE); + } + + public static ClientReconnectingException newInstance() { + return new ClientReconnectingException(); + } + +} \ No newline at end of file diff --git a/java/src/main/java/org/eclipse/ditto/client/management/CommonManagement.java b/java/src/main/java/org/eclipse/ditto/client/management/CommonManagement.java index 07904073..0bc6a5fa 100755 --- a/java/src/main/java/org/eclipse/ditto/client/management/CommonManagement.java +++ b/java/src/main/java/org/eclipse/ditto/client/management/CommonManagement.java @@ -103,7 +103,9 @@ public interface CommonManagement, F extends FeatureHan /** * Start consuming changes (for {@code twin()} and additionally messages and commands (for {@code live()}. * - * @return a CompletionStage that terminates when the start operation was successful. + * @return a CompletionStage that terminates when the start operation was successful or fails with + * {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client + * is in a reconnecting state. */ CompletionStage startConsumption(); @@ -116,14 +118,18 @@ public interface CommonManagement, F extends FeatureHan *
{@code Options.Consumption.namespaces("org.eclipse.ditto.namespace1","org.eclipse.ditto.namespace2");
      * Options.Consumption.filter("gt(attributes/counter,42)");}
      * 
- * @return a CompletionStage that terminates when the start operation was successful. + * @return a CompletionStage that terminates when the start operation was successful or fails with + * {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client + * is in a reconnecting state. */ CompletionStage startConsumption(Option... consumptionOptions); /** * Suspend consuming events from Eclipse Ditto. * - * @return a CompletionStage that terminates when the suspend operation was successful. + * @return a CompletionStage that terminates when the suspend operation was successful or fails with + * {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client + * is in a reconnecting state. */ CompletionStage suspendConsumption(); @@ -134,6 +140,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code options} contains an option * that is not allowed for creating a thing. */ @@ -148,6 +156,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingId} is {@code null} or empty * or if {@code options} contains an option that is not allowed for * creating a thing. @@ -163,6 +173,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific {@link * org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier * or if {@code options} contains an option that is not allowed for * creating a thing. @@ -180,6 +192,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"} or if {@code options} contains an option that is not allowed for creating a thing. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code thing} cannot be parsed to a @@ -198,6 +212,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing the created Thing object or a specific * @throws IllegalArgumentException if {@code options} contains an option * that is not allowed for creating a thing. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. * @since 1.1.0 */ @@ -212,6 +228,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier, or if * {@code initialPolicy} is {@code null} or if {@code options} contains an option * that is not allowed for creating a thing. @@ -230,6 +248,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingId} is {@code null} or empty, or if {@code initialPolicy} is * {@code null} or if {@code options} contains an option that is not allowed for creating a thing. * @throws org.eclipse.ditto.things.model.ThingIdInvalidException if the {@code thingId} was invalid. @@ -248,6 +268,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"}, or if {@code initialPolicy} is {@code null} or if {@code options} contains an option that * is not allowed for creating a thing. @@ -267,6 +289,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier, or if * {@code initialPolicy} is {@code null} or if {@code options} contains an option that * is not allowed for creating a thing. @@ -285,6 +309,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingId} is {@code null} or empty, or if {@code initialPolicy} is * {@code null} or if {@code options} contains an option that is not allowed for creating a thing. * @throws org.eclipse.ditto.things.model.ThingIdInvalidException if the {@code thingId} was invalid. @@ -303,6 +329,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"}, or if {@code initialPolicy} is {@code null} or if {@code options} contains an option that * is not allowed for creating a thing. @@ -322,6 +350,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return completable future providing {@code null} in case of success or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code argument} is {@code null} or if {@code options} contains an option * that is not allowed for merging a thing. * @since 2.0.0 @@ -337,6 +367,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return completable future providing {@code null} in case of success or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code argument} is {@code null} or if {@code options} contains an option * that is not allowed for merging a thing. * @since 2.0.0 @@ -353,6 +385,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier or if {@code options} * contains an option that is not allowed for putting a thing. * @since 1.0.0 @@ -372,6 +406,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"} or if {@code options} contains an option that is not allowed for putting a thing. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code thing} cannot be parsed to a @@ -392,6 +428,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier, or if * {@code initialPolicy} is {@code null} or if {@code options} contains an option that is not allowed for putting * a thing. @@ -413,6 +451,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"}, or if {@code initialPolicy} is {@code null} or if {@code options} contains an option that is * not allowed for putting a thing. @@ -434,6 +474,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier, or if * {@code initialPolicy} is {@code null} or if {@code options} contains an option that is * not allowed for putting a thing. @@ -456,6 +498,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"}, or if {@code initialPolicy} is {@code null} or if {@code options} contains an option that is * not allowed for putting a thing. @@ -473,6 +517,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing {@code null} in case of success or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier * or if {@code options} contains an option that is not allowed for updating a thing. */ @@ -488,6 +534,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing {@code null} in case of success or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"} or if {@code options} contains an option that is not allowed for updating a thing. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code thing} cannot be parsed to a @@ -503,6 +551,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of deletion or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingId} is {@code null} or if {@code options} contains an option * that is not allowed for updating a thing. */ @@ -516,6 +566,8 @@ public interface CommonManagement, F extends FeatureHan * @param thingIds additional identifiers of Things to be retrieved. * @return CompletionStage providing the requested Things, an empty list or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if any argument is {@code null}. */ CompletionStage> retrieve(ThingId thingId, ThingId... thingIds); @@ -529,6 +581,8 @@ public interface CommonManagement, F extends FeatureHan * @param thingIds additional identifiers of Things to be retrieved. * @return CompletionStage providing the requested Things, an empty list or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if any argument is {@code null}. */ CompletionStage> retrieve(JsonFieldSelector fieldSelector, ThingId thingId, ThingId... thingIds); @@ -540,6 +594,8 @@ public interface CommonManagement, F extends FeatureHan * @param thingIds the identifiers of the Things to be retrieved. * @return CompletionStage providing the requested Things, an empty list or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingIds} is {@code null}. */ CompletionStage> retrieve(Iterable thingIds); @@ -552,6 +608,8 @@ public interface CommonManagement, F extends FeatureHan * @param thingIds the identifiers of the Things to be retrieved. * @return CompletionStage providing the requested Things, an empty list or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if any argument is {@code null}. */ CompletionStage> retrieve(JsonFieldSelector fieldSelector, Iterable thingIds); diff --git a/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java b/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java index e5dbe781..123379a9 100755 --- a/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java +++ b/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java @@ -54,7 +54,8 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * * @param options options to be applied configuring behaviour of this method, see * {@link org.eclipse.ditto.client.options.Options}. - * @return CompletionStage + * @return CompletionStage for handling the result of the operation or a specific + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for deleting a * feature. */ @@ -64,7 +65,9 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * Retrieve the {@code Feature} being handled by this {@code FeatureHandle}. * * @return CompletionStage providing the requested Feature object, when completed successfully or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * @link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(); @@ -75,6 +78,8 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested Feature object, when completed successfully or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if {@code options} is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving a * feature. @@ -88,7 +93,9 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * @param fieldSelector a field selector object allowing to select a subset of fields on the Feature to be * retrieved. * @return CompletionStage providing the requested Feature object, when completed successfully or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(JsonFieldSelector fieldSelector); @@ -101,6 +108,8 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested Feature object, when completed successfully or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if any argument is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving a * feature. diff --git a/java/src/main/java/org/eclipse/ditto/client/management/ThingHandle.java b/java/src/main/java/org/eclipse/ditto/client/management/ThingHandle.java index 15b436a9..fba714f9 100755 --- a/java/src/main/java/org/eclipse/ditto/client/management/ThingHandle.java +++ b/java/src/main/java/org/eclipse/ditto/client/management/ThingHandle.java @@ -74,7 +74,10 @@ public interface ThingHandle * * @param options options to be applied configuring behaviour of this method, see * {@link org.eclipse.ditto.client.options.Options}. - * @return CompletionStage providing for handling the deletion a specific + * @return CompletionStage for handling the result of the operation or a specific + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for deleting * a thing. * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. @@ -85,7 +88,9 @@ public interface ThingHandle * Retrieve the {@code Thing} object being handled by this {@code ThingHandle}. * * @return CompletionStage providing the requested {@link Thing} or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(); @@ -96,6 +101,8 @@ public interface ThingHandle * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Thing} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if {@code options} is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving * a thing. @@ -108,7 +115,9 @@ public interface ThingHandle * * @param fieldSelector a field selector object allowing to select a subset of fields on the Thing to be retrieved. * @return CompletionStage providing the requested {@link Thing} or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(JsonFieldSelector fieldSelector); @@ -120,6 +129,8 @@ public interface ThingHandle * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Thing} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if any argument is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving * a thing. @@ -135,6 +146,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of the operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policyId} is {@code null} or if {@code options} contains an option * that is not allowed for setting a policy ID to a thing. * @since 1.1.0 @@ -149,6 +162,8 @@ public interface ThingHandle * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of the operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policyId} is {@code null} or if {@code options} contains an option * that is not allowed for merging a policy ID to a thing. * @since 2.0.0 @@ -163,6 +178,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of the operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code features} is {@code null} or if {@code options} contains an option * that is not allowed for setting features to a thing. */ @@ -175,7 +192,9 @@ public interface ThingHandle * @param options options to be applied configuring behaviour of this method, see {@link * org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of the operation or a specific {@link - * org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code features} is {@code null} or if {@code options} contains an option * that is not allowed for merging features to a thing. * @since 2.0.0 @@ -190,6 +209,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of this operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code feature} is {@code null} or if {@code options} contains an option * that is not allowed for putting a feature to a thing. */ @@ -203,6 +224,8 @@ public interface ThingHandle * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of this operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code feature} is {@code null} or if {@code options} contains an option * that is not allowed for merging a feature to a thing. * @since 2.0.0 @@ -217,6 +240,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the deletion or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code featureId} is {@code null} or if {@code options} contains an option * that is not allowed for deleting a feature from a thing. */ @@ -229,6 +254,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the deletion or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException or if {@code options} contains an option that is not allowed for deleting * features from a thing. */ diff --git a/java/src/main/java/org/eclipse/ditto/client/messaging/MessagingProvider.java b/java/src/main/java/org/eclipse/ditto/client/messaging/MessagingProvider.java index 2072adc0..8460a126 100644 --- a/java/src/main/java/org/eclipse/ditto/client/messaging/MessagingProvider.java +++ b/java/src/main/java/org/eclipse/ditto/client/messaging/MessagingProvider.java @@ -14,6 +14,7 @@ import java.time.Duration; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; @@ -21,6 +22,7 @@ import org.eclipse.ditto.client.configuration.MessagingConfiguration; import org.eclipse.ditto.client.internal.bus.AdaptableBus; import org.eclipse.ditto.client.internal.bus.Classification; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.protocol.Adaptable; import org.eclipse.ditto.protocol.ProtocolFactory; @@ -94,6 +96,8 @@ public interface MessagingProvider { * Send a message into the channel provided by this provider. * * @param message the message to emit. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is reconnecting and thus + * can't emit the message. * @since 1.1.0 */ void emit(String message); @@ -102,6 +106,8 @@ public interface MessagingProvider { * Emit an adaptable message in a fire-and-forget manner. * * @param message the message to emit. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is reconnecting and thus + * can't emit the message. * @since 1.1.0 */ default void emitAdaptable(Adaptable message) { @@ -112,23 +118,31 @@ default void emitAdaptable(Adaptable message) { * Send Ditto Protocol {@link Adaptable} using the underlying connection and expect a response. * * @param adaptable the adaptable to be sent - * @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} + * @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} or is + * failed with a {@link org.eclipse.ditto.client.management.ClientReconnectingException}, when the client is in a + * reconnecting state. */ default CompletionStage sendAdaptable(final Adaptable adaptable) { - final String correlationId = adaptable.getDittoHeaders() - .getCorrelationId() - .orElseGet(() -> UUID.randomUUID().toString()); - final Adaptable adaptableToSend = adaptable.getDittoHeaders() - .getCorrelationId() - .map(cid -> adaptable) - .orElseGet(() -> adaptable.setDittoHeaders( - adaptable.getDittoHeaders().toBuilder().correlationId(correlationId).build()) - ); - final Duration timeout = getMessagingConfiguration().getTimeout(); - final CompletionStage result = getAdaptableBus() - .subscribeOnceForAdaptable(Classification.forCorrelationId(correlationId), timeout); - emitAdaptable(adaptableToSend); - return result; + try { + final String correlationId = adaptable.getDittoHeaders() + .getCorrelationId() + .orElseGet(() -> UUID.randomUUID().toString()); + final Adaptable adaptableToSend = adaptable.getDittoHeaders() + .getCorrelationId() + .map(cid -> adaptable) + .orElseGet(() -> adaptable.setDittoHeaders( + adaptable.getDittoHeaders().toBuilder().correlationId(correlationId).build()) + ); + final Duration timeout = getMessagingConfiguration().getTimeout(); + final CompletionStage result = getAdaptableBus() + .subscribeOnceForAdaptable(Classification.forCorrelationId(correlationId), timeout); + emitAdaptable(adaptableToSend); + return result; + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); + } } /** diff --git a/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java b/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java index 6bd0243d..f9776c7f 100644 --- a/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java +++ b/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java @@ -50,6 +50,7 @@ import org.eclipse.ditto.client.internal.VersionReader; import org.eclipse.ditto.client.internal.bus.AdaptableBus; import org.eclipse.ditto.client.internal.bus.BusFactory; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.client.messaging.AuthenticationException; import org.eclipse.ditto.client.messaging.AuthenticationProvider; import org.eclipse.ditto.client.messaging.MessagingException; @@ -297,7 +298,11 @@ private CompletionStage initiateConnection(final WebSocket ws) { @Override public void emit(final String message) { - sendToWebsocket(message); + if (reconnecting.get()) { + throw ClientReconnectingException.newInstance(); + } else { + sendToWebsocket(message); + } } private void sendToWebsocket(final String stringMessage) { diff --git a/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java b/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java index a6abd469..db65ed96 100644 --- a/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java +++ b/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java @@ -52,7 +52,9 @@ public interface Policies { * @param options options to be applied configuring behaviour of this method, * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Policy object or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policy} is {@code null} or has no identifier. * @throws org.eclipse.ditto.policies.model.PolicyIdInvalidException if the {@code policyId} was invalid. */ @@ -69,7 +71,9 @@ public interface Policies { * @param options options to be applied configuring behaviour of this method, * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Policy object or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code jsonObject} is {@code null} or if it does not contain the field named * {@code "policyId"}. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code jsonObject} cannot be parsed to a @@ -89,6 +93,8 @@ public interface Policies { * @return CompletionStage providing an {@link java.util.Optional} containing the created Policy object, * in case the Policy has been created, or an empty Optional, in case the Policy has been updated. * Provides a {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policy} is {@code null} or has no identifier. */ CompletionStage> put(Policy policy, Option... options); @@ -106,6 +112,8 @@ public interface Policies { * @return CompletionStage providing an {@link Optional} containing the created Policy object, in case the Policy * has been created, or an empty Optional, in case the Policy has been updated. * Provides a {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code jsonObject} is {@code null} or if it does not contain the field named * {@code "policyId"}. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code jsonObject} cannot be parsed to a @@ -121,6 +129,8 @@ public interface Policies { * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing for handling a successful update or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policy} is {@code null} or has no identifier. */ CompletionStage update(Policy policy, Option... options); @@ -135,6 +145,8 @@ public interface Policies { * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing for handling a successful update or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code jsonObject} is {@code null} or if it does not contain the field named * {@code "policyId"}. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code jsonObject} cannot be parsed to a @@ -149,7 +161,9 @@ public interface Policies { * @param options options to be applied configuring behaviour of this method, * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of deletion or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policyId} is {@code null}. */ CompletionStage delete(PolicyId policyId, Option... options); @@ -159,7 +173,9 @@ public interface Policies { * * @param policyId the identifier of the Policy to be retrieved. * @return CompletionStage providing the requested Policy or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policyId} is {@code null}. */ CompletionStage retrieve(PolicyId policyId); @@ -172,6 +188,8 @@ public interface Policies { * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Thing} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if {@code options} is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving * a thing. @@ -185,7 +203,9 @@ public interface Policies { * * @param fieldSelector a field selector object allowing to select a subset of fields on the Policy to be retrieved. * @return CompletionStage providing the requested {@link Policy} or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @since 2.4.0 */ CompletionStage retrieve(PolicyId policyId, JsonFieldSelector fieldSelector); @@ -199,6 +219,8 @@ public interface Policies { * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Policy} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if any argument is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving * a policy. diff --git a/java/src/main/java/org/eclipse/ditto/client/twin/Twin.java b/java/src/main/java/org/eclipse/ditto/client/twin/Twin.java index 8fe656cf..777e4a30 100755 --- a/java/src/main/java/org/eclipse/ditto/client/twin/Twin.java +++ b/java/src/main/java/org/eclipse/ditto/client/twin/Twin.java @@ -28,7 +28,9 @@ public interface Twin extends CommonManagement{@code Options.Consumption.namespaces("org.eclipse.ditto.namespace1","org.eclipse.ditto.namespace2"); * Options.Consumption.filter("gt(attributes/counter,42)");} * - * @return a CompletionStage that terminates when the start operation was successful. + * @return a CompletionStage that terminates when the start operation was successful or fails with + * {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client + * is in a reconnecting state. */ @Override // overwritten in order to display a better suiting javadoc for the user diff --git a/java/src/main/java/org/eclipse/ditto/client/twin/internal/TwinImpl.java b/java/src/main/java/org/eclipse/ditto/client/twin/internal/TwinImpl.java index 9101970f..815b93c7 100644 --- a/java/src/main/java/org/eclipse/ditto/client/twin/internal/TwinImpl.java +++ b/java/src/main/java/org/eclipse/ditto/client/twin/internal/TwinImpl.java @@ -25,6 +25,7 @@ import org.eclipse.ditto.client.internal.bus.AdaptableBus; import org.eclipse.ditto.client.internal.bus.Classification; import org.eclipse.ditto.client.internal.bus.PointerBus; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.client.messaging.MessagingProvider; import org.eclipse.ditto.client.twin.Twin; import org.eclipse.ditto.client.twin.TwinFeatureHandle; @@ -94,33 +95,45 @@ public TwinFeatureHandle createFeatureHandle(final ThingId thingId, final String @Override protected CompletionStage doStartConsumption(final Map consumptionConfig) { - final CompletableFuture ackFuture = new CompletableFuture<>(); - final Classification.StreamingType streamingType = Classification.StreamingType.TWIN_EVENT; - final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); - messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); - synchronized (twinEventSubscription) { - final AdaptableBus.SubscriptionId previousSubscriptionId = twinEventSubscription.get(); - twinEventSubscription.set(subscribe( - previousSubscriptionId, - streamingType, - subscriptionMessage, - streamingType.startAck(), - ackFuture - )); + try { + final CompletableFuture ackFuture = new CompletableFuture<>(); + final Classification.StreamingType streamingType = Classification.StreamingType.TWIN_EVENT; + final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); + messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); + synchronized (twinEventSubscription) { + final AdaptableBus.SubscriptionId previousSubscriptionId = twinEventSubscription.get(); + twinEventSubscription.set(subscribe( + previousSubscriptionId, + streamingType, + subscriptionMessage, + streamingType.startAck(), + ackFuture + )); + } + return ackFuture; + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); } - return ackFuture; } @Override public CompletionStage suspendConsumption() { - final Classification.StreamingType streamingType = Classification.StreamingType.TWIN_EVENT; - messagingProvider.unregisterSubscriptionMessage(streamingType); - final CompletableFuture ackFuture = new CompletableFuture<>(); - synchronized (twinEventSubscription) { - unsubscribe(twinEventSubscription.get(), streamingType.stop(), streamingType.stopAck(), ackFuture); - twinEventSubscription.set(null); + try { + final Classification.StreamingType streamingType = Classification.StreamingType.TWIN_EVENT; + messagingProvider.unregisterSubscriptionMessage(streamingType); + final CompletableFuture ackFuture = new CompletableFuture<>(); + synchronized (twinEventSubscription) { + unsubscribe(twinEventSubscription.get(), streamingType.stop(), streamingType.stopAck(), ackFuture); + twinEventSubscription.set(null); + } + return ackFuture; + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); } - return ackFuture; } @Override From 9f30396da7ae7066150f2c4ce5508184089ddef0 Mon Sep 17 00:00:00 2001 From: Kalin Kostashki Date: Fri, 30 Sep 2022 11:39:35 +0300 Subject: [PATCH 2/3] small typo fix Signed-off-by: Kalin Kostashki --- .../org/eclipse/ditto/client/internal/AbstractHandle.java | 4 ++-- .../org/eclipse/ditto/client/live/events/EventEmitter.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java b/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java index 9d6621d1..7c3b1413 100644 --- a/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java +++ b/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java @@ -140,7 +140,7 @@ protected Signal signalFromAdaptable(final Adaptable adaptable) { * @param type of the result. * @return future of the result if the expected response arrives or a failed future on error. * Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention. - * If the client is reconnecting while this method is called the future fails with a a + * If the client is reconnecting while this method is called the future fails with a * {@link ClientReconnectingException}. */ protected , S extends PolicyCommandResponse, R> CompletionStage askPolicyCommand( @@ -163,7 +163,7 @@ protected , S extends PolicyCommandResponse, R> Co * @param type of the result. * @return future of the result if the expected response arrives or a failed future on error. * Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention. - * If the client is reconnecting while this method is called the future fails with a a + * If the client is reconnecting while this method is called the future fails with a * {@link ClientReconnectingException}. */ protected , S extends CommandResponse, R> CompletionStage askThingCommand( diff --git a/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java b/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java index faea35a4..5cd6065e 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java @@ -30,7 +30,7 @@ public interface EventEmitter { * * @param eventFunction Function providing a EventFactory and requiring a Event as result. * @throws NullPointerException if {@code eventFunction} is {@code null}. - * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is reconnecting. */ void emitEvent(Function> eventFunction); From 69bccb613a1835612d90a38256e1a8563c4c8d18 Mon Sep 17 00:00:00 2001 From: David Schwilk Date: Fri, 30 Sep 2022 12:25:09 +0200 Subject: [PATCH 3/3] Fix JavaDoc errors Co-authored-by: Kalin Kostashki Signed-off-by: David Schwilk --- .../ditto/client/configuration/MessagingConfiguration.java | 2 ++ .../eclipse/ditto/client/live/commands/LiveCommandHandler.java | 3 +++ .../org/eclipse/ditto/client/management/FeatureHandle.java | 2 +- .../main/java/org/eclipse/ditto/client/policies/Policies.java | 3 +++ 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java b/java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java index 930116a8..78f43343 100755 --- a/java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java +++ b/java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java @@ -182,6 +182,7 @@ interface Builder { * Register a consumer of errors which occur during opening the connection initially and on reconnects. * * @param handler the handler that will be called with the cause of the connection error. + * @return this builder. * @since 1.2.0 */ Builder connectionErrorHandler(@Nullable Consumer handler); @@ -190,6 +191,7 @@ interface Builder { * Register a contextListener which is notified whenever the connection is disconnected. * * @param contextListener the handler that will be called with details about the disconnection. + * @return this builder. * @since 2.1.0 */ Builder disconnectedListener(@Nullable Consumer contextListener); diff --git a/java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java b/java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java index be5f42bf..3e1d6302 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java @@ -34,6 +34,7 @@ public interface LiveCommandHandler, B extends LiveC * @param type the type of live commands. MUST be an interface satisfying the recursive type bound. * @param commandHandler constructor of any response or event to publish. * @param type of live commands. + * @param type of live command answers. * @return the live command handler. */ static , B extends LiveCommandAnswerBuilder> LiveCommandHandler of( @@ -49,6 +50,7 @@ static , B extends LiveCommandAnswerBuilder> LiveCom * @param type the type of live commands. MUST be an interface satisfying the recursive type bound. * @param commandHandler constructor of any response or event to publish and sender of any acknowledgements. * @param type of live commands. + * @param type of live command answers. * @return the live command handler. */ static , B extends LiveCommandAnswerBuilder> LiveCommandHandler withAcks( @@ -77,6 +79,7 @@ static , B extends LiveCommandAnswerBuilder> LiveCom * To be called after runtime type check of the live command. * * @param liveCommand the live command. + * @param signalPublisher the signal publisher. * @return the result of calling the command handler on the command. */ default LiveCommandAnswerBuilder.BuildStep castAndApply(final LiveCommand liveCommand, diff --git a/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java b/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java index 123379a9..f201f138 100755 --- a/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java +++ b/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java @@ -67,7 +67,7 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * @return CompletionStage providing the requested Feature object, when completed successfully or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. * If the client is reconnecting the CompletionStage fails with a - * @link org.eclipse.ditto.client.management.ClientReconnectingException}. + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(); diff --git a/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java b/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java index db65ed96..20ceca16 100644 --- a/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java +++ b/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java @@ -184,6 +184,7 @@ public interface Policies { /** * Gets the {@code Policy} specified by the given identifier with the given options. * + * @param policyId the policyId to retrieve. * @param options options that determine the behaviour of this method, see * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Thing} or a specific @@ -201,6 +202,7 @@ public interface Policies { * Retrieve the {@code Policy} specified by the given identifier, containing the fields specified by * the given {@code fieldSelector}. * + * @param policyId the policyId to retrieve. * @param fieldSelector a field selector object allowing to select a subset of fields on the Policy to be retrieved. * @return CompletionStage providing the requested {@link Policy} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. @@ -214,6 +216,7 @@ public interface Policies { * Gets the {@code Policy} specified by the given identifier with the given options, containing the fields * specified by the given {@code fieldSelector}. * + * @param policyId the policyId to retrieve. * @param fieldSelector a field selector object allowing to select a subset of fields on the Policy to be retrieved. * @param options options that determine the behaviour of this method, see * {@link org.eclipse.ditto.client.options.Options}.