Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
34d2842
Streamline the `rate-limiter` implementation
marksto May 9, 2025
377e91e
Parametrize `rate-limiter` with an optional `sleep-fn`
marksto May 9, 2025
6b85ed0
Drop the unnecessary `allowed-rate-limiter-option-keys` var
marksto May 9, 2025
6201d1c
Update the specs and documentation
marksto May 9, 2025
e0399f4
Fix a typo in tests
marksto May 9, 2025
9e38dc4
Improve on `try-acquire` exception handling
marksto May 9, 2025
5e6d50c
Rename `acquire!`/`try-acquire` logic implementing fns
marksto May 9, 2025
919c8a8
Parametrize `sleep-fn` with state and permits
marksto May 11, 2025
ff389b3
Update the `core` ns docstring
marksto May 11, 2025
36f6ee5
Rethink the `sleep-fn` param of the `rate-limiter` (simplify it's con…
marksto May 31, 2025
ca17b60
Re-impl `uninterruptible-sleep` fn w/o `LockSupport` (only handle exc…
marksto May 31, 2025
fff7921
Add test cases for uninterruptible sleep
marksto Jun 8, 2025
11f43cb
Refactor rate limiter tests
marksto Jun 8, 2025
bc69c83
Showcase that scheduled tasks do not get terminated
marksto Jun 8, 2025
324e29b
Await for termination of all tasks (depending on rate and thread count)
marksto Jun 8, 2025
2c180ff
Impl two separate ways of stopping scheduled tasks
marksto Jun 8, 2025
626a38c
Await for termination of all tasks (once again for those that need it)
marksto Jun 8, 2025
7da1242
Pick up a better name for the `total-wait-time` fn
marksto Jun 8, 2025
fac8c4f
Add a test case for when a task logic throws an exception
marksto Jun 8, 2025
aeb62de
Explicitly show that all counting works as expected
marksto Jun 8, 2025
b33bc6e
Reduce all visual clutter and unneeded repetition
marksto Jun 8, 2025
bcec598
Satisfy the CI formatter
marksto Jun 8, 2025
1083699
Fix for older JDK version on CI
marksto Jun 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/diehard/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,13 @@ You can always check circuit breaker state with
(defmacro
^{:doc "Create a rate limiter with options.

* `:rate` execution permits per second.
* `:max-cached-tokens` the max size of permits we can cache when idle"}
* `:rate` execution permits per second (may be a floating point number, e.g.
0.5 <=> 1 req every 2 sec)
* `:max-cached-tokens` the max size of permit tokens that the bucket can cache
when it's idle
* `:sleep-fn` a unary fn of millis to sleep for, allowing for custom sleep
semantics; by default, sleeps interruptedly; pass `uninterruptible-sleep`
to sleep uninterruptedly"}
defratelimiter [name opts]
`(def ~name (rl/rate-limiter (u/verify-opt-map-keys-with-spec :rate-limiter/rate-limiter-new ~opts))))

Expand All @@ -482,10 +487,9 @@ to given rate. Use `defratelimiter` to define a ratelimiter and use it as option

By default it will wait forever until there is permits available. You can also specify a
`max-wait-ms` to wait for a given time. If there's no permits in this period, this block
will throw a Clojure `ex-info`, with `ex-data` as
will throw a Clojure exception with a `:throttled true` entry in `ex-data`, as follows:

```clojure

(try
(with-rate-limiter {:ratelimiter myfl
:max-wait-ms 1000}
Expand Down
120 changes: 68 additions & 52 deletions src/diehard/rate_limiter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,26 @@
"Try to acquire given number of permits, allows blocking for at most `wait-ms` milliseconds.
Return true if there are enough permits in permitted time."))

(declare refill acquire-sleep-ms try-acquire-sleep-ms)

(defn- do-acquire [rate-limiter permits]
(refill rate-limiter)
(acquire-sleep-ms rate-limiter permits))

(defn- do-try-acquire [rate-limiter permits max-wait-ms]
(refill rate-limiter)
(try-acquire-sleep-ms rate-limiter permits max-wait-ms))
(declare refill do-acquire! do-try-acquire)

(defrecord TokenBucketRateLimiter [rate max-tokens
;; internal state
state]
state sleep-fn]
IRateLimiter
(acquire! [this]
(acquire! this 1))
(acquire! [this permits]
(let [sleep (do-acquire this permits)]
(when (> sleep 0)
(Thread/sleep ^long sleep))))
(refill this)
(do-acquire! this permits))
(try-acquire [this]
(try-acquire this 1))
(try-acquire [this permits]
(try-acquire this permits 0))
(try-acquire [this permits wait-ms]
(let [sleep (do-try-acquire this permits wait-ms)]
(if (false? sleep)
false
(do
(when (> sleep 0)
(Thread/sleep ^long sleep))
true)))))
(refill this)
(do-try-acquire this permits wait-ms)))

(defn- refill [^TokenBucketRateLimiter rate-limiter]
;; refill
(let [now (System/currentTimeMillis)]
(swap! (.-state rate-limiter)
(fn [state]
Expand All @@ -59,46 +44,77 @@
%))
(assoc :last-refill-ts now))))))

(defn- acquire-sleep-ms [^TokenBucketRateLimiter rate-limiter permits]
(let [{pending-tokens :reserved-tokens} (swap! (.-state rate-limiter)
update :reserved-tokens + permits)]
(if (<= pending-tokens 0)
0
;; time as milliseconds
(long (/ pending-tokens (.-rate rate-limiter))))))
(defn- ->sleep-ms ^long [pending-tokens rate]
(if (<= pending-tokens 0) 0 (long (/ pending-tokens rate))))

(defn- do-acquire!
[^TokenBucketRateLimiter rate-limiter permits]
(let [state (swap! (.-state rate-limiter) update :reserved-tokens + permits)
sleep-ms (->sleep-ms (:reserved-tokens state) (.-rate rate-limiter))]
((.-sleep-fn rate-limiter) sleep-ms)))

(defn- try-acquire-sleep-ms [^TokenBucketRateLimiter rate-limiter permits max-wait-ms]
(defn- do-try-acquire
[^TokenBucketRateLimiter rate-limiter permits max-wait-ms]
(try
(let [{pending-tokens :reserved-tokens}
(swap! (.-state rate-limiter)
(fn [state]
(update state :reserved-tokens
(fn [pending-tokens]
;; test if we can pass in wait period
(if (<= (- (+ pending-tokens permits)
(* max-wait-ms (.-rate rate-limiter)))
0)
(+ pending-tokens permits)
(throw (ex-info "Not enough permits." {:rate-limiter true})))))))]
(if (<= pending-tokens 0)
0
(long (/ pending-tokens (.-rate rate-limiter)))))
(catch clojure.lang.ExceptionInfo _
false)))
(let [state (swap! (.-state rate-limiter)
(fn [state]
(update state :reserved-tokens
(fn [pending-tokens]
;; test if we can pass in wait period
(if (<= (- (+ pending-tokens permits)
(* max-wait-ms (.-rate rate-limiter)))
0)
(+ pending-tokens permits)
(throw (ex-info "Not enough permits"
{:rate-limiter true})))))))
sleep-ms (->sleep-ms (:reserved-tokens state) (.-rate rate-limiter))]
((.-sleep-fn rate-limiter) sleep-ms)
true)
(catch Exception e
(if-not (:rate-limiter (ex-data e))
(throw e)
false))))

(defn interruptible-sleep [^long ms]
(when (pos? ms)
(Thread/sleep ms)))

(def ^:private nanos-in-ms 1000000)

(def ^{:const true :no-doc true}
allowed-rate-limiter-option-keys
#{:rate :max-cached-tokens})
(defn uninterruptible-sleep [^long ms]
(when (pos? ms)
(let [end-time-ns (+ (System/nanoTime) (* nanos-in-ms ms))]
(with-local-vars [interrupted? false]
(try (loop []
(let [remaining-ns (- end-time-ns (System/nanoTime))]
;; required sleep duration fully consumed — exit
(when (< 0 remaining-ns)
(try
(Thread/sleep (quot remaining-ns nanos-in-ms)
(mod remaining-ns nanos-in-ms))
;; successful sleep — exit
(catch InterruptedException _
(var-set interrupted? true)))
;; can only recur from tail position
(when @interrupted? (recur)))))
(finally
(when @interrupted?
(Thread/.interrupt (Thread/currentThread)))))))))

(defn rate-limiter
"Create a default rate limiter with:
* `rate`: permits per second (may be a floating point, e.g. 0.5 <=> 1 req every 2 sec)
* `max-cached-tokens`: the max size of tokens that the bucket can cache when it's idle"
[{:keys [rate max-cached-tokens] :as _opts}]
* `max-cached-tokens`: the max size of tokens that the bucket can cache when it's idle
* `sleep-fn`: a unary fn of millis to sleep for, allowing for custom sleep semantics;
by default, sleeps interruptedly; pass `uninterruptible-sleep` to sleep
uninterruptedly"
[{:keys [rate max-cached-tokens sleep-fn] :as _opts}]
(if (some? rate)
(let [max-cached-tokens (or max-cached-tokens (int rate))]
(let [max-cached-tokens (or max-cached-tokens (int rate))
sleep-fn (or sleep-fn interruptible-sleep)]
(TokenBucketRateLimiter. (/ (double rate) 1000)
max-cached-tokens
(atom {:reserved-tokens (double 0)
:last-refill-ts (long -1)})))
:last-refill-ts (long -1)})
sleep-fn))
(throw (IllegalArgumentException. ":rate is required for rate-limiter"))))
7 changes: 4 additions & 3 deletions src/diehard/spec.clj
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,15 @@
:timeout/on-failure]))

;; rate limiter
; allow floating point numbers, so that we can pass numbers such as 0.5, to signify 1 req / 2 seconds
;(s/def :rate-limiter/rate int?)

(s/def :rate-limiter/rate number?)
(s/def :rate-limiter/max-cached-tokens int?)
(s/def :rate-limiter/sleep-fn fn?)

(s/def :rate-limiter/rate-limiter-new
(only-keys :req-un [:rate-limiter/rate]
:opt-un [:rate-limiter/max-cached-tokens]))
:opt-un [:rate-limiter/max-cached-tokens
:rate-limiter/sleep-fn]))

(s/def :rate-limiter/ratelimiter #(satisfies? dr/IRateLimiter %))
(s/def :rate-limiter/max-wait-ms int?)
Expand Down
6 changes: 4 additions & 2 deletions test/diehard/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require [clojure.test :refer :all]
[diehard.circuit-breaker :as cb]
[diehard.core :refer :all])
(:import [dev.failsafe CircuitBreakerOpenException TimeoutExceededException]
(:import [clojure.lang IExceptionInfo]
[dev.failsafe CircuitBreakerOpenException TimeoutExceededException]
[java.util.concurrent CountDownLatch]))

(deftest test-retry
Expand Down Expand Up @@ -390,11 +391,12 @@
(let [counter0 (atom 0)]
(try
(while (< @counter0 200)
(with-rate-limiter {:ratelimiter my-rl
(with-rate-limiter {:ratelimiter my-rl3
:max-wait-ms 1}
(my-fn counter0)))
(is false)
(catch Exception e
(is (instance? IExceptionInfo e))
(is (:throttled (ex-data e))))))))

(testing "permits"
Expand Down
Loading