-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Redis command listener #1972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Redis command listener #1972
Changes from all commits
3f76ce9
08cbb6c
ba0c164
2455fa8
3349c23
bf3e1fc
eedc007
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,3 +13,4 @@ tags | |
| .idea | ||
| *.aof | ||
| *.rdb | ||
| - | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| import java.net.Socket; | ||
| import java.net.SocketException; | ||
| import java.util.ArrayList; | ||
| import java.util.LinkedList; | ||
| import java.util.List; | ||
|
|
||
| import javax.net.ssl.HostnameVerifier; | ||
|
|
@@ -37,6 +38,7 @@ public class Connection implements Closeable { | |
| private SSLSocketFactory sslSocketFactory; | ||
| private SSLParameters sslParameters; | ||
| private HostnameVerifier hostnameVerifier; | ||
| private final List<JedisCommandListener> listeners = new LinkedList<>(); | ||
|
|
||
| public Connection() { | ||
| } | ||
|
|
@@ -87,6 +89,46 @@ public void setSoTimeout(int soTimeout) { | |
| this.soTimeout = soTimeout; | ||
| } | ||
|
|
||
| public void addListeners(List<JedisCommandListener> listeners) { | ||
| if (listeners != null) { | ||
| for (JedisCommandListener listener : listeners) { | ||
| addListener(listener); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public void addListener(JedisCommandListener listener) { | ||
| this.listeners.add(listener); | ||
| } | ||
|
|
||
| public void removeListener(JedisCommandListener listener) { | ||
| this.listeners.remove(listener); | ||
| } | ||
|
|
||
| private void notifyCommandStarted(final ProtocolCommand cmd, final byte[]... args) { | ||
| for (JedisCommandListener listener : listeners) { | ||
| listener.commandStarted(this, cmd, args); | ||
| } | ||
| } | ||
|
|
||
| private void notifyCommandConnected(final ProtocolCommand cmd) { | ||
| for (JedisCommandListener listener : listeners) { | ||
| listener.commandConnected(this, cmd); | ||
| } | ||
| } | ||
|
|
||
| private void notifyCommandFinished(final ProtocolCommand cmd) { | ||
| for (JedisCommandListener listener : listeners) { | ||
| listener.commandFinished(this, cmd); | ||
| } | ||
| } | ||
|
|
||
| private void notifyCommandFailed(final ProtocolCommand cmd, Throwable t) { | ||
| for (JedisCommandListener listener : listeners) { | ||
| listener.commandFailed(this, cmd, t); | ||
| } | ||
| } | ||
|
|
||
| public void setTimeoutInfinite() { | ||
| try { | ||
| if (!isConnected()) { | ||
|
|
@@ -122,8 +164,11 @@ public void sendCommand(final ProtocolCommand cmd) { | |
|
|
||
| public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { | ||
| try { | ||
| notifyCommandStarted(cmd, args); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we have more "lightweight" solution that will not add this overhead in case no listeners were set. |
||
| connect(); | ||
| notifyCommandConnected(cmd); | ||
| Protocol.sendCommand(outputStream, cmd, args); | ||
| notifyCommandFinished(cmd); | ||
| } catch (JedisConnectionException ex) { | ||
| /* | ||
| * When client send request which formed by invalid protocol, Redis send back error message | ||
|
|
@@ -143,7 +188,11 @@ public void sendCommand(final ProtocolCommand cmd, final byte[]... args) { | |
| } | ||
| // Any other exceptions related to connection? | ||
| broken = true; | ||
| notifyCommandFailed(cmd, ex); | ||
| throw ex; | ||
| } catch (Throwable t) { | ||
| notifyCommandFailed(cmd, t); | ||
| throw t; | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| package redis.clients.jedis; | ||
|
|
||
| import redis.clients.jedis.commands.ProtocolCommand; | ||
|
|
||
| /** | ||
| * A listener for command events. | ||
| * <p> | ||
| * All commands on the same connections are executed synchronously, however there the | ||
| * implementation must be thread safe because the listener might be invoked from | ||
| * different threads. | ||
| * <p> | ||
| * The listener is guaranteed to call {@link #commandStarted} and one of the | ||
| * {@link #commandFinished} or {@link #commandFailed} once for each command. | ||
| * <p> | ||
| * The listener implementation must be exception free or else none of the aforementioned | ||
| * contracts will be enforced. | ||
| */ | ||
| public interface JedisCommandListener { | ||
gr8routdoors marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /** | ||
| * Called for every command event that started execution | ||
| * | ||
| * @param connection the connection on which the command the executed | ||
| * @param event the executable command, see {@link Protocol.Command} | ||
| * @param args byte array of the command arguments | ||
| */ | ||
| void commandStarted(Connection connection, ProtocolCommand event, byte[]... args); | ||
|
|
||
| /** | ||
| * Called after connection has been established to Redis, but before the command has executed | ||
| * @param connection the connection on which the command the executed | ||
| * @param event the executable command, see {@link Protocol.Command} | ||
| */ | ||
| void commandConnected(Connection connection, ProtocolCommand event); | ||
|
|
||
| /** | ||
| * Called for every successfully executed command | ||
| * | ||
| * @param connection the connection on which the command the executed | ||
| * @param event the executable command, see {@link Protocol.Command} | ||
| */ | ||
| void commandFinished(Connection connection, ProtocolCommand event); | ||
|
|
||
| /** | ||
| * Called for every command that failed exceptionally | ||
| * | ||
| * @param connection the connection on which the command the executed | ||
| * @param event the executable command, see {@link Protocol.Command} | ||
| * @param t the triggered exception | ||
| */ | ||
| void commandFailed(Connection connection, ProtocolCommand event, Throwable t); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package redis.clients.jedis; | ||
|
|
||
| import java.net.URI; | ||
| import java.util.List; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
|
|
||
| import javax.net.ssl.HostnameVerifier; | ||
|
|
@@ -29,17 +30,18 @@ class JedisFactory implements PooledObjectFactory<Jedis> { | |
| private final SSLSocketFactory sslSocketFactory; | ||
| private final SSLParameters sslParameters; | ||
| private final HostnameVerifier hostnameVerifier; | ||
| private final List<JedisCommandListener> listeners; | ||
|
|
||
| JedisFactory(final String host, final int port, final int connectionTimeout, | ||
| final int soTimeout, final String password, final int database, final String clientName) { | ||
| this(host, port, connectionTimeout, soTimeout, password, database, clientName, | ||
| false, null, null, null); | ||
| false, null, null, null, null); | ||
| } | ||
|
|
||
| JedisFactory(final String host, final int port, final int connectionTimeout, | ||
| final int soTimeout, final String password, final int database, final String clientName, | ||
| final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters, | ||
| final HostnameVerifier hostnameVerifier) { | ||
| final HostnameVerifier hostnameVerifier, final List<JedisCommandListener> listeners) { | ||
| this.hostAndPort.set(new HostAndPort(host, port)); | ||
| this.connectionTimeout = connectionTimeout; | ||
| this.soTimeout = soTimeout; | ||
|
|
@@ -50,16 +52,19 @@ class JedisFactory implements PooledObjectFactory<Jedis> { | |
| this.sslSocketFactory = sslSocketFactory; | ||
| this.sslParameters = sslParameters; | ||
| this.hostnameVerifier = hostnameVerifier; | ||
| this.listeners = listeners; | ||
| } | ||
|
|
||
| JedisFactory(final URI uri, final int connectionTimeout, final int soTimeout, | ||
| final String clientName) { | ||
| this(uri, connectionTimeout, soTimeout, clientName, null, null, null); | ||
| this(uri, connectionTimeout, soTimeout, clientName, null, null, null, | ||
| null); | ||
| } | ||
|
|
||
| JedisFactory(final URI uri, final int connectionTimeout, final int soTimeout, | ||
| final String clientName, final SSLSocketFactory sslSocketFactory, | ||
| final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This breaks backward compatibility. Create a new constructor. |
||
| final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier, | ||
| final List<JedisCommandListener> listeners) { | ||
| if (!JedisURIHelper.isValid(uri)) { | ||
| throw new InvalidURIException(String.format( | ||
| "Cannot open Redis connection due invalid URI. %s", uri.toString())); | ||
|
|
@@ -75,6 +80,7 @@ class JedisFactory implements PooledObjectFactory<Jedis> { | |
| this.sslSocketFactory = sslSocketFactory; | ||
| this.sslParameters = sslParameters; | ||
| this.hostnameVerifier = hostnameVerifier; | ||
| this.listeners = listeners; | ||
| } | ||
|
|
||
| public void setHostAndPort(final HostAndPort hostAndPort) { | ||
|
|
@@ -111,7 +117,7 @@ public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception { | |
| public PooledObject<Jedis> makeObject() throws Exception { | ||
| final HostAndPort hostAndPort = this.hostAndPort.get(); | ||
| final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, | ||
| soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier); | ||
| soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier, listeners); | ||
|
|
||
| try { | ||
| jedis.connect(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package redis.clients.jedis; | ||
|
|
||
| import java.net.URI; | ||
| import java.util.List; | ||
|
|
||
| import javax.net.ssl.HostnameVerifier; | ||
| import javax.net.ssl.SSLParameters; | ||
|
|
@@ -40,18 +41,26 @@ public JedisPool(final String host) { | |
|
|
||
| public JedisPool(final String host, final SSLSocketFactory sslSocketFactory, | ||
| final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { | ||
| this(host, sslSocketFactory, sslParameters, hostnameVerifier, null); | ||
| } | ||
|
|
||
| public JedisPool(final String host, final SSLSocketFactory sslSocketFactory, | ||
| final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier, | ||
| final List<JedisCommandListener> listeners) { | ||
| URI uri = URI.create(host); | ||
| if (JedisURIHelper.isValid(uri)) { | ||
| this.internalPool = new GenericObjectPool<Jedis>(new JedisFactory(uri, | ||
| Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, sslSocketFactory, sslParameters, | ||
| hostnameVerifier), new GenericObjectPoolConfig()); | ||
| Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, sslSocketFactory, sslParameters, | ||
| hostnameVerifier, listeners), new GenericObjectPoolConfig()); | ||
|
Comment on lines
+53
to
+54
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use two spaces per tab. |
||
| } else { | ||
| this.internalPool = new GenericObjectPool<Jedis>(new JedisFactory(host, | ||
| Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, | ||
| Protocol.DEFAULT_DATABASE, null, false, null, null, null), new GenericObjectPoolConfig()); | ||
| Protocol.DEFAULT_PORT, Protocol.DEFAULT_TIMEOUT, Protocol.DEFAULT_TIMEOUT, null, | ||
| Protocol.DEFAULT_DATABASE, null, false, null, null, null, | ||
| listeners), new GenericObjectPoolConfig()); | ||
|
Comment on lines
+57
to
+59
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use two spaces per tab. |
||
| } | ||
| } | ||
|
|
||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need. |
||
| public JedisPool(final URI uri) { | ||
| this(new GenericObjectPoolConfig(), uri); | ||
| } | ||
|
|
@@ -164,7 +173,7 @@ public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, in | |
| final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory, | ||
| final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { | ||
| super(poolConfig, new JedisFactory(host, port, connectionTimeout, soTimeout, password, | ||
| database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier)); | ||
| database, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, null)); | ||
| } | ||
|
|
||
| public JedisPool(final GenericObjectPoolConfig poolConfig) { | ||
|
|
@@ -226,7 +235,7 @@ public JedisPool(final GenericObjectPoolConfig poolConfig, final URI uri, | |
| final int connectionTimeout, final int soTimeout, final SSLSocketFactory sslSocketFactory, | ||
| final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier) { | ||
| super(poolConfig, new JedisFactory(uri, connectionTimeout, soTimeout, null, sslSocketFactory, | ||
| sslParameters, hostnameVerifier)); | ||
| sslParameters, hostnameVerifier, null)); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to think if we don't need to protect here from concurrency issues, especially when use in a pool