Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,6 @@ Style/ModuleFunction:

Style/CombinableLoops:
Enabled: false

Style/DocumentDynamicEvalDefinition:
Enabled: false
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: .
specs:
redis-client (0.0.0)
connection_pool

GEM
remote: https://rubygems.org/
Expand Down
46 changes: 31 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,43 @@ Or install it yourself as:

## Usage

To use `RedisClient` you first define a connection configuration, from which you can create clients:
To use `RedisClient` you first define a connection configuration, from which you can create a connection pool:

```ruby
redis_config = RedisClient.config(host: "10.0.1.1", port: 6380, db: 15)
redis = redis_config.new_pool(timeout: 0.5, size: Integer(ENV.fetch("RAILS_MAX_THREADS", 5)))

redis.call("PING") # => "PONG"
```

If you are issuing multiple commands in a raw, it's best to use `#with` to avoid going through the connection checkout
several times:

```ruby
redis.with do |r|
r.call("SET", "mykey", "hello world") # => "OK"
r.call("GET", "mykey") # => "hello world"
end
```

If you are working in a single threaded environment, or wish to use your own connection pooling mechanism,
you can obtain a raw client with `#new_client`

```ruby
redis_config = RedisClient.config(host: "10.0.1.1", port: 6380, db: 15)
redis = redis_config.new_client
redis.call("SET", "mykey", "hello world") # => "OK"
redis.call("GET", "mykey") # => "hello world"
redis.call("PING") # => "PONG"
```

NOTE: Raw `RedisClient` instances must not be shared between threads. Make sure to read the section on [thread safety](#thread-safety).

For simple use cases where only a single connection is needed, you can use the `RedisClient.new` shortcut:

```ruby
redis = RedisClient.new
redis = RedisClient.new(host: "10.0.1.1", port: 6380, db: 15)
redis.call("GET", "mykey")
```

NOTE: `RedisClient` instances must not be shared between threads. Make sure to read the section on [thread safety](#thread-safety).

### Configuration

- `url`: A Redis connection URL, e.g. `redis://example.com:6379/5`, a `rediss://` scheme enable SSL, and the path is interpreted as a database number.
Expand Down Expand Up @@ -303,21 +322,18 @@ redis.call_once("INCR", "counter") # Won't be retried.
### Thread Safety

Contrary to the `redis` gem, `redis-client` doesn't protect against concurrent access.
To use `redis-client` in concurrent environments, you MUST use a connection pool like [the `connection_pool` gem](https://rubygems.org/gems/connection_pool), or
To use `redis-client` in concurrent environments, you MUST use a connection pool, or
have one client per Thread or Fiber.

```ruby
redis_config = RedisClient.config(host: "redis.example.com")
pool = ConnectionPool.new { redis_config.new_client }
pool.with do |redis|
redis.call("PING")
end
```

### Fork Safety

`redis-client` doesn't try to detect forked processes. You MUST disconnect all clients before forking your process.

```ruby
redis.close
Process.fork ...
```

## Development

After checking out the repo, run `bin/setup` to install dependencies. You can also run `bin/console` for an interactive prompt that will allow you to experiment.
Expand Down
11 changes: 10 additions & 1 deletion lib/redis_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ class RedisClient
Error = Class.new(StandardError)

ConnectionError = Class.new(Error)

FailoverError = Class.new(ConnectionError)

TimeoutError = Class.new(ConnectionError)
ReadTimeoutError = Class.new(TimeoutError)
WriteTimeoutError = Class.new(TimeoutError)
ConnectTimeoutError = Class.new(TimeoutError)
FailoverError = Class.new(ConnectionError)
CheckoutTimeoutError = Class.new(ConnectTimeoutError)

class CommandError < Error
class << self
Expand Down Expand Up @@ -70,6 +73,11 @@ def initialize(
@disable_reconnection = false
end

def with(_options = nil)
yield self
end
alias_method :then, :with

def timeout=(timeout)
@connect_timeout = @read_timeout = @write_timeout = timeout
end
Expand Down Expand Up @@ -423,3 +431,4 @@ def raw_connection
end

require "redis_client/resp3"
require "redis_client/pooled"
5 changes: 5 additions & 0 deletions lib/redis_client/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def sentinel?
false
end

def new_pool(**kwargs)
kwargs[:timeout] ||= DEFAULT_TIMEOUT
Pooled.new(self, **kwargs)
end

def new_client(**kwargs)
RedisClient.new(self, **kwargs)
end
Expand Down
80 changes: 80 additions & 0 deletions lib/redis_client/pooled.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# frozen_string_literal: true

require "connection_pool"

class RedisClient
class Pooled
EMPTY_HASH = {}.freeze

def initialize(config, **kwargs)
@config = config
@pool_kwargs = kwargs
@pool = new_pool
@mutex = Mutex.new
end

def with(options = EMPTY_HASH, &block)
pool.with(options, &block)
rescue ConnectionPool::TimeoutError => error
raise CheckoutTimeoutError, "Couldn't checkout a connection in time: #{error.message}"
end
alias_method :then, :with

def close
if @pool
@mutex.synchronize do
pool = @pool
@pool = nil
pool&.shutdown(&:close)
end
end
nil
end

%w(pipelined).each do |method|
class_eval <<~RUBY, __FILE__, __LINE__ + 1
def #{method}(&block)
with { |r| r.#{method}(&block) }
end
RUBY
end

%w(multi).each do |method|
class_eval <<~RUBY, __FILE__, __LINE__ + 1
def #{method}(**kwargs, &block)
with { |r| r.#{method}(**kwargs, &block) }
end
RUBY
end

%w(call call_once blocking_call pubsub).each do |method|
class_eval <<~RUBY, __FILE__, __LINE__ + 1
def #{method}(*args)
with { |r| r.#{method}(*args) }
end
RUBY
end

%w(scan sscan hscan zscan).each do |method|
class_eval <<~RUBY, __FILE__, __LINE__ + 1
def #{method}(*args, &block)
unless block_given?
return to_enum(__callee__, *args)
end

with { |r| r.#{method}(*args, &block) }
end
RUBY
end

private

def pool
@pool ||= @mutex.synchronize { new_pool }
end

def new_pool
ConnectionPool.new(**@pool_kwargs) { @config.new_client }
end
end
end
2 changes: 2 additions & 0 deletions redis-client.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]
spec.extensions = ["ext/redis_client/hiredis/extconf.rb"]

spec.add_runtime_dependency "connection_pool"
end
18 changes: 1 addition & 17 deletions test/redis_client/sentinel_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ def setup
end

def teardown
unless healthy_master?
puts
puts "Restarting services"
Servers::ALL.reset
end
Servers::ALL.reset
end

def test_new_client
Expand Down Expand Up @@ -166,17 +162,5 @@ def new_config(**kwargs)
**kwargs,
)
end

def healthy_master?
Servers::SENTINELS.each do |sentinel|
sentinel_client = RedisClient.new(host: sentinel.host, port: sentinel.port)
return false unless sentinel_client.call("SENTINEL", "get-master-addr-by-name", "cache") == [Servers::REDIS.host, Servers::REDIS.port.to_s]
end

client = RedisClient.new(host: Servers::REDIS.host, port: Servers::REDIS.port)
return false unless client.call("ROLE")&.first == "master"

true
end
end
end
68 changes: 45 additions & 23 deletions test/redis_client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

require "test_helper"

class RedisClientTest < Minitest::Test
include ClientTestHelper

module RedisClientTests
def test_has_version
assert_instance_of String, RedisClient::VERSION
end
Expand Down Expand Up @@ -129,7 +127,6 @@ def test_empty_pipeline
end

def test_large_read_pipelines
@redis.timeout = 5
@redis.call("LPUSH", "list", *1000.times.to_a)
@redis.pipelined do |pipeline|
100.times do
Expand All @@ -139,7 +136,6 @@ def test_large_read_pipelines
end

def test_large_write_pipelines
@redis.timeout = 5
@redis.pipelined do |pipeline|
10.times do |i|
pipeline.call("SET", i, i.to_s * 10485760)
Expand Down Expand Up @@ -275,24 +271,6 @@ def test_empty_transaction_watch_reset
assert_equal "3", @redis.call("GET", "foo")
end

def test_preselect_database
client = new_client(db: 5)
assert_includes client.call("CLIENT", "INFO"), " db=5 "
client.call("SELECT", 6)
assert_includes client.call("CLIENT", "INFO"), " db=6 "
client.close
assert_includes client.call("CLIENT", "INFO"), " db=5 "
end

def test_set_client_id
client = new_client(id: "peter")
assert_includes client.call("CLIENT", "INFO"), " name=peter "
client.call("CLIENT", "SETNAME", "steven")
assert_includes client.call("CLIENT", "INFO"), " name=steven "
client.close
assert_includes client.call("CLIENT", "INFO"), " name=peter "
end

def test_call_timeout_false
thr = Thread.start do
client = new_client
Expand Down Expand Up @@ -388,4 +366,48 @@ def test_hscan
expected_pairs = Hash[*100.times.map(&:to_s)].to_a
assert_equal expected_pairs, pairs
end

def test_preselect_database
client = new_client(db: 5)
assert_includes client.call("CLIENT", "INFO"), " db=5 "
client.call("SELECT", 6)
assert_includes client.call("CLIENT", "INFO"), " db=6 "
client.close
assert_includes client.call("CLIENT", "INFO"), " db=5 "
end

def test_set_client_id
client = new_client(id: "peter")
assert_includes client.call("CLIENT", "INFO"), " name=peter "
client.call("CLIENT", "SETNAME", "steven")
assert_includes client.call("CLIENT", "INFO"), " name=steven "
client.close
assert_includes client.call("CLIENT", "INFO"), " name=peter "
end
end

class RedisClientTest < Minitest::Test
include ClientTestHelper
include RedisClientTests
end

class RedisPooledClientTest < Minitest::Test
include ClientTestHelper
include RedisClientTests

def test_checkout_timeout
pool = RedisClient.config(**tcp_config).new_pool(size: 1, timeout: 0.01)
Thread.new { pool.instance_variable_get(:@pool).checkout }.join

error = assert_raises RedisClient::ConnectionError do
pool.with {}
end
assert_includes error.message, "Couldn't checkout a connection in time: Waited 0.01 sec"
end

private

def new_client(**overrides)
RedisClient.config(**tcp_config.merge(overrides)).new_pool
end
end
Loading