diff --git a/java/src/main/java/org/eclipse/ditto/client/DittoClient.java b/java/src/main/java/org/eclipse/ditto/client/DittoClient.java index a3672ee8..32fe576d 100755 --- a/java/src/main/java/org/eclipse/ditto/client/DittoClient.java +++ b/java/src/main/java/org/eclipse/ditto/client/DittoClient.java @@ -52,7 +52,9 @@ public interface DittoClient { * Directly sends a Ditto Protocol {@link Adaptable} message to the established Ditto backend connection. * * @param dittoProtocolAdaptable the adaptable to send - * @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} + * @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} or + * which failed with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client is + * in a reconnecting state. * @throws IllegalStateException when no twin/live connection was configured for this client */ CompletionStage sendDittoProtocol(Adaptable dittoProtocolAdaptable); diff --git a/java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java b/java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java index 930116a8..78f43343 100755 --- a/java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java +++ b/java/src/main/java/org/eclipse/ditto/client/configuration/MessagingConfiguration.java @@ -182,6 +182,7 @@ interface Builder { * Register a consumer of errors which occur during opening the connection initially and on reconnects. * * @param handler the handler that will be called with the cause of the connection error. + * @return this builder. * @since 1.2.0 */ Builder connectionErrorHandler(@Nullable Consumer handler); @@ -190,6 +191,7 @@ interface Builder { * Register a contextListener which is notified whenever the connection is disconnected. * * @param contextListener the handler that will be called with details about the disconnection. + * @return this builder. * @since 2.1.0 */ Builder disconnectedListener(@Nullable Consumer contextListener); diff --git a/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java b/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java index 68a89bf8..7c3b1413 100644 --- a/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java +++ b/java/src/main/java/org/eclipse/ditto/client/internal/AbstractHandle.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.function.Predicate; @@ -40,6 +41,7 @@ import org.eclipse.ditto.client.ack.internal.AcknowledgementRequestsValidator; import org.eclipse.ditto.client.internal.bus.Classification; import org.eclipse.ditto.client.management.AcknowledgementsFailedException; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.client.messaging.MessagingProvider; import org.eclipse.ditto.json.JsonField; import org.eclipse.ditto.json.JsonObject; @@ -138,6 +140,8 @@ protected Signal signalFromAdaptable(final Adaptable adaptable) { * @param type of the result. * @return future of the result if the expected response arrives or a failed future on error. * Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention. + * If the client is reconnecting while this method is called the future fails with a + * {@link ClientReconnectingException}. */ protected , S extends PolicyCommandResponse, R> CompletionStage askPolicyCommand( final T command, @@ -159,11 +163,14 @@ protected , S extends PolicyCommandResponse, R> Co * @param type of the result. * @return future of the result if the expected response arrives or a failed future on error. * Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention. + * If the client is reconnecting while this method is called the future fails with a + * {@link ClientReconnectingException}. */ protected , S extends CommandResponse, R> CompletionStage askThingCommand( final T command, final Class expectedResponse, final Function onSuccess) { + final ThingCommand commandWithChannel = validateAckRequests(setChannel(command, channel)); return sendSignalAndExpectResponse(commandWithChannel, expectedResponse, onSuccess, ErrorResponse.class, ErrorResponse::getDittoRuntimeException); @@ -180,7 +187,8 @@ protected , S extends CommandResponse, R> Completio * @param type of the expected success response. * @param type of the expected error response. * @param type of the result. - * @return future of the result. + * @return future of the result. The future can be exceptional with a {@link ClientReconnectingException} if the + * client is reconnecting while this method is called. */ protected CompletionStage sendSignalAndExpectResponse(final Signal signal, final Class expectedResponseClass, @@ -188,25 +196,33 @@ protected CompletionStage sendSignalAndExpectResponse(final Signal< final Class expectedErrorResponseClass, final Function onError) { - final CompletionStage responseFuture = messagingProvider.getAdaptableBus() - .subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout()); - - messagingProvider.emit(signalToJsonString(signal)); - return responseFuture.thenApply(responseAdaptable -> { - final Signal response = signalFromAdaptable(responseAdaptable); - if (expectedErrorResponseClass.isInstance(response)) { - // extracted runtime exception will be wrapped in CompletionException. - throw onError.apply(expectedErrorResponseClass.cast(response)); - } else if (response instanceof Acknowledgements) { - final CommandResponse commandResponse = - extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response); - return onSuccess.apply(expectedResponseClass.cast(commandResponse)); - } else if (expectedResponseClass.isInstance(response)) { - return onSuccess.apply(expectedResponseClass.cast(response)); - } else { - throw new ClassCastException("Expect " + expectedResponseClass.getSimpleName() + ", got: " + response); - } - }); + try { + final CompletionStage responseFuture = messagingProvider.getAdaptableBus() + .subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout()); + + messagingProvider.emit(signalToJsonString(signal)); + return responseFuture.thenApply(responseAdaptable -> { + final Signal response = signalFromAdaptable(responseAdaptable); + if (expectedErrorResponseClass.isInstance(response)) { + // extracted runtime exception will be wrapped in CompletionException. + throw onError.apply(expectedErrorResponseClass.cast(response)); + } else if (response instanceof Acknowledgements) { + final CommandResponse commandResponse = + extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response); + return onSuccess.apply(expectedResponseClass.cast(commandResponse)); + } else if (expectedResponseClass.isInstance(response)) { + return onSuccess.apply(expectedResponseClass.cast(response)); + } else { + throw new ClassCastException( + "Expect " + expectedResponseClass.getSimpleName() + ", got: " + response); + } + }); + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); + } + } /** diff --git a/java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java b/java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java index ad398f1f..e0a61e95 100755 --- a/java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java +++ b/java/src/main/java/org/eclipse/ditto/client/internal/CommonManagementImpl.java @@ -163,7 +163,8 @@ public CompletionStage startConsumption(final Option... consumptionOpti * Starts the consumption of twin events / messages / live events and commands. * * @param consumptionConfig the configuration Map to apply for the consumption. - * @return a CompletionStage that terminates when the start operation was successful. + * @return a CompletionStage that terminates when the start operation was successful or fails if the client is in + * a reconnecting state */ protected abstract CompletionStage doStartConsumption(Map consumptionConfig); @@ -648,6 +649,7 @@ public void registerForThingChanges(final String registrationId, final Consumer< * @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement * or not. * @return the subscription ID. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting. */ protected AdaptableBus.SubscriptionId subscribe( @Nullable final AdaptableBus.SubscriptionId previousSubscriptionId, @@ -730,6 +732,7 @@ private static String appendCorrelationIdParameter(final String protocolCommand, * @param protocolCommandAck the expected acknowledgement. * @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement * or not. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting. */ protected void unsubscribe(@Nullable final AdaptableBus.SubscriptionId subscriptionId, final String protocolCommand, diff --git a/java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java b/java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java index b22b0a76..ed0ba3dc 100644 --- a/java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java +++ b/java/src/main/java/org/eclipse/ditto/client/internal/DefaultDittoClient.java @@ -234,7 +234,6 @@ private static PoliciesImpl configurePolicyClient(final MessagingProvider messag final String busName = TopicPath.Channel.NONE.getName(); final PointerBus bus = BusFactory.createPointerBus(busName, messagingProvider.getExecutorService()); init(bus, messagingProvider); - final MessagingConfiguration messagingConfiguration = messagingProvider.getMessagingConfiguration(); final OutgoingMessageFactory messageFactory = getOutgoingMessageFactoryForPolicies(messagingProvider); return PoliciesImpl.newInstance(messagingProvider, messageFactory, bus); } diff --git a/java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java b/java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java index f1f1c083..752d26aa 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/LiveCommandProcessor.java @@ -41,6 +41,7 @@ public interface LiveCommandProcessor { * Publish a signal. * * @param signal the signal to publish. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting state. */ void publishLiveSignal(Signal signal); diff --git a/java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java b/java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java index be5f42bf..3e1d6302 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/commands/LiveCommandHandler.java @@ -34,6 +34,7 @@ public interface LiveCommandHandler, B extends LiveC * @param type the type of live commands. MUST be an interface satisfying the recursive type bound. * @param commandHandler constructor of any response or event to publish. * @param type of live commands. + * @param type of live command answers. * @return the live command handler. */ static , B extends LiveCommandAnswerBuilder> LiveCommandHandler of( @@ -49,6 +50,7 @@ static , B extends LiveCommandAnswerBuilder> LiveCom * @param type the type of live commands. MUST be an interface satisfying the recursive type bound. * @param commandHandler constructor of any response or event to publish and sender of any acknowledgements. * @param type of live commands. + * @param type of live command answers. * @return the live command handler. */ static , B extends LiveCommandAnswerBuilder> LiveCommandHandler withAcks( @@ -77,6 +79,7 @@ static , B extends LiveCommandAnswerBuilder> LiveCom * To be called after runtime type check of the live command. * * @param liveCommand the live command. + * @param signalPublisher the signal publisher. * @return the result of calling the command handler on the command. */ default LiveCommandAnswerBuilder.BuildStep castAndApply(final LiveCommand liveCommand, diff --git a/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java b/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java index 4bb940cf..5cd6065e 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/events/EventEmitter.java @@ -30,6 +30,7 @@ public interface EventEmitter { * * @param eventFunction Function providing a EventFactory and requiring a Event as result. * @throws NullPointerException if {@code eventFunction} is {@code null}. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is reconnecting. */ void emitEvent(Function> eventFunction); diff --git a/java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java b/java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java index d91918db..0fea6ca1 100644 --- a/java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/internal/LiveImpl.java @@ -50,6 +50,7 @@ import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry; import org.eclipse.ditto.client.live.messages.PendingMessage; import org.eclipse.ditto.client.live.messages.RepliableMessage; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.client.messaging.MessagingProvider; import org.eclipse.ditto.json.JsonKey; import org.eclipse.ditto.messages.model.KnownMessageSubjects; @@ -158,44 +159,58 @@ protected CompletionStage doStartConsumption(final Map con CompletableFuture.allOf(completableFutureEvents, completableFutureMessages, completableFutureLiveCommands); - // register message handler which handles live events: - subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT, (streamingType, previousSubscriptionId) -> { - final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); - messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); - return subscribe(previousSubscriptionId, - streamingType, - subscriptionMessage, - streamingType.startAck(), - completableFutureEvents - ); - }); - - // register message handler which handles incoming messages: - subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE, (streamingType, previousSubscriptionId) -> { - final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); - messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); - return subscribeAndPublishMessage(previousSubscriptionId, - streamingType, - subscriptionMessage, - streamingType.startAck(), - completableFutureMessages, - adaptable -> bus -> bus.notify(getPointerBusKey(adaptable), adaptableAsLiveMessage(adaptable))); - }); - - // register message handler which handles live commands: - subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND, (streamingType, previousSubscriptionId) -> { - final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); - messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); - - return subscribeAndPublishMessage(previousSubscriptionId, - streamingType, - subscriptionMessage, - streamingType.startAck(), - completableFutureLiveCommands, - adaptable -> bus -> bus.getExecutor().submit(() -> handleLiveCommandOrResponse(adaptable)) - ); - }); - return completableFutureCombined; + try { + // register message handler which handles live events: + subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT, + (streamingType, previousSubscriptionId) -> { + final String subscriptionMessage = + buildProtocolCommand(streamingType.start(), consumptionConfig); + messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); + return subscribe(previousSubscriptionId, + streamingType, + subscriptionMessage, + streamingType.startAck(), + completableFutureEvents + ); + }); + + // register message handler which handles incoming messages: + subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE, + (streamingType, previousSubscriptionId) -> { + final String subscriptionMessage = + buildProtocolCommand(streamingType.start(), consumptionConfig); + messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); + return subscribeAndPublishMessage(previousSubscriptionId, + streamingType, + subscriptionMessage, + streamingType.startAck(), + completableFutureMessages, + adaptable -> bus -> bus.notify(getPointerBusKey(adaptable), + adaptableAsLiveMessage(adaptable))); + }); + + // register message handler which handles live commands: + subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND, + (streamingType, previousSubscriptionId) -> { + final String subscriptionMessage = + buildProtocolCommand(streamingType.start(), consumptionConfig); + messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); + + return subscribeAndPublishMessage(previousSubscriptionId, + streamingType, + subscriptionMessage, + streamingType.startAck(), + completableFutureLiveCommands, + adaptable -> bus -> bus.getExecutor() + .submit(() -> handleLiveCommandOrResponse(adaptable)) + ); + }); + return completableFutureCombined; + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); + } } /* diff --git a/java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java b/java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java index 986b329b..9dc2f6c5 100755 --- a/java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java +++ b/java/src/main/java/org/eclipse/ditto/client/live/messages/MessageSender.java @@ -228,6 +228,8 @@ interface MessageSendable { * by its potential targets.

* * @throws IllegalStateException if the {@code Message} to be sent is in an invalid state. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting + * state. */ void send(); @@ -238,6 +240,8 @@ interface MessageSendable { * @param responseConsumer the Consumer which should be notified with the response ot the Throwable in case of * an error. * @throws IllegalStateException if the {@code Message} to be sent is in an invalid state. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting + * state. */ default void send(final BiConsumer, Throwable> responseConsumer) { send(ByteBuffer.class, responseConsumer); @@ -253,6 +257,8 @@ default void send(final BiConsumer, Throwable> responseConsu * an error. * @param the type of the response message's payload. * @throws IllegalStateException if the {@code Message} to be sent is in an invalid state. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting + * state. * @since 1.0.0 */ void send(Class responseType, BiConsumer, Throwable> responseConsumer); diff --git a/java/src/main/java/org/eclipse/ditto/client/management/ClientReconnectingException.java b/java/src/main/java/org/eclipse/ditto/client/management/ClientReconnectingException.java new file mode 100644 index 00000000..bd538684 --- /dev/null +++ b/java/src/main/java/org/eclipse/ditto/client/management/ClientReconnectingException.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2022 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.client.management; + +import javax.annotation.concurrent.Immutable; + +/** + * This exception is thrown in the Ditto client if the client is in a reconnecting state and thus can't send messages + * to the backend. + * + * @since 3.0.0 + */ +@Immutable +public class ClientReconnectingException extends RuntimeException { + + private static final long serialVersionUID = -4578923424099138760L; + + private static final String MESSAGE = "Message could not be sent, because the client is currently " + + "reconnecting."; + + private ClientReconnectingException() { + super(MESSAGE); + } + + public static ClientReconnectingException newInstance() { + return new ClientReconnectingException(); + } + +} \ No newline at end of file diff --git a/java/src/main/java/org/eclipse/ditto/client/management/CommonManagement.java b/java/src/main/java/org/eclipse/ditto/client/management/CommonManagement.java index 07904073..0bc6a5fa 100755 --- a/java/src/main/java/org/eclipse/ditto/client/management/CommonManagement.java +++ b/java/src/main/java/org/eclipse/ditto/client/management/CommonManagement.java @@ -103,7 +103,9 @@ public interface CommonManagement, F extends FeatureHan /** * Start consuming changes (for {@code twin()} and additionally messages and commands (for {@code live()}. * - * @return a CompletionStage that terminates when the start operation was successful. + * @return a CompletionStage that terminates when the start operation was successful or fails with + * {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client + * is in a reconnecting state. */ CompletionStage startConsumption(); @@ -116,14 +118,18 @@ public interface CommonManagement, F extends FeatureHan *
{@code Options.Consumption.namespaces("org.eclipse.ditto.namespace1","org.eclipse.ditto.namespace2");
      * Options.Consumption.filter("gt(attributes/counter,42)");}
      * 
- * @return a CompletionStage that terminates when the start operation was successful. + * @return a CompletionStage that terminates when the start operation was successful or fails with + * {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client + * is in a reconnecting state. */ CompletionStage startConsumption(Option... consumptionOptions); /** * Suspend consuming events from Eclipse Ditto. * - * @return a CompletionStage that terminates when the suspend operation was successful. + * @return a CompletionStage that terminates when the suspend operation was successful or fails with + * {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client + * is in a reconnecting state. */ CompletionStage suspendConsumption(); @@ -134,6 +140,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code options} contains an option * that is not allowed for creating a thing. */ @@ -148,6 +156,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingId} is {@code null} or empty * or if {@code options} contains an option that is not allowed for * creating a thing. @@ -163,6 +173,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific {@link * org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier * or if {@code options} contains an option that is not allowed for * creating a thing. @@ -180,6 +192,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"} or if {@code options} contains an option that is not allowed for creating a thing. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code thing} cannot be parsed to a @@ -198,6 +212,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing the created Thing object or a specific * @throws IllegalArgumentException if {@code options} contains an option * that is not allowed for creating a thing. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. * @since 1.1.0 */ @@ -212,6 +228,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier, or if * {@code initialPolicy} is {@code null} or if {@code options} contains an option * that is not allowed for creating a thing. @@ -230,6 +248,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingId} is {@code null} or empty, or if {@code initialPolicy} is * {@code null} or if {@code options} contains an option that is not allowed for creating a thing. * @throws org.eclipse.ditto.things.model.ThingIdInvalidException if the {@code thingId} was invalid. @@ -248,6 +268,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"}, or if {@code initialPolicy} is {@code null} or if {@code options} contains an option that * is not allowed for creating a thing. @@ -267,6 +289,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier, or if * {@code initialPolicy} is {@code null} or if {@code options} contains an option that * is not allowed for creating a thing. @@ -285,6 +309,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingId} is {@code null} or empty, or if {@code initialPolicy} is * {@code null} or if {@code options} contains an option that is not allowed for creating a thing. * @throws org.eclipse.ditto.things.model.ThingIdInvalidException if the {@code thingId} was invalid. @@ -303,6 +329,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Thing object or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"}, or if {@code initialPolicy} is {@code null} or if {@code options} contains an option that * is not allowed for creating a thing. @@ -322,6 +350,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return completable future providing {@code null} in case of success or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code argument} is {@code null} or if {@code options} contains an option * that is not allowed for merging a thing. * @since 2.0.0 @@ -337,6 +367,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return completable future providing {@code null} in case of success or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code argument} is {@code null} or if {@code options} contains an option * that is not allowed for merging a thing. * @since 2.0.0 @@ -353,6 +385,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier or if {@code options} * contains an option that is not allowed for putting a thing. * @since 1.0.0 @@ -372,6 +406,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"} or if {@code options} contains an option that is not allowed for putting a thing. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code thing} cannot be parsed to a @@ -392,6 +428,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier, or if * {@code initialPolicy} is {@code null} or if {@code options} contains an option that is not allowed for putting * a thing. @@ -413,6 +451,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"}, or if {@code initialPolicy} is {@code null} or if {@code options} contains an option that is * not allowed for putting a thing. @@ -434,6 +474,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier, or if * {@code initialPolicy} is {@code null} or if {@code options} contains an option that is * not allowed for putting a thing. @@ -456,6 +498,8 @@ public interface CommonManagement, F extends FeatureHan * @return CompletionStage providing an {@link Optional} containing the created Thing object, in case the Thing * has been created, or an empty Optional, in case the Thing has been updated. Provides a * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"}, or if {@code initialPolicy} is {@code null} or if {@code options} contains an option that is * not allowed for putting a thing. @@ -473,6 +517,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing {@code null} in case of success or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or has no identifier * or if {@code options} contains an option that is not allowed for updating a thing. */ @@ -488,6 +534,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing {@code null} in case of success or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thing} is {@code null} or if it does not contain the field named * {@code "thingId"} or if {@code options} contains an option that is not allowed for updating a thing. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code thing} cannot be parsed to a @@ -503,6 +551,8 @@ public interface CommonManagement, F extends FeatureHan * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of deletion or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingId} is {@code null} or if {@code options} contains an option * that is not allowed for updating a thing. */ @@ -516,6 +566,8 @@ public interface CommonManagement, F extends FeatureHan * @param thingIds additional identifiers of Things to be retrieved. * @return CompletionStage providing the requested Things, an empty list or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if any argument is {@code null}. */ CompletionStage> retrieve(ThingId thingId, ThingId... thingIds); @@ -529,6 +581,8 @@ public interface CommonManagement, F extends FeatureHan * @param thingIds additional identifiers of Things to be retrieved. * @return CompletionStage providing the requested Things, an empty list or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if any argument is {@code null}. */ CompletionStage> retrieve(JsonFieldSelector fieldSelector, ThingId thingId, ThingId... thingIds); @@ -540,6 +594,8 @@ public interface CommonManagement, F extends FeatureHan * @param thingIds the identifiers of the Things to be retrieved. * @return CompletionStage providing the requested Things, an empty list or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if {@code thingIds} is {@code null}. */ CompletionStage> retrieve(Iterable thingIds); @@ -552,6 +608,8 @@ public interface CommonManagement, F extends FeatureHan * @param thingIds the identifiers of the Things to be retrieved. * @return CompletionStage providing the requested Things, an empty list or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * The CompletionStage fails with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if + * the client is in a reconnecting state. * @throws IllegalArgumentException if any argument is {@code null}. */ CompletionStage> retrieve(JsonFieldSelector fieldSelector, Iterable thingIds); diff --git a/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java b/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java index e5dbe781..f201f138 100755 --- a/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java +++ b/java/src/main/java/org/eclipse/ditto/client/management/FeatureHandle.java @@ -54,7 +54,8 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * * @param options options to be applied configuring behaviour of this method, see * {@link org.eclipse.ditto.client.options.Options}. - * @return CompletionStage + * @return CompletionStage for handling the result of the operation or a specific + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for deleting a * feature. */ @@ -64,7 +65,9 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * Retrieve the {@code Feature} being handled by this {@code FeatureHandle}. * * @return CompletionStage providing the requested Feature object, when completed successfully or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(); @@ -75,6 +78,8 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested Feature object, when completed successfully or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if {@code options} is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving a * feature. @@ -88,7 +93,9 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * @param fieldSelector a field selector object allowing to select a subset of fields on the Feature to be * retrieved. * @return CompletionStage providing the requested Feature object, when completed successfully or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(JsonFieldSelector fieldSelector); @@ -101,6 +108,8 @@ public interface FeatureHandle extends WithFeatureId, FeaturePropertiesManagemen * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested Feature object, when completed successfully or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if any argument is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving a * feature. diff --git a/java/src/main/java/org/eclipse/ditto/client/management/ThingHandle.java b/java/src/main/java/org/eclipse/ditto/client/management/ThingHandle.java index 15b436a9..fba714f9 100755 --- a/java/src/main/java/org/eclipse/ditto/client/management/ThingHandle.java +++ b/java/src/main/java/org/eclipse/ditto/client/management/ThingHandle.java @@ -74,7 +74,10 @@ public interface ThingHandle * * @param options options to be applied configuring behaviour of this method, see * {@link org.eclipse.ditto.client.options.Options}. - * @return CompletionStage providing for handling the deletion a specific + * @return CompletionStage for handling the result of the operation or a specific + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for deleting * a thing. * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. @@ -85,7 +88,9 @@ public interface ThingHandle * Retrieve the {@code Thing} object being handled by this {@code ThingHandle}. * * @return CompletionStage providing the requested {@link Thing} or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(); @@ -96,6 +101,8 @@ public interface ThingHandle * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Thing} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if {@code options} is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving * a thing. @@ -108,7 +115,9 @@ public interface ThingHandle * * @param fieldSelector a field selector object allowing to select a subset of fields on the Thing to be retrieved. * @return CompletionStage providing the requested {@link Thing} or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. */ CompletionStage retrieve(JsonFieldSelector fieldSelector); @@ -120,6 +129,8 @@ public interface ThingHandle * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Thing} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if any argument is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving * a thing. @@ -135,6 +146,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of the operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policyId} is {@code null} or if {@code options} contains an option * that is not allowed for setting a policy ID to a thing. * @since 1.1.0 @@ -149,6 +162,8 @@ public interface ThingHandle * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of the operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policyId} is {@code null} or if {@code options} contains an option * that is not allowed for merging a policy ID to a thing. * @since 2.0.0 @@ -163,6 +178,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of the operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code features} is {@code null} or if {@code options} contains an option * that is not allowed for setting features to a thing. */ @@ -175,7 +192,9 @@ public interface ThingHandle * @param options options to be applied configuring behaviour of this method, see {@link * org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of the operation or a specific {@link - * org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code features} is {@code null} or if {@code options} contains an option * that is not allowed for merging features to a thing. * @since 2.0.0 @@ -190,6 +209,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of this operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code feature} is {@code null} or if {@code options} contains an option * that is not allowed for putting a feature to a thing. */ @@ -203,6 +224,8 @@ public interface ThingHandle * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of this operation or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code feature} is {@code null} or if {@code options} contains an option * that is not allowed for merging a feature to a thing. * @since 2.0.0 @@ -217,6 +240,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the deletion or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code featureId} is {@code null} or if {@code options} contains an option * that is not allowed for deleting a feature from a thing. */ @@ -229,6 +254,8 @@ public interface ThingHandle * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the deletion or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException or if {@code options} contains an option that is not allowed for deleting * features from a thing. */ diff --git a/java/src/main/java/org/eclipse/ditto/client/messaging/MessagingProvider.java b/java/src/main/java/org/eclipse/ditto/client/messaging/MessagingProvider.java index 2072adc0..8460a126 100644 --- a/java/src/main/java/org/eclipse/ditto/client/messaging/MessagingProvider.java +++ b/java/src/main/java/org/eclipse/ditto/client/messaging/MessagingProvider.java @@ -14,6 +14,7 @@ import java.time.Duration; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; @@ -21,6 +22,7 @@ import org.eclipse.ditto.client.configuration.MessagingConfiguration; import org.eclipse.ditto.client.internal.bus.AdaptableBus; import org.eclipse.ditto.client.internal.bus.Classification; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.protocol.Adaptable; import org.eclipse.ditto.protocol.ProtocolFactory; @@ -94,6 +96,8 @@ public interface MessagingProvider { * Send a message into the channel provided by this provider. * * @param message the message to emit. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is reconnecting and thus + * can't emit the message. * @since 1.1.0 */ void emit(String message); @@ -102,6 +106,8 @@ public interface MessagingProvider { * Emit an adaptable message in a fire-and-forget manner. * * @param message the message to emit. + * @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is reconnecting and thus + * can't emit the message. * @since 1.1.0 */ default void emitAdaptable(Adaptable message) { @@ -112,23 +118,31 @@ default void emitAdaptable(Adaptable message) { * Send Ditto Protocol {@link Adaptable} using the underlying connection and expect a response. * * @param adaptable the adaptable to be sent - * @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} + * @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} or is + * failed with a {@link org.eclipse.ditto.client.management.ClientReconnectingException}, when the client is in a + * reconnecting state. */ default CompletionStage sendAdaptable(final Adaptable adaptable) { - final String correlationId = adaptable.getDittoHeaders() - .getCorrelationId() - .orElseGet(() -> UUID.randomUUID().toString()); - final Adaptable adaptableToSend = adaptable.getDittoHeaders() - .getCorrelationId() - .map(cid -> adaptable) - .orElseGet(() -> adaptable.setDittoHeaders( - adaptable.getDittoHeaders().toBuilder().correlationId(correlationId).build()) - ); - final Duration timeout = getMessagingConfiguration().getTimeout(); - final CompletionStage result = getAdaptableBus() - .subscribeOnceForAdaptable(Classification.forCorrelationId(correlationId), timeout); - emitAdaptable(adaptableToSend); - return result; + try { + final String correlationId = adaptable.getDittoHeaders() + .getCorrelationId() + .orElseGet(() -> UUID.randomUUID().toString()); + final Adaptable adaptableToSend = adaptable.getDittoHeaders() + .getCorrelationId() + .map(cid -> adaptable) + .orElseGet(() -> adaptable.setDittoHeaders( + adaptable.getDittoHeaders().toBuilder().correlationId(correlationId).build()) + ); + final Duration timeout = getMessagingConfiguration().getTimeout(); + final CompletionStage result = getAdaptableBus() + .subscribeOnceForAdaptable(Classification.forCorrelationId(correlationId), timeout); + emitAdaptable(adaptableToSend); + return result; + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); + } } /** diff --git a/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java b/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java index 6bd0243d..f9776c7f 100644 --- a/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java +++ b/java/src/main/java/org/eclipse/ditto/client/messaging/internal/WebSocketMessagingProvider.java @@ -50,6 +50,7 @@ import org.eclipse.ditto.client.internal.VersionReader; import org.eclipse.ditto.client.internal.bus.AdaptableBus; import org.eclipse.ditto.client.internal.bus.BusFactory; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.client.messaging.AuthenticationException; import org.eclipse.ditto.client.messaging.AuthenticationProvider; import org.eclipse.ditto.client.messaging.MessagingException; @@ -297,7 +298,11 @@ private CompletionStage initiateConnection(final WebSocket ws) { @Override public void emit(final String message) { - sendToWebsocket(message); + if (reconnecting.get()) { + throw ClientReconnectingException.newInstance(); + } else { + sendToWebsocket(message); + } } private void sendToWebsocket(final String stringMessage) { diff --git a/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java b/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java index a6abd469..20ceca16 100644 --- a/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java +++ b/java/src/main/java/org/eclipse/ditto/client/policies/Policies.java @@ -52,7 +52,9 @@ public interface Policies { * @param options options to be applied configuring behaviour of this method, * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Policy object or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policy} is {@code null} or has no identifier. * @throws org.eclipse.ditto.policies.model.PolicyIdInvalidException if the {@code policyId} was invalid. */ @@ -69,7 +71,9 @@ public interface Policies { * @param options options to be applied configuring behaviour of this method, * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the created Policy object or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code jsonObject} is {@code null} or if it does not contain the field named * {@code "policyId"}. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code jsonObject} cannot be parsed to a @@ -89,6 +93,8 @@ public interface Policies { * @return CompletionStage providing an {@link java.util.Optional} containing the created Policy object, * in case the Policy has been created, or an empty Optional, in case the Policy has been updated. * Provides a {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policy} is {@code null} or has no identifier. */ CompletionStage> put(Policy policy, Option... options); @@ -106,6 +112,8 @@ public interface Policies { * @return CompletionStage providing an {@link Optional} containing the created Policy object, in case the Policy * has been created, or an empty Optional, in case the Policy has been updated. * Provides a {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code jsonObject} is {@code null} or if it does not contain the field named * {@code "policyId"}. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code jsonObject} cannot be parsed to a @@ -121,6 +129,8 @@ public interface Policies { * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing for handling a successful update or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policy} is {@code null} or has no identifier. */ CompletionStage update(Policy policy, Option... options); @@ -135,6 +145,8 @@ public interface Policies { * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing for handling a successful update or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code jsonObject} is {@code null} or if it does not contain the field named * {@code "policyId"}. * @throws org.eclipse.ditto.base.model.exceptions.DittoJsonException if {@code jsonObject} cannot be parsed to a @@ -149,7 +161,9 @@ public interface Policies { * @param options options to be applied configuring behaviour of this method, * see {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage for handling the result of deletion or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policyId} is {@code null}. */ CompletionStage delete(PolicyId policyId, Option... options); @@ -159,7 +173,9 @@ public interface Policies { * * @param policyId the identifier of the Policy to be retrieved. * @return CompletionStage providing the requested Policy or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws IllegalArgumentException if {@code policyId} is {@code null}. */ CompletionStage retrieve(PolicyId policyId); @@ -168,10 +184,13 @@ public interface Policies { /** * Gets the {@code Policy} specified by the given identifier with the given options. * + * @param policyId the policyId to retrieve. * @param options options that determine the behaviour of this method, see * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Thing} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if {@code options} is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving * a thing. @@ -183,9 +202,12 @@ public interface Policies { * Retrieve the {@code Policy} specified by the given identifier, containing the fields specified by * the given {@code fieldSelector}. * + * @param policyId the policyId to retrieve. * @param fieldSelector a field selector object allowing to select a subset of fields on the Policy to be retrieved. * @return CompletionStage providing the requested {@link Policy} or a specific - * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed + * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @since 2.4.0 */ CompletionStage retrieve(PolicyId policyId, JsonFieldSelector fieldSelector); @@ -194,11 +216,14 @@ public interface Policies { * Gets the {@code Policy} specified by the given identifier with the given options, containing the fields * specified by the given {@code fieldSelector}. * + * @param policyId the policyId to retrieve. * @param fieldSelector a field selector object allowing to select a subset of fields on the Policy to be retrieved. * @param options options that determine the behaviour of this method, see * {@link org.eclipse.ditto.client.options.Options}. * @return CompletionStage providing the requested {@link Policy} or a specific * {@link org.eclipse.ditto.base.model.exceptions.DittoRuntimeException} if the operation failed. + * If the client is reconnecting the CompletionStage fails with a + * {@link org.eclipse.ditto.client.management.ClientReconnectingException}. * @throws NullPointerException if any argument is {@code null}. * @throws IllegalArgumentException if {@code options} contains an option that is not allowed for retrieving * a policy. diff --git a/java/src/main/java/org/eclipse/ditto/client/twin/Twin.java b/java/src/main/java/org/eclipse/ditto/client/twin/Twin.java index 8fe656cf..777e4a30 100755 --- a/java/src/main/java/org/eclipse/ditto/client/twin/Twin.java +++ b/java/src/main/java/org/eclipse/ditto/client/twin/Twin.java @@ -28,7 +28,9 @@ public interface Twin extends CommonManagement{@code Options.Consumption.namespaces("org.eclipse.ditto.namespace1","org.eclipse.ditto.namespace2"); * Options.Consumption.filter("gt(attributes/counter,42)");} * - * @return a CompletionStage that terminates when the start operation was successful. + * @return a CompletionStage that terminates when the start operation was successful or fails with + * {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client + * is in a reconnecting state. */ @Override // overwritten in order to display a better suiting javadoc for the user diff --git a/java/src/main/java/org/eclipse/ditto/client/twin/internal/TwinImpl.java b/java/src/main/java/org/eclipse/ditto/client/twin/internal/TwinImpl.java index 9101970f..815b93c7 100644 --- a/java/src/main/java/org/eclipse/ditto/client/twin/internal/TwinImpl.java +++ b/java/src/main/java/org/eclipse/ditto/client/twin/internal/TwinImpl.java @@ -25,6 +25,7 @@ import org.eclipse.ditto.client.internal.bus.AdaptableBus; import org.eclipse.ditto.client.internal.bus.Classification; import org.eclipse.ditto.client.internal.bus.PointerBus; +import org.eclipse.ditto.client.management.ClientReconnectingException; import org.eclipse.ditto.client.messaging.MessagingProvider; import org.eclipse.ditto.client.twin.Twin; import org.eclipse.ditto.client.twin.TwinFeatureHandle; @@ -94,33 +95,45 @@ public TwinFeatureHandle createFeatureHandle(final ThingId thingId, final String @Override protected CompletionStage doStartConsumption(final Map consumptionConfig) { - final CompletableFuture ackFuture = new CompletableFuture<>(); - final Classification.StreamingType streamingType = Classification.StreamingType.TWIN_EVENT; - final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); - messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); - synchronized (twinEventSubscription) { - final AdaptableBus.SubscriptionId previousSubscriptionId = twinEventSubscription.get(); - twinEventSubscription.set(subscribe( - previousSubscriptionId, - streamingType, - subscriptionMessage, - streamingType.startAck(), - ackFuture - )); + try { + final CompletableFuture ackFuture = new CompletableFuture<>(); + final Classification.StreamingType streamingType = Classification.StreamingType.TWIN_EVENT; + final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig); + messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage); + synchronized (twinEventSubscription) { + final AdaptableBus.SubscriptionId previousSubscriptionId = twinEventSubscription.get(); + twinEventSubscription.set(subscribe( + previousSubscriptionId, + streamingType, + subscriptionMessage, + streamingType.startAck(), + ackFuture + )); + } + return ackFuture; + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); } - return ackFuture; } @Override public CompletionStage suspendConsumption() { - final Classification.StreamingType streamingType = Classification.StreamingType.TWIN_EVENT; - messagingProvider.unregisterSubscriptionMessage(streamingType); - final CompletableFuture ackFuture = new CompletableFuture<>(); - synchronized (twinEventSubscription) { - unsubscribe(twinEventSubscription.get(), streamingType.stop(), streamingType.stopAck(), ackFuture); - twinEventSubscription.set(null); + try { + final Classification.StreamingType streamingType = Classification.StreamingType.TWIN_EVENT; + messagingProvider.unregisterSubscriptionMessage(streamingType); + final CompletableFuture ackFuture = new CompletableFuture<>(); + synchronized (twinEventSubscription) { + unsubscribe(twinEventSubscription.get(), streamingType.stop(), streamingType.stopAck(), ackFuture); + twinEventSubscription.set(null); + } + return ackFuture; + } catch (final ClientReconnectingException cre) { + return CompletableFuture.supplyAsync(() -> { + throw cre; + }); } - return ackFuture; } @Override