Skip to content
Merged
25 changes: 18 additions & 7 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
RequestCancellationException
RequestTimeoutException)
(io.aleph.dirigiste Pools)
(io.netty.channel ConnectTimeoutException)
(io.netty.handler.codec Headers)
(io.netty.handler.codec.http HttpHeaders)
(java.net
Expand Down Expand Up @@ -157,6 +158,7 @@
| `insecure?` | if `true`, ignores the certificate for any `https://` domains
| `response-buffer-size` | the amount of the response, in bytes, that is buffered before the request returns, defaults to `65536`. This does *not* represent the maximum size response that the client can handle (which is unbounded), and is only a means of maximizing performance.
| `keep-alive?` | if `true`, attempts to reuse connections for multiple requests, defaults to `true`.
| `connect-timeout` | timeout for a connection to be established, in milliseconds. Default determined by Netty, see `aleph.netty/default-connect-timeout`.
| `idle-timeout` | when set, forces keep-alive connections to be closed after an idle time, in milliseconds.
| `transport` | the transport to use, one of `:nio`, `:epoll`, `:kqueue` or `:io-uring` (defaults to `:nio`).
| `raw-stream?` | if `true`, bodies of responses will not be buffered at all, and represented as Manifold streams of `io.netty.buffer.ByteBuf` objects rather than as an `InputStream`. This will minimize copying, but means that care must be taken with Netty's buffer reference counting. Only recommended for advanced users.
Expand Down Expand Up @@ -275,6 +277,7 @@
| `pipeline-transform` | an optional function that takes an `io.netty.channel.ChannelPipeline` object, which represents a connection, and modifies it.
| `max-frame-payload` | maximum allowable frame payload length, in bytes, defaults to `65536`.
| `max-frame-size` | maximum aggregate message size, in bytes, defaults to `1048576`.
| `connect-timeout` | timeout for a connection to be established, in milliseconds. Default determined by Netty, see `aleph.netty/default-connect-timeout`.
| `bootstrap-transform` | an optional function that takes an `io.netty.bootstrap.Bootstrap` object and modifies it.
| `transport` | the transport to use, one of `:nio`, `:epoll`, `:kqueue` or `:io-uring` (defaults to `:nio`).
| `heartbeats` | optional configuration to send Ping frames to the server periodically (if the connection is idle), configuration keys are `:send-after-idle` (in milliseconds), `:payload` (optional, empty frame by default) and `:timeout` (optional, to close the connection if Pong is not received after specified timeout)."
Expand Down Expand Up @@ -342,7 +345,7 @@

Param key | Description
-------------------- | -----------------------------------------------------------------------------------------------------------------------------------------------------------------
`connection-timeout` | timeout in milliseconds for the connection to become established
`connection-timeout` | timeout in milliseconds for the connection to become established, defaults to `aleph.netty/default-connect-timeout`. Note that this timeout will be ineffective if the pool's `connect-timeout` is lower.
`follow-redirects?` | whether to follow redirects, defaults to `true`; see `aleph.http.client-middleware/handle-redirects`
`middleware` | custom client middleware for the request
`pool-timeout` | timeout in milliseconds for the pool to generate a connection
Expand All @@ -360,7 +363,7 @@
:or {pool default-connection-pool
response-executor default-response-executor
middleware identity
connection-timeout 6e4} ;; 60 seconds
connection-timeout aleph.netty/default-connect-timeout}
:as req}]
(let [dispose-conn! (atom (fn []))
result (d/deferred response-executor)
Expand Down Expand Up @@ -394,10 +397,18 @@
(-> (first conn)
(maybe-timeout! connection-timeout)

;; connection timeout triggered
(d/catch' TimeoutException
(fn [^Throwable e]
(d/error-deferred (ConnectionTimeoutException. e))))
;; connection establishment failed
(d/catch'
(fn [e]
(if (or (instance? TimeoutException e)
;; Unintuitively, this type doesn't inherit from TimeoutException
(instance? ConnectTimeoutException e))
(do
(log/trace "Timed out waiting for connection to be established")
(d/error-deferred (ConnectionTimeoutException. ^Throwable e)))
(do
(log/trace "Connection establishment failed")
(d/error-deferred e)))))

;; actually make the request now
(d/chain'
Expand Down Expand Up @@ -496,7 +507,7 @@
| `follow-redirects?` | whether to follow redirects, defaults to `true`; see `aleph.http.client-middleware/handle-redirects`
| `pool` | a custom connection pool
| `pool-timeout` | timeout in milliseconds for the pool to generate a connection
| `connection-timeout` | timeout in milliseconds for the connection to become established
`connection-timeout` | timeout in milliseconds for the connection to become established, defaults to `aleph.netty/default-connect-timeout`. Note that this timeout will be ineffective if the pool's `connect-timeout` is lower.
| `request-timeout` | timeout in milliseconds for the arrival of a response over the established connection
| `read-timeout` | timeout in milliseconds for the response to be completed
| `response-executor` | optional `java.util.concurrent.Executor` that will handle the requests (defaults to a `flow/utilization-executor` of 256 `max-threads` and a `queue-length` of 0)")
Expand Down
7 changes: 5 additions & 2 deletions src/aleph/http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -770,13 +770,15 @@
log-activity
http-versions
force-h2c?
on-closed]
on-closed
connect-timeout]
:or {raw-stream? false
bootstrap-transform identity
pipeline-transform identity
keep-alive? true
ssl-endpoint-id-alg netty/default-ssl-endpoint-id-alg
response-buffer-size 65536
connect-timeout netty/default-connect-timeout
epoll? false
name-resolver :default
log-activity :debug
Expand Down Expand Up @@ -818,7 +820,8 @@
:remote-address remote-address
:local-address local-address
:transport (netty/determine-transport transport epoll?)
:name-resolver name-resolver})]
:name-resolver name-resolver
:connect-timeout connect-timeout})]

(attach-on-close-handler ch-d on-closed)

Expand Down
9 changes: 6 additions & 3 deletions src/aleph/http/websocket/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@
max-frame-payload
max-frame-size
compression?
heartbeats]
heartbeats
connect-timeout]
:or {bootstrap-transform identity
pipeline-transform identity
raw-stream? false
Expand All @@ -254,7 +255,8 @@
extensions? false
max-frame-payload 65536
max-frame-size 1048576
compression? false}
compression? false
connect-timeout netty/default-connect-timeout}
:as options}]

(when (and (true? (:compression? options))
Expand Down Expand Up @@ -312,5 +314,6 @@
:bootstrap-transform bootstrap-transform
:remote-address remote-address
:local-address local-address
:transport (netty/determine-transport transport epoll?)})
:transport (netty/determine-transport transport epoll?)
:connect-timeout connect-timeout})
(fn [_] s))))
14 changes: 11 additions & 3 deletions src/aleph/netty.clj
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,15 @@

;;; Defaults defined here since they are not publically exposed by Netty

(def ^:const ^:no-doc default-shutdown-timeout
"Default timeout in seconds to wait for graceful shutdown complete"
(def ^:const default-shutdown-timeout
"Netty's default timeout in seconds to wait for graceful shutdown complete"
15)

(def ^:const default-connect-timeout
"Netty's default connect timeout in milliseconds."
;; io.netty.channel.DefaultChannelConfig/DEFAULT_CONNECT_TIMEOUT
30000)

(def ^:const ^:no-doc byte-array-class (Class/forName "[B"))

#_
Expand Down Expand Up @@ -1527,7 +1532,9 @@
^SocketAddress remote-address
^SocketAddress local-address
transport
name-resolver]
name-resolver
connect-timeout]
:or {connect-timeout default-connect-timeout}
:as opts}]
(ensure-transport-available! transport)

Expand All @@ -1546,6 +1553,7 @@
(instance? AddressResolverGroup name-resolver) name-resolver))
bootstrap (doto (Bootstrap.)
(.option ChannelOption/SO_REUSEADDR true)
(.option ChannelOption/CONNECT_TIMEOUT_MILLIS (int connect-timeout))
#_(.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) ; option deprecated, removed in v5
(.group client-event-loop-group)
(.channel chan-class)
Expand Down
19 changes: 17 additions & 2 deletions src/aleph/tcp.clj
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,26 @@
| `ssl-endpoint-id-alg` | the name of the algorithm to use for SSL endpoint identification (see https://docs.oracle.com/en/java/javase/17/docs/specs/security/standard-names.html#endpoint-identification-algorithms), defaults to \"HTTPS\" which is a reasonable default for non-HTTPS uses, too. Only used by SSL connections. Pass `nil` to disable endpoint identification.
| `ssl?` | if true, the client attempts to establish a secure connection with the server.
| `insecure?` | if true, the client will ignore the server's certificate.
| `connect-timeout` | timeout for a connection to be established, in milliseconds. Default determined by Netty, see `aleph.netty/default-connect-timeout`.
| `bootstrap-transform` | a function that takes an `io.netty.bootstrap.Bootstrap` object, which represents the client, and modifies it.
| `pipeline-transform` | a function that takes an `io.netty.channel.ChannelPipeline` object, which represents a connection, and modifies it.
| `raw-stream?` | if true, messages from the stream will be `io.netty.buffer.ByteBuf` objects rather than byte-arrays. This will minimize copying, but means that care must be taken with Netty's buffer reference counting. Only recommended for advanced users.
| `transport` | the transport to use, one of `:nio`, `:epoll`, `:kqueue` or `:io-uring` (defaults to `:nio`)."
[{:keys [host port remote-address local-address ssl-context ssl-endpoint-id-alg ssl? insecure? pipeline-transform bootstrap-transform epoll? transport]
[{:keys [host
port
remote-address
local-address
ssl-context
ssl-endpoint-id-alg
ssl?
insecure?
connect-timeout
pipeline-transform
bootstrap-transform
epoll?
transport]
:or {ssl-endpoint-id-alg netty/default-ssl-endpoint-id-alg
connect-timeout netty/default-connect-timeout
bootstrap-transform identity
epoll? false}
:as options}]
Expand All @@ -196,6 +210,7 @@
:bootstrap-transform bootstrap-transform
:remote-address remote-address
:local-address local-address
:transport (netty/determine-transport transport epoll?)})
:transport (netty/determine-transport transport epoll?)
:connect-timeout connect-timeout})
(d/catch' #(d/error! s %)))
s))
11 changes: 11 additions & 0 deletions test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,17 @@
@(http/get "http://192.0.2.0" ;; "TEST-NET" in RFC 5737
(merge (default-request-options) {:pool *pool* :connection-timeout 2}))))))


(deftest test-pool-connect-timeout
(binding [*connection-options* {:connect-timeout 2}]
(with-handler basic-handler
(is (thrown? ConnectionTimeoutException
(deref (http/get "http://192.0.2.0" ;; "TEST-NET" in RFC 5737
(merge (default-request-options) {:pool *pool*
:connection-timeout 500}))
1e3
:timeout))))))

(deftest test-request-timeout
(with-handler basic-handler
(is (thrown? TimeoutException
Expand Down