diff --git a/.gitignore b/.gitignore index d67d445a59..cbdd274c1d 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ tags .idea *.aof *.rdb +- diff --git a/src/main/java/redis/clients/jedis/BinaryJedis.java b/src/main/java/redis/clients/jedis/BinaryJedis.java index 17358d911c..873c676fd0 100644 --- a/src/main/java/redis/clients/jedis/BinaryJedis.java +++ b/src/main/java/redis/clients/jedis/BinaryJedis.java @@ -112,6 +112,16 @@ public BinaryJedis(final String host, final int port, final int connectionTimeou client.setSoTimeout(soTimeout); } + public BinaryJedis(final String host, final int port, final int connectionTimeout, + final int soTimeout, final boolean ssl, final SSLSocketFactory sslSocketFactory, + final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier, + final List listeners) { + client = new Client(host, port, ssl, sslSocketFactory, sslParameters, hostnameVerifier); + client.setConnectionTimeout(connectionTimeout); + client.setSoTimeout(soTimeout); + client.addListeners(listeners); + } + public BinaryJedis(final JedisShardInfo shardInfo) { client = new Client(shardInfo.getHost(), shardInfo.getPort(), shardInfo.getSsl(), shardInfo.getSslSocketFactory(), shardInfo.getSslParameters(), diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 619c16e5e4..7e949490b8 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -6,6 +6,7 @@ import java.net.Socket; import java.net.SocketException; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import javax.net.ssl.HostnameVerifier; @@ -37,6 +38,7 @@ public class Connection implements Closeable { private SSLSocketFactory sslSocketFactory; private SSLParameters sslParameters; private HostnameVerifier hostnameVerifier; + private final List listeners = new LinkedList<>(); public Connection() { } @@ -87,6 +89,46 @@ public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } + public void addListeners(List listeners) { + if (listeners != null) { + for (JedisCommandListener listener : listeners) { + addListener(listener); + } + } + } + + public void addListener(JedisCommandListener listener) { + this.listeners.add(listener); + } + + public void removeListener(JedisCommandListener listener) { + this.listeners.remove(listener); + } + + private void notifyCommandStarted(final ProtocolCommand cmd, final byte[]... args) { + for (JedisCommandListener listener : listeners) { + listener.commandStarted(this, cmd, args); + } + } + + private void notifyCommandConnected(final ProtocolCommand cmd) { + for (JedisCommandListener listener : listeners) { + listener.commandConnected(this, cmd); + } + } + + private void notifyCommandFinished(final ProtocolCommand cmd) { + for (JedisCommandListener listener : listeners) { + listener.commandFinished(this, cmd); + } + } + + private void notifyCommandFailed(final ProtocolCommand cmd, Throwable t) { + for (JedisCommandListener listener : listeners) { + listener.commandFailed(this, cmd, t); + } + } + public void setTimeoutInfinite() { try { if (!isConnected()) { @@ -122,8 +164,11 @@ public void sendCommand(final ProtocolCommand cmd) { public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { try { + notifyCommandStarted(cmd, args); connect(); + notifyCommandConnected(cmd); Protocol.sendCommand(outputStream, cmd, args); + notifyCommandFinished(cmd); } catch (JedisConnectionException ex) { /* * When client send request which formed by invalid protocol, Redis send back error message @@ -143,7 +188,11 @@ public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { } // Any other exceptions related to connection? broken = true; + notifyCommandFailed(cmd, ex); throw ex; + } catch (Throwable t) { + notifyCommandFailed(cmd, t); + throw t; } } diff --git a/src/main/java/redis/clients/jedis/Jedis.java b/src/main/java/redis/clients/jedis/Jedis.java index 8cfa8edce0..dcb75e1606 100644 --- a/src/main/java/redis/clients/jedis/Jedis.java +++ b/src/main/java/redis/clients/jedis/Jedis.java @@ -91,6 +91,13 @@ public Jedis(final String host, final int port, final int connectionTimeout, fin hostnameVerifier); } + public Jedis(final String host, final int port, final int connectionTimeout, final int soTimeout, + final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, + final HostnameVerifier hostnameVerifier, final List listeners) { + super(host, port, connectionTimeout, soTimeout, ssl, sslSocketFactory, sslParameters, + hostnameVerifier, listeners); + } + public Jedis(JedisShardInfo shardInfo) { super(shardInfo); } diff --git a/src/main/java/redis/clients/jedis/JedisCommandListener.java b/src/main/java/redis/clients/jedis/JedisCommandListener.java new file mode 100644 index 0000000000..e93409e0ad --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisCommandListener.java @@ -0,0 +1,51 @@ +package redis.clients.jedis; + +import redis.clients.jedis.commands.ProtocolCommand; + +/** + * A listener for command events. + *

+ * All commands on the same connections are executed synchronously, however there the + * implementation must be thread safe because the listener might be invoked from + * different threads. + *

+ * The listener is guaranteed to call {@link #commandStarted} and one of the + * {@link #commandFinished} or {@link #commandFailed} once for each command. + *

+ * The listener implementation must be exception free or else none of the aforementioned + * contracts will be enforced. + */ +public interface JedisCommandListener { + /** + * Called for every command event that started execution + * + * @param connection the connection on which the command the executed + * @param event the executable command, see {@link Protocol.Command} + * @param args byte array of the command arguments + */ + void commandStarted(Connection connection, ProtocolCommand event, byte[]... args); + + /** + * Called after connection has been established to Redis, but before the command has executed + * @param connection the connection on which the command the executed + * @param event the executable command, see {@link Protocol.Command} + */ + void commandConnected(Connection connection, ProtocolCommand event); + + /** + * Called for every successfully executed command + * + * @param connection the connection on which the command the executed + * @param event the executable command, see {@link Protocol.Command} + */ + void commandFinished(Connection connection, ProtocolCommand event); + + /** + * Called for every command that failed exceptionally + * + * @param connection the connection on which the command the executed + * @param event the executable command, see {@link Protocol.Command} + * @param t the triggered exception + */ + void commandFailed(Connection connection, ProtocolCommand event, Throwable t); +} diff --git a/src/main/java/redis/clients/jedis/JedisFactory.java b/src/main/java/redis/clients/jedis/JedisFactory.java index 12a32540c5..5fa3e9d70a 100644 --- a/src/main/java/redis/clients/jedis/JedisFactory.java +++ b/src/main/java/redis/clients/jedis/JedisFactory.java @@ -1,6 +1,7 @@ package redis.clients.jedis; import java.net.URI; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.HostnameVerifier; @@ -29,17 +30,18 @@ class JedisFactory implements PooledObjectFactory { private final SSLSocketFactory sslSocketFactory; private final SSLParameters sslParameters; private final HostnameVerifier hostnameVerifier; + private final List listeners; JedisFactory(final String host, final int port, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName) { this(host, port, connectionTimeout, soTimeout, password, database, clientName, - false, null, null, null); + false, null, null, null, null); } JedisFactory(final String host, final int port, final int connectionTimeout, final int soTimeout, final String password, final int database, final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, - final HostnameVerifier hostnameVerifier) { + final HostnameVerifier hostnameVerifier, final List listeners) { this.hostAndPort.set(new HostAndPort(host, port)); this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; @@ -50,16 +52,19 @@ class JedisFactory implements PooledObjectFactory { this.sslSocketFactory = sslSocketFactory; this.sslParameters = sslParameters; this.hostnameVerifier = hostnameVerifier; + this.listeners = listeners; } JedisFactory(final URI uri, final int connectionTimeout, final int soTimeout, final String clientName) { - this(uri, connectionTimeout, soTimeout, clientName, null, null, null); + this(uri, connectionTimeout, soTimeout, clientName, null, null, null, + null); } JedisFactory(final URI uri, final int connectionTimeout, final int soTimeout, final String clientName, final SSLSocketFactory sslSocketFactory, - final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { + final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier, + final List listeners) { if (!JedisURIHelper.isValid(uri)) { throw new InvalidURIException(String.format( "Cannot open Redis connection due invalid URI. %s", uri.toString())); @@ -75,6 +80,7 @@ class JedisFactory implements PooledObjectFactory { this.sslSocketFactory = sslSocketFactory; this.sslParameters = sslParameters; this.hostnameVerifier = hostnameVerifier; + this.listeners = listeners; } public void setHostAndPort(final HostAndPort hostAndPort) { @@ -111,7 +117,7 @@ public void destroyObject(PooledObject pooledJedis) throws Exception { public PooledObject makeObject() throws Exception { final HostAndPort hostAndPort = this.hostAndPort.get(); final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, - soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier); + soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier, listeners); try { jedis.connect(); diff --git a/src/main/java/redis/clients/jedis/JedisPool.java b/src/main/java/redis/clients/jedis/JedisPool.java index 5e93de84e5..493d1ea67a 100644 --- a/src/main/java/redis/clients/jedis/JedisPool.java +++ b/src/main/java/redis/clients/jedis/JedisPool.java @@ -1,6 +1,7 @@ package redis.clients.jedis; import java.net.URI; +import java.util.List; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLParameters; @@ -40,18 +41,26 @@ public JedisPool(final String host) { public JedisPool(final String host, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { + this(host, sslSocketFactory, sslParameters, hostnameVerifier, null); + } + + public JedisPool(final String host, final SSLSocketFactory sslSocketFactory, + final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier, + final List listeners) { URI uri = URI.create(host); if (JedisURIHelper.isValid(uri)) { this.internalPool = new GenericObjectPool(new JedisFactory(uri, - Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, sslSocketFactory, sslParameters, - hostnameVerifier), new GenericObjectPoolConfig()); + Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, sslSocketFactory, sslParameters, + hostnameVerifier, listeners), new GenericObjectPoolConfig()); } else { this.internalPool = new GenericObjectPool(new JedisFactory(host, - Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, - Protocol.DEFAULT_DATABASE, null, false, null, null, null), new GenericObjectPoolConfig()); + Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE, null, false, null, null, null, + listeners), new GenericObjectPoolConfig()); } } + public JedisPool(final URI uri) { this(new GenericObjectPoolConfig(), uri); } @@ -164,7 +173,7 @@ public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, in final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { super(poolConfig, new JedisFactory(host, port, connectionTimeout, soTimeout, password, - database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier)); + database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, null)); } public JedisPool(final GenericObjectPoolConfig poolConfig) { @@ -226,7 +235,7 @@ public JedisPool(final GenericObjectPoolConfig poolConfig, final URI uri, final int connectionTimeout, final int soTimeout, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { super(poolConfig, new JedisFactory(uri, connectionTimeout, soTimeout, null, sslSocketFactory, - sslParameters, hostnameVerifier)); + sslParameters, hostnameVerifier, null)); } @Override diff --git a/src/test/java/redis/clients/jedis/tests/ConnectionTest.java b/src/test/java/redis/clients/jedis/tests/ConnectionTest.java index e6ebe058e1..46c49c5a96 100644 --- a/src/test/java/redis/clients/jedis/tests/ConnectionTest.java +++ b/src/test/java/redis/clients/jedis/tests/ConnectionTest.java @@ -1,17 +1,23 @@ package redis.clients.jedis.tests; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.LinkedList; +import java.util.List; + import redis.clients.jedis.Connection; +import redis.clients.jedis.JedisCommandListener; import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.commands.ProtocolCommand; import redis.clients.jedis.exceptions.JedisConnectionException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class ConnectionTest { private Connection client; @@ -101,4 +107,186 @@ private Object read() { assertEquals("Attempting to read from a broken connection", jce.getMessage()); } } + + @Test + public void notifiesListenerOnSuccessfulCommand() { + TrackingCommandListener listener = new TrackingCommandListener(); + this.client.addListener(listener); + this.client.setHost("localhost"); + this.client.setPort(6379); + + this.client.sendCommand(Command.PING); + + assertTrue(listener.isCommandStarted()); + assertTrue(listener.isCommandConnected()); + assertTrue(listener.isCommandFinished()); + assertFalse(listener.isCommandFailed()); + } + + @Test + public void notifiesListenerOnConnectionError() { + TrackingCommandListener listener = new TrackingCommandListener(); + this.client.addListener(listener); + this.client.setHost("someunknownhost"); + + try { + this.client.sendCommand(Command.PING); + fail("Test did not throw exception as expected"); + } catch (JedisConnectionException jce) { + } + + assertTrue(listener.isCommandStarted()); + assertTrue(listener.isCommandFailed()); + assertFalse(listener.isCommandConnected()); + assertFalse(listener.isCommandFinished()); + } + + @Test + public void notifiesListenerOnUnexpectedError() { + class UnexpectedBrokenConnection extends Connection { + @Override + public void connect() { + throw new IllegalStateException("unexpected exception"); + } + } + + TrackingCommandListener listener = new TrackingCommandListener(); + this.client = new UnexpectedBrokenConnection(); + this.client.addListener(listener); + + try { + this.client.sendCommand(Command.PING); + fail("Test did not throw exception as expected"); + } catch (IllegalStateException ise) { + } + + assertTrue(listener.isCommandStarted()); + assertTrue(listener.isCommandFailed()); + assertFalse(listener.isCommandConnected()); + assertFalse(listener.isCommandFinished()); + } + + @Test + public void supportsMultipleListeners() { + TrackingCommandListener listener1 = new TrackingCommandListener(); + TrackingCommandListener listener2 = new TrackingCommandListener(); + List listeners = new LinkedList<>(); + listeners.add(listener1); + listeners.add(listener2); + + this.client.addListeners(listeners); + this.client.setHost("localhost"); + this.client.setPort(6379); + + this.client.sendCommand(Command.PING); + + assertTrue(listener1.isCommandStarted()); + assertTrue(listener1.isCommandConnected()); + assertTrue(listener1.isCommandFinished()); + assertFalse(listener1.isCommandFailed()); + assertTrue(listener2.isCommandStarted()); + assertTrue(listener2.isCommandConnected()); + assertTrue(listener2.isCommandFinished()); + assertFalse(listener2.isCommandFailed()); + } + + @Test + public void interpretsNullListenerListAsEmptyList() { + try { + this.client.addListeners(null); + this.client.setHost("localhost"); + this.client.setPort(6379); + this.client.sendCommand(Command.PING); + } catch (NullPointerException e) { + fail("A null listener list wasn't properly coerced to empty list."); + } + } + + @Test + public void addingListenersDoesntRemovePreexistingOnes() { + TrackingCommandListener listener1 = new TrackingCommandListener(); + TrackingCommandListener listener2 = new TrackingCommandListener(); + + this.client.addListener(listener1); + this.client.addListener(listener2); + this.client.setHost("localhost"); + this.client.setPort(6379); + + this.client.sendCommand(Command.PING); + + assertTrue(listener1.isCommandStarted()); + assertTrue(listener1.isCommandConnected()); + assertTrue(listener1.isCommandFinished()); + assertFalse(listener1.isCommandFailed()); + assertTrue(listener2.isCommandStarted()); + assertTrue(listener2.isCommandConnected()); + assertTrue(listener2.isCommandFinished()); + assertFalse(listener2.isCommandFailed()); + } + + @Test + public void batchAddingListenersDoesntRemovePreexistingOnes() { + TrackingCommandListener listener1 = new TrackingCommandListener(); + + TrackingCommandListener listener2 = new TrackingCommandListener(); + List listeners = new LinkedList<>(); + listeners.add(listener2); + + this.client.addListener(listener1); + this.client.addListeners(listeners); + this.client.setHost("localhost"); + this.client.setPort(6379); + + this.client.sendCommand(Command.PING); + + assertTrue(listener1.isCommandStarted()); + assertTrue(listener1.isCommandConnected()); + assertTrue(listener1.isCommandFinished()); + assertFalse(listener1.isCommandFailed()); + assertTrue(listener2.isCommandStarted()); + assertTrue(listener2.isCommandConnected()); + assertTrue(listener2.isCommandFinished()); + assertFalse(listener2.isCommandFailed()); + } + + class TrackingCommandListener implements JedisCommandListener { + + private boolean commandStarted = false; + private boolean commandConnected = false; + private boolean commandFinished = false; + private boolean commandFailed = false; + + @Override public void commandStarted(Connection connection, ProtocolCommand event, + byte[]... args) { + this.commandStarted = true; + } + + @Override public void commandConnected(Connection connection, ProtocolCommand event) { + this.commandConnected = true; + } + + @Override public void commandFinished(Connection connection, ProtocolCommand event) { + this.commandFinished = true; + } + + @Override public void commandFailed(Connection connection, ProtocolCommand event, Throwable t) { + this.commandFailed = true; + } + + public boolean isCommandStarted() { + return commandStarted; + } + + public boolean isCommandConnected() { + return commandConnected; + } + + public boolean isCommandFinished() { + return commandFinished; + } + + public boolean isCommandFailed() { + return commandFailed; + } + } }