diff --git a/client.go b/client.go index f33c7d04..860946be 100644 --- a/client.go +++ b/client.go @@ -392,8 +392,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { DEBUG.Println(CLI, "using custom onConnectAttempt handler...") tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) } + // Start by opening the network connection (tcp, tls, ws) etc - conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer) + conn, err = c.Options.OpenConnFn(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer) if err != nil { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") diff --git a/options.go b/options.go index e745258d..e04cb024 100644 --- a/options.go +++ b/options.go @@ -57,6 +57,10 @@ type ReconnectHandler func(Client, *ClientOptions) // ConnectionAttemptHandler is invoked prior to making the initial connection. type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config +// OpenConnectionFunc is invoked to establish the underlying network connection. +// Does not carry out any MQTT specific handshakes. +type OpenConnectionFunc func(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, dialer *net.Dialer) (net.Conn, error) + // ClientOptions contains configurable options for an Client. Note that these should be set using the // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. // WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy @@ -98,6 +102,7 @@ type ClientOptions struct { WebsocketOptions *WebsocketOptions MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming Dialer *net.Dialer + OpenConnFn OpenConnectionFunc } // NewClientOptions will create a new ClientClientOptions type with some @@ -140,10 +145,23 @@ func NewClientOptions() *ClientOptions { HTTPHeaders: make(map[string][]string), WebsocketOptions: &WebsocketOptions{}, Dialer: &net.Dialer{Timeout: 30 * time.Second}, + OpenConnFn: openConnection, } return o } + +// SetOpenConnectionFunc replaces the inbuilt function that establishes a connection with a custom function. +// The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example) +func (o *ClientOptions) SetOpenConnectionFunc(fn OpenConnectionFunc) *ClientOptions { + o.OpenConnFn = fn + if o.OpenConnFn == nil { + o.OpenConnFn = openConnection //use the default one. + } + return o +} + + // AddBroker adds a broker URI to the list of brokers to be used. The format should be // scheme://host:port // Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)