Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion java/src/main/java/org/eclipse/ditto/client/DittoClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public interface DittoClient {
* Directly sends a Ditto Protocol {@link Adaptable} message to the established Ditto backend connection.
*
* @param dittoProtocolAdaptable the adaptable to send
* @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable}
* @return a CompletionStage containing the correlated response to the sent {@code dittoProtocolAdaptable} or
* which failed with a {@link org.eclipse.ditto.client.management.ClientReconnectingException} if the client is
* in a reconnecting state.
* @throws IllegalStateException when no twin/live connection was configured for this client
*/
CompletionStage<Adaptable> sendDittoProtocol(Adaptable dittoProtocolAdaptable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ interface Builder {
* Register a consumer of errors which occur during opening the connection initially and on reconnects.
*
* @param handler the handler that will be called with the cause of the connection error.
* @return this builder.
* @since 1.2.0
*/
Builder connectionErrorHandler(@Nullable Consumer<Throwable> handler);
Expand All @@ -190,6 +191,7 @@ interface Builder {
* Register a contextListener which is notified whenever the connection is disconnected.
*
* @param contextListener the handler that will be called with details about the disconnection.
* @return this builder.
* @since 2.1.0
*/
Builder disconnectedListener(@Nullable Consumer<DisconnectedContext> contextListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -40,6 +41,7 @@
import org.eclipse.ditto.client.ack.internal.AcknowledgementRequestsValidator;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.management.AcknowledgementsFailedException;
import org.eclipse.ditto.client.management.ClientReconnectingException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
Expand Down Expand Up @@ -138,6 +140,8 @@ protected Signal signalFromAdaptable(final Adaptable adaptable) {
* @param <R> type of the result.
* @return future of the result if the expected response arrives or a failed future on error.
* Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention.
* If the client is reconnecting while this method is called the future fails with a
* {@link ClientReconnectingException}.
*/
protected <T extends PolicyCommand<T>, S extends PolicyCommandResponse<?>, R> CompletionStage<R> askPolicyCommand(
final T command,
Expand All @@ -159,11 +163,14 @@ protected <T extends PolicyCommand<T>, S extends PolicyCommandResponse<?>, R> Co
* @param <R> type of the result.
* @return future of the result if the expected response arrives or a failed future on error.
* Type is {@code CompletionStage} to signify that the future will complete or fail without caller intervention.
* If the client is reconnecting while this method is called the future fails with a
* {@link ClientReconnectingException}.
*/
protected <T extends ThingCommand<T>, S extends CommandResponse<?>, R> CompletionStage<R> askThingCommand(
final T command,
final Class<S> expectedResponse,
final Function<S, R> onSuccess) {

final ThingCommand<?> commandWithChannel = validateAckRequests(setChannel(command, channel));
return sendSignalAndExpectResponse(commandWithChannel, expectedResponse, onSuccess, ErrorResponse.class,
ErrorResponse::getDittoRuntimeException);
Expand All @@ -180,33 +187,42 @@ protected <T extends ThingCommand<T>, S extends CommandResponse<?>, R> Completio
* @param <S> type of the expected success response.
* @param <E> type of the expected error response.
* @param <R> type of the result.
* @return future of the result.
* @return future of the result. The future can be exceptional with a {@link ClientReconnectingException} if the
* client is reconnecting while this method is called.
*/
protected <S, E, R> CompletionStage<R> sendSignalAndExpectResponse(final Signal<?> signal,
final Class<S> expectedResponseClass,
final Function<S, R> onSuccess,
final Class<E> expectedErrorResponseClass,
final Function<E, ? extends RuntimeException> onError) {

final CompletionStage<Adaptable> responseFuture = messagingProvider.getAdaptableBus()
.subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout());

messagingProvider.emit(signalToJsonString(signal));
return responseFuture.thenApply(responseAdaptable -> {
final Signal<?> response = signalFromAdaptable(responseAdaptable);
if (expectedErrorResponseClass.isInstance(response)) {
// extracted runtime exception will be wrapped in CompletionException.
throw onError.apply(expectedErrorResponseClass.cast(response));
} else if (response instanceof Acknowledgements) {
final CommandResponse<?> commandResponse =
extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response);
return onSuccess.apply(expectedResponseClass.cast(commandResponse));
} else if (expectedResponseClass.isInstance(response)) {
return onSuccess.apply(expectedResponseClass.cast(response));
} else {
throw new ClassCastException("Expect " + expectedResponseClass.getSimpleName() + ", got: " + response);
}
});
try {
final CompletionStage<Adaptable> responseFuture = messagingProvider.getAdaptableBus()
.subscribeOnceForAdaptable(Classification.forCorrelationId(signal), getTimeout());

messagingProvider.emit(signalToJsonString(signal));
return responseFuture.thenApply(responseAdaptable -> {
final Signal<?> response = signalFromAdaptable(responseAdaptable);
if (expectedErrorResponseClass.isInstance(response)) {
// extracted runtime exception will be wrapped in CompletionException.
throw onError.apply(expectedErrorResponseClass.cast(response));
} else if (response instanceof Acknowledgements) {
final CommandResponse<?> commandResponse =
extractCommandResponseFromAcknowledgements(signal, (Acknowledgements) response);
return onSuccess.apply(expectedResponseClass.cast(commandResponse));
} else if (expectedResponseClass.isInstance(response)) {
return onSuccess.apply(expectedResponseClass.cast(response));
} else {
throw new ClassCastException(
"Expect " + expectedResponseClass.getSimpleName() + ", got: " + response);
}
});
} catch (final ClientReconnectingException cre) {
return CompletableFuture.supplyAsync(() -> {
throw cre;
});
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ public CompletionStage<Void> startConsumption(final Option<?>... consumptionOpti
* Starts the consumption of twin events / messages / live events and commands.
*
* @param consumptionConfig the configuration Map to apply for the consumption.
* @return a CompletionStage that terminates when the start operation was successful.
* @return a CompletionStage that terminates when the start operation was successful or fails if the client is in
* a reconnecting state
*/
protected abstract CompletionStage<Void> doStartConsumption(Map<String, String> consumptionConfig);

Expand Down Expand Up @@ -648,6 +649,7 @@ public void registerForThingChanges(final String registrationId, final Consumer<
* @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement
* or not.
* @return the subscription ID.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting.
*/
protected AdaptableBus.SubscriptionId subscribe(
@Nullable final AdaptableBus.SubscriptionId previousSubscriptionId,
Expand Down Expand Up @@ -730,6 +732,7 @@ private static String appendCorrelationIdParameter(final String protocolCommand,
* @param protocolCommandAck the expected acknowledgement.
* @param futureToCompleteOrFailAfterAck the future to complete or fail after receiving the expected acknowledgement
* or not.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client client is reconnecting.
*/
protected void unsubscribe(@Nullable final AdaptableBus.SubscriptionId subscriptionId,
final String protocolCommand,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ private static PoliciesImpl configurePolicyClient(final MessagingProvider messag
final String busName = TopicPath.Channel.NONE.getName();
final PointerBus bus = BusFactory.createPointerBus(busName, messagingProvider.getExecutorService());
init(bus, messagingProvider);
final MessagingConfiguration messagingConfiguration = messagingProvider.getMessagingConfiguration();
final OutgoingMessageFactory messageFactory = getOutgoingMessageFactoryForPolicies(messagingProvider);
return PoliciesImpl.newInstance(messagingProvider, messageFactory, bus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public interface LiveCommandProcessor {
* Publish a signal.
*
* @param signal the signal to publish.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting state.
*/
void publishLiveSignal(Signal<?> signal);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public interface LiveCommandHandler<L extends LiveCommand<L, B>, B extends LiveC
* @param type the type of live commands. MUST be an interface satisfying the recursive type bound.
* @param commandHandler constructor of any response or event to publish.
* @param <L> type of live commands.
* @param <B> type of live command answers.
* @return the live command handler.
*/
static <L extends LiveCommand<L, B>, B extends LiveCommandAnswerBuilder> LiveCommandHandler<L, B> of(
Expand All @@ -49,6 +50,7 @@ static <L extends LiveCommand<L, B>, B extends LiveCommandAnswerBuilder> LiveCom
* @param type the type of live commands. MUST be an interface satisfying the recursive type bound.
* @param commandHandler constructor of any response or event to publish and sender of any acknowledgements.
* @param <L> type of live commands.
* @param <B> type of live command answers.
* @return the live command handler.
*/
static <L extends LiveCommand<L, B>, B extends LiveCommandAnswerBuilder> LiveCommandHandler<L, B> withAcks(
Expand Down Expand Up @@ -77,6 +79,7 @@ static <L extends LiveCommand<L, B>, B extends LiveCommandAnswerBuilder> LiveCom
* To be called after runtime type check of the live command.
*
* @param liveCommand the live command.
* @param signalPublisher the signal publisher.
* @return the result of calling the command handler on the command.
*/
default LiveCommandAnswerBuilder.BuildStep castAndApply(final LiveCommand<?, ?> liveCommand,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public interface EventEmitter<F extends EventFactory> {
*
* @param eventFunction Function providing a EventFactory and requiring a Event as result.
* @throws NullPointerException if {@code eventFunction} is {@code null}.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is reconnecting.
*/
void emitEvent(Function<F, Event<?>> eventFunction);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
import org.eclipse.ditto.client.live.messages.PendingMessage;
import org.eclipse.ditto.client.live.messages.RepliableMessage;
import org.eclipse.ditto.client.management.ClientReconnectingException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.json.JsonKey;
import org.eclipse.ditto.messages.model.KnownMessageSubjects;
Expand Down Expand Up @@ -158,44 +159,58 @@ protected CompletionStage<Void> doStartConsumption(final Map<String, String> con
CompletableFuture.allOf(completableFutureEvents, completableFutureMessages,
completableFutureLiveCommands);

// register message handler which handles live events:
subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT, (streamingType, previousSubscriptionId) -> {
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
return subscribe(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureEvents
);
});

// register message handler which handles incoming messages:
subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE, (streamingType, previousSubscriptionId) -> {
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
return subscribeAndPublishMessage(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureMessages,
adaptable -> bus -> bus.notify(getPointerBusKey(adaptable), adaptableAsLiveMessage(adaptable)));
});

// register message handler which handles live commands:
subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND, (streamingType, previousSubscriptionId) -> {
final String subscriptionMessage = buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);

return subscribeAndPublishMessage(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureLiveCommands,
adaptable -> bus -> bus.getExecutor().submit(() -> handleLiveCommandOrResponse(adaptable))
);
});
return completableFutureCombined;
try {
// register message handler which handles live events:
subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT,
(streamingType, previousSubscriptionId) -> {
final String subscriptionMessage =
buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
return subscribe(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureEvents
);
});

// register message handler which handles incoming messages:
subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE,
(streamingType, previousSubscriptionId) -> {
final String subscriptionMessage =
buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);
return subscribeAndPublishMessage(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureMessages,
adaptable -> bus -> bus.notify(getPointerBusKey(adaptable),
adaptableAsLiveMessage(adaptable)));
});

// register message handler which handles live commands:
subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND,
(streamingType, previousSubscriptionId) -> {
final String subscriptionMessage =
buildProtocolCommand(streamingType.start(), consumptionConfig);
messagingProvider.registerSubscriptionMessage(streamingType, subscriptionMessage);

return subscribeAndPublishMessage(previousSubscriptionId,
streamingType,
subscriptionMessage,
streamingType.startAck(),
completableFutureLiveCommands,
adaptable -> bus -> bus.getExecutor()
.submit(() -> handleLiveCommandOrResponse(adaptable))
);
});
return completableFutureCombined;
} catch (final ClientReconnectingException cre) {
return CompletableFuture.supplyAsync(() -> {
throw cre;
});
}
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ interface MessageSendable<T> {
* by its potential targets. </p>
*
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
* state.
*/
void send();

Expand All @@ -238,6 +240,8 @@ interface MessageSendable<T> {
* @param responseConsumer the Consumer which should be notified with the response ot the Throwable in case of
* an error.
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
* state.
*/
default void send(final BiConsumer<Message<ByteBuffer>, Throwable> responseConsumer) {
send(ByteBuffer.class, responseConsumer);
Expand All @@ -253,6 +257,8 @@ default void send(final BiConsumer<Message<ByteBuffer>, Throwable> responseConsu
* an error.
* @param <R> the type of the response message's payload.
* @throws IllegalStateException if the {@code Message} to be sent is in an invalid state.
* @throws org.eclipse.ditto.client.management.ClientReconnectingException if the client is in a reconnecting
* state.
* @since 1.0.0
*/
<R> void send(Class<R> responseType, BiConsumer<Message<R>, Throwable> responseConsumer);
Expand Down
Loading