Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
import glide.api.commands.StringBaseCommands;
import glide.api.commands.TransactionsBaseCommands;
import glide.api.models.GlideString;
import glide.api.models.PubsubMessage;
import glide.api.models.PubSubMessage;
import glide.api.models.Script;
import glide.api.models.commands.ExpireOptions;
import glide.api.models.commands.GetExOptions;
Expand Down Expand Up @@ -279,7 +279,7 @@ public abstract class BaseClient
// All made protected to simplify testing.
protected CommandManager commandManager;
protected ConnectionManager connectionManager;
protected ConcurrentLinkedDeque<PubsubMessage> messageQueue;
protected ConcurrentLinkedDeque<PubSubMessage> messageQueue;
protected Optional<BaseSubscriptionConfiguration> subscriptionConfiguration = Optional.empty();

/** Helper which extracts data from received {@link Response}s from GLIDE. */
Expand Down Expand Up @@ -353,7 +353,7 @@ protected static <T extends BaseClient> CompletableFuture<T> CreateClient(
* with a callback.
* @return A message if any or <code>null</code> if there are no unread messages.
*/
public PubsubMessage tryGetPubSubMessage() {
public PubSubMessage tryGetPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
throw new ConfigurationError(
"The operation will never complete since there was no pubsub subscriptions applied to the"
Expand All @@ -375,7 +375,7 @@ public PubsubMessage tryGetPubSubMessage() {
* with a callback.
* @return A <code>Future</code> which resolved with the next incoming message.
*/
public CompletableFuture<PubsubMessage> getPubSubMessage() {
public CompletableFuture<PubSubMessage> getPubSubMessage() {
if (subscriptionConfiguration.isEmpty()) {
throw new ConfigurationError(
"The operation will never complete since there was no pubsub subscriptions applied to the"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
package glide.api.models;

import java.util.Optional;
import lombok.EqualsAndHashCode;
import lombok.Getter;

/** PubSub message received by the client. */
@Getter
public class PubsubMessage {
@EqualsAndHashCode
public class PubSubMessage {
/** An incoming message received. */
private final String message;

Expand All @@ -16,13 +18,13 @@ public class PubsubMessage {
/** A pattern matched to the channel name. */
private final Optional<String> pattern;

public PubsubMessage(String message, String channel, String pattern) {
public PubSubMessage(String message, String channel, String pattern) {
this.message = message;
this.channel = channel;
this.pattern = Optional.ofNullable(pattern);
}

public PubsubMessage(String message, String channel) {
public PubSubMessage(String message, String channel) {
this.message = message;
this.channel = channel;
this.pattern = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
package glide.api.models.configuration;

import glide.api.BaseClient;
import glide.api.models.PubsubMessage;
import glide.api.models.PubSubMessage;
import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode;
import java.util.HashSet;
Expand Down Expand Up @@ -33,11 +33,11 @@ public interface ChannelMode {}
* The callback arguments are:
*
* <ol>
* <li>A received {@link PubsubMessage}.
* <li>A received {@link PubSubMessage}.
* <li>A user-defined {@link #context} or <code>null</code> if not configured.
* </ol>
*/
public interface MessageCallback extends BiConsumer<PubsubMessage, Object> {}
public interface MessageCallback extends BiConsumer<PubSubMessage, Object> {}

/**
* Optional callback to accept the incoming messages. See {@link MessageCallback}.<br>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.connectors.handlers;

import glide.api.models.PubsubMessage;
import glide.api.models.PubSubMessage;
import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback;
import glide.api.models.exceptions.RedisException;
import glide.managers.BaseResponseResolver;
Expand Down Expand Up @@ -33,7 +33,7 @@ public class MessageHandler {
private final BaseResponseResolver responseResolver;

/** A message queue wrapper. */
private final ConcurrentLinkedDeque<PubsubMessage> queue = new ConcurrentLinkedDeque<>();
private final ConcurrentLinkedDeque<PubSubMessage> queue = new ConcurrentLinkedDeque<>();

/** Process a push (PUBSUB) message received as a part of {@link Response} from GLIDE. */
public void handle(Response response) {
Expand All @@ -57,11 +57,11 @@ public void handle(Response response) {
// "Transport disconnected, messages might be lost",
break;
case PMessage:
handle(new PubsubMessage((String) values[2], (String) values[1], (String) values[0]));
handle(new PubSubMessage((String) values[2], (String) values[1], (String) values[0]));
return;
case Message:
case SMessage:
handle(new PubsubMessage((String) values[1], (String) values[0]));
handle(new PubSubMessage((String) values[1], (String) values[0]));
return;
case Subscribe:
case PSubscribe:
Expand All @@ -88,8 +88,8 @@ public void handle(Response response) {
}
}

/** Process a {@link PubsubMessage} received. */
private void handle(PubsubMessage message) {
/** Process a {@link PubSubMessage} received. */
private void handle(PubSubMessage message) {
if (callback.isPresent()) {
callback.get().accept(message, context.orElse(null));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClient(
}

if (configuration.getSubscriptionConfiguration() != null) {
// TODO throw WrongConfiguration if RESP2
// TODO throw ConfigurationError if RESP2
var subscriptionsBuilder = PubSubSubscriptions.newBuilder();
for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) {
var channelsBuilder = PubSubChannelsOrPatterns.newBuilder();
Expand Down Expand Up @@ -173,7 +173,7 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderRedisClusterClien
connectionRequestBuilder.setClusterModeEnabled(true);

if (configuration.getSubscriptionConfiguration() != null) {
// TODO throw WrongConfiguration if RESP2
// TODO throw ConfigurationError if RESP2
var subscriptionsBuilder = PubSubSubscriptions.newBuilder();
for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) {
var channelsBuilder = PubSubChannelsOrPatterns.newBuilder();
Expand Down
Loading