diff --git a/.rubocop.yml b/.rubocop.yml index 6654f1d..cbf8b09 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -185,3 +185,6 @@ Style/ModuleFunction: Style/CombinableLoops: Enabled: false + +Style/DocumentDynamicEvalDefinition: + Enabled: false diff --git a/Gemfile.lock b/Gemfile.lock index 7c0213c..a9f2c21 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: . specs: redis-client (0.0.0) + connection_pool GEM remote: https://rubygems.org/ diff --git a/README.md b/README.md index 7f93a0a..bd739b2 100644 --- a/README.md +++ b/README.md @@ -23,24 +23,43 @@ Or install it yourself as: ## Usage -To use `RedisClient` you first define a connection configuration, from which you can create clients: +To use `RedisClient` you first define a connection configuration, from which you can create a connection pool: + +```ruby +redis_config = RedisClient.config(host: "10.0.1.1", port: 6380, db: 15) +redis = redis_config.new_pool(timeout: 0.5, size: Integer(ENV.fetch("RAILS_MAX_THREADS", 5))) + +redis.call("PING") # => "PONG" +``` + +If you are issuing multiple commands in a raw, it's best to use `#with` to avoid going through the connection checkout +several times: + +```ruby +redis.with do |r| + r.call("SET", "mykey", "hello world") # => "OK" + r.call("GET", "mykey") # => "hello world" +end +``` + +If you are working in a single threaded environment, or wish to use your own connection pooling mechanism, +you can obtain a raw client with `#new_client` ```ruby redis_config = RedisClient.config(host: "10.0.1.1", port: 6380, db: 15) redis = redis_config.new_client -redis.call("SET", "mykey", "hello world") # => "OK" -redis.call("GET", "mykey") # => "hello world" +redis.call("PING") # => "PONG" ``` +NOTE: Raw `RedisClient` instances must not be shared between threads. Make sure to read the section on [thread safety](#thread-safety). + For simple use cases where only a single connection is needed, you can use the `RedisClient.new` shortcut: ```ruby -redis = RedisClient.new +redis = RedisClient.new(host: "10.0.1.1", port: 6380, db: 15) redis.call("GET", "mykey") ``` -NOTE: `RedisClient` instances must not be shared between threads. Make sure to read the section on [thread safety](#thread-safety). - ### Configuration - `url`: A Redis connection URL, e.g. `redis://example.com:6379/5`, a `rediss://` scheme enable SSL, and the path is interpreted as a database number. @@ -303,21 +322,18 @@ redis.call_once("INCR", "counter") # Won't be retried. ### Thread Safety Contrary to the `redis` gem, `redis-client` doesn't protect against concurrent access. -To use `redis-client` in concurrent environments, you MUST use a connection pool like [the `connection_pool` gem](https://rubygems.org/gems/connection_pool), or +To use `redis-client` in concurrent environments, you MUST use a connection pool, or have one client per Thread or Fiber. -```ruby -redis_config = RedisClient.config(host: "redis.example.com") -pool = ConnectionPool.new { redis_config.new_client } -pool.with do |redis| - redis.call("PING") -end -``` - ### Fork Safety `redis-client` doesn't try to detect forked processes. You MUST disconnect all clients before forking your process. +```ruby +redis.close +Process.fork ... +``` + ## Development After checking out the repo, run `bin/setup` to install dependencies. You can also run `bin/console` for an interactive prompt that will allow you to experiment. diff --git a/lib/redis_client.rb b/lib/redis_client.rb index 1e81fcc..32a10d9 100644 --- a/lib/redis_client.rb +++ b/lib/redis_client.rb @@ -9,11 +9,14 @@ class RedisClient Error = Class.new(StandardError) ConnectionError = Class.new(Error) + + FailoverError = Class.new(ConnectionError) + TimeoutError = Class.new(ConnectionError) ReadTimeoutError = Class.new(TimeoutError) WriteTimeoutError = Class.new(TimeoutError) ConnectTimeoutError = Class.new(TimeoutError) - FailoverError = Class.new(ConnectionError) + CheckoutTimeoutError = Class.new(ConnectTimeoutError) class CommandError < Error class << self @@ -70,6 +73,11 @@ def initialize( @disable_reconnection = false end + def with(_options = nil) + yield self + end + alias_method :then, :with + def timeout=(timeout) @connect_timeout = @read_timeout = @write_timeout = timeout end @@ -423,3 +431,4 @@ def raw_connection end require "redis_client/resp3" +require "redis_client/pooled" diff --git a/lib/redis_client/config.rb b/lib/redis_client/config.rb index 98bd72f..e0ffecb 100644 --- a/lib/redis_client/config.rb +++ b/lib/redis_client/config.rb @@ -63,6 +63,11 @@ def sentinel? false end + def new_pool(**kwargs) + kwargs[:timeout] ||= DEFAULT_TIMEOUT + Pooled.new(self, **kwargs) + end + def new_client(**kwargs) RedisClient.new(self, **kwargs) end diff --git a/lib/redis_client/pooled.rb b/lib/redis_client/pooled.rb new file mode 100644 index 0000000..0a9ed0c --- /dev/null +++ b/lib/redis_client/pooled.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +require "connection_pool" + +class RedisClient + class Pooled + EMPTY_HASH = {}.freeze + + def initialize(config, **kwargs) + @config = config + @pool_kwargs = kwargs + @pool = new_pool + @mutex = Mutex.new + end + + def with(options = EMPTY_HASH, &block) + pool.with(options, &block) + rescue ConnectionPool::TimeoutError => error + raise CheckoutTimeoutError, "Couldn't checkout a connection in time: #{error.message}" + end + alias_method :then, :with + + def close + if @pool + @mutex.synchronize do + pool = @pool + @pool = nil + pool&.shutdown(&:close) + end + end + nil + end + + %w(pipelined).each do |method| + class_eval <<~RUBY, __FILE__, __LINE__ + 1 + def #{method}(&block) + with { |r| r.#{method}(&block) } + end + RUBY + end + + %w(multi).each do |method| + class_eval <<~RUBY, __FILE__, __LINE__ + 1 + def #{method}(**kwargs, &block) + with { |r| r.#{method}(**kwargs, &block) } + end + RUBY + end + + %w(call call_once blocking_call pubsub).each do |method| + class_eval <<~RUBY, __FILE__, __LINE__ + 1 + def #{method}(*args) + with { |r| r.#{method}(*args) } + end + RUBY + end + + %w(scan sscan hscan zscan).each do |method| + class_eval <<~RUBY, __FILE__, __LINE__ + 1 + def #{method}(*args, &block) + unless block_given? + return to_enum(__callee__, *args) + end + + with { |r| r.#{method}(*args, &block) } + end + RUBY + end + + private + + def pool + @pool ||= @mutex.synchronize { new_pool } + end + + def new_pool + ConnectionPool.new(**@pool_kwargs) { @config.new_client } + end + end +end diff --git a/redis-client.gemspec b/redis-client.gemspec index fde68e5..0a006f6 100644 --- a/redis-client.gemspec +++ b/redis-client.gemspec @@ -29,4 +29,6 @@ Gem::Specification.new do |spec| spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) } spec.require_paths = ["lib"] spec.extensions = ["ext/redis_client/hiredis/extconf.rb"] + + spec.add_runtime_dependency "connection_pool" end diff --git a/test/redis_client/sentinel_test.rb b/test/redis_client/sentinel_test.rb index 9324f7f..d7ba5d5 100644 --- a/test/redis_client/sentinel_test.rb +++ b/test/redis_client/sentinel_test.rb @@ -12,11 +12,7 @@ def setup end def teardown - unless healthy_master? - puts - puts "Restarting services" - Servers::ALL.reset - end + Servers::ALL.reset end def test_new_client @@ -166,17 +162,5 @@ def new_config(**kwargs) **kwargs, ) end - - def healthy_master? - Servers::SENTINELS.each do |sentinel| - sentinel_client = RedisClient.new(host: sentinel.host, port: sentinel.port) - return false unless sentinel_client.call("SENTINEL", "get-master-addr-by-name", "cache") == [Servers::REDIS.host, Servers::REDIS.port.to_s] - end - - client = RedisClient.new(host: Servers::REDIS.host, port: Servers::REDIS.port) - return false unless client.call("ROLE")&.first == "master" - - true - end end end diff --git a/test/redis_client_test.rb b/test/redis_client_test.rb index a33cefd..5c59fe5 100644 --- a/test/redis_client_test.rb +++ b/test/redis_client_test.rb @@ -2,9 +2,7 @@ require "test_helper" -class RedisClientTest < Minitest::Test - include ClientTestHelper - +module RedisClientTests def test_has_version assert_instance_of String, RedisClient::VERSION end @@ -129,7 +127,6 @@ def test_empty_pipeline end def test_large_read_pipelines - @redis.timeout = 5 @redis.call("LPUSH", "list", *1000.times.to_a) @redis.pipelined do |pipeline| 100.times do @@ -139,7 +136,6 @@ def test_large_read_pipelines end def test_large_write_pipelines - @redis.timeout = 5 @redis.pipelined do |pipeline| 10.times do |i| pipeline.call("SET", i, i.to_s * 10485760) @@ -275,24 +271,6 @@ def test_empty_transaction_watch_reset assert_equal "3", @redis.call("GET", "foo") end - def test_preselect_database - client = new_client(db: 5) - assert_includes client.call("CLIENT", "INFO"), " db=5 " - client.call("SELECT", 6) - assert_includes client.call("CLIENT", "INFO"), " db=6 " - client.close - assert_includes client.call("CLIENT", "INFO"), " db=5 " - end - - def test_set_client_id - client = new_client(id: "peter") - assert_includes client.call("CLIENT", "INFO"), " name=peter " - client.call("CLIENT", "SETNAME", "steven") - assert_includes client.call("CLIENT", "INFO"), " name=steven " - client.close - assert_includes client.call("CLIENT", "INFO"), " name=peter " - end - def test_call_timeout_false thr = Thread.start do client = new_client @@ -388,4 +366,48 @@ def test_hscan expected_pairs = Hash[*100.times.map(&:to_s)].to_a assert_equal expected_pairs, pairs end + + def test_preselect_database + client = new_client(db: 5) + assert_includes client.call("CLIENT", "INFO"), " db=5 " + client.call("SELECT", 6) + assert_includes client.call("CLIENT", "INFO"), " db=6 " + client.close + assert_includes client.call("CLIENT", "INFO"), " db=5 " + end + + def test_set_client_id + client = new_client(id: "peter") + assert_includes client.call("CLIENT", "INFO"), " name=peter " + client.call("CLIENT", "SETNAME", "steven") + assert_includes client.call("CLIENT", "INFO"), " name=steven " + client.close + assert_includes client.call("CLIENT", "INFO"), " name=peter " + end +end + +class RedisClientTest < Minitest::Test + include ClientTestHelper + include RedisClientTests +end + +class RedisPooledClientTest < Minitest::Test + include ClientTestHelper + include RedisClientTests + + def test_checkout_timeout + pool = RedisClient.config(**tcp_config).new_pool(size: 1, timeout: 0.01) + Thread.new { pool.instance_variable_get(:@pool).checkout }.join + + error = assert_raises RedisClient::ConnectionError do + pool.with {} + end + assert_includes error.message, "Couldn't checkout a connection in time: Waited 0.01 sec" + end + + private + + def new_client(**overrides) + RedisClient.config(**tcp_config.merge(overrides)).new_pool + end end diff --git a/test/support/server_manager.rb b/test/support/server_manager.rb index 3c85afc..ae2e178 100644 --- a/test/support/server_manager.rb +++ b/test/support/server_manager.rb @@ -3,9 +3,22 @@ require "pathname" class ServerManager + module NullIO + extend self + + def puts(_str) + nil + end + + def print(_str) + nil + end + end + ROOT = Pathname.new(File.expand_path("../../", __dir__)) attr_reader :name, :host, :port, :real_port, :command + attr_accessor :out def initialize(name, port:, command: nil, real_port: port, host: "127.0.0.1") @name = name @@ -13,26 +26,27 @@ def initialize(name, port:, command: nil, real_port: port, host: "127.0.0.1") @port = port @real_port = real_port @command = command + @out = $stderr end def spawn if alive? - $stderr.puts "#{name} already running with pid=#{pid}" + @out.puts "#{name} already running with pid=#{pid}" else pid_file.parent.mkpath - $stderr.print "starting #{name}... " + @out.print "starting #{name}... " pid = Process.spawn(*command, out: log_file.to_s, err: log_file.to_s) pid_file.write(pid.to_s) - $stderr.puts "started with pid=#{pid}" + @out.puts "started with pid=#{pid}" end end def wait(timeout: 5) - $stderr.print "Waiting for #{name} (port #{real_port})..." + @out.print "Waiting for #{name} (port #{real_port})..." if wait_until_ready(timeout: timeout) - $stderr.puts " ready." + @out.puts " ready." else - $stderr.puts " timedout." + @out.puts " timedout." end end @@ -102,12 +116,22 @@ def initialize(*servers) @servers = servers end + def silence + @servers.each { |s| s.out = ServerManager::NullIO } + yield + ensure + @servers.each { |s| s.out = $stderr } + end + def prepare shutdown @servers.each(&:spawn) @servers.each(&:wait) end - alias_method :reset, :prepare + + def reset + silence { prepare } + end def shutdown @servers.reverse_each(&:shutdown)