@@ -58,12 +58,6 @@ const (
5858 maxClientSubscriptionBuffer = 20000
5959)
6060
61- const (
62- httpScheme = "http"
63- wsScheme = "ws"
64- ipcScheme = "ipc"
65- )
66-
6761// BatchElem is an element in a batch request.
6862type BatchElem struct {
6963 Method string
@@ -80,7 +74,7 @@ type BatchElem struct {
8074// Client represents a connection to an RPC server.
8175type Client struct {
8276 idgen func () ID // for subscriptions
83- scheme string // connection type: http, ws or ipc
77+ isHTTP bool // connection type: http, ws or ipc
8478 services * serviceRegistry
8579
8680 idCounter uint32
@@ -115,11 +109,9 @@ type clientConn struct {
115109}
116110
117111func (c * Client ) newClientConn (conn ServerCodec ) * clientConn {
118- ctx := context .WithValue (context .Background (), clientContextKey {}, c )
119- // Http connections have already set the scheme
120- if ! c .isHTTP () && c .scheme != "" {
121- ctx = context .WithValue (ctx , "scheme" , c .scheme )
122- }
112+ ctx := context .Background ()
113+ ctx = context .WithValue (ctx , clientContextKey {}, c )
114+ ctx = context .WithValue (ctx , peerInfoContextKey {}, conn .peerInfo ())
123115 handler := newHandler (ctx , conn , c .idgen , c .services )
124116 return & clientConn {conn , handler }
125117}
@@ -145,7 +137,7 @@ func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, erro
145137 select {
146138 case <- ctx .Done ():
147139 // Send the timeout to dispatch so it can remove the request IDs.
148- if ! c .isHTTP () {
140+ if ! c .isHTTP {
149141 select {
150142 case c .reqTimeout <- op :
151143 case <- c .closing :
@@ -212,18 +204,10 @@ func newClient(initctx context.Context, connect reconnectFunc) (*Client, error)
212204}
213205
214206func initClient (conn ServerCodec , idgen func () ID , services * serviceRegistry ) * Client {
215- scheme := ""
216- switch conn .(type ) {
217- case * httpConn :
218- scheme = httpScheme
219- case * websocketCodec :
220- scheme = wsScheme
221- case * jsonCodec :
222- scheme = ipcScheme
223- }
207+ _ , isHTTP := conn .(* httpConn )
224208 c := & Client {
209+ isHTTP : isHTTP ,
225210 idgen : idgen ,
226- scheme : scheme ,
227211 services : services ,
228212 writeConn : conn ,
229213 close : make (chan struct {}),
@@ -236,7 +220,7 @@ func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *C
236220 reqSent : make (chan error , 1 ),
237221 reqTimeout : make (chan * requestOp ),
238222 }
239- if ! c . isHTTP () {
223+ if ! isHTTP {
240224 go c .dispatch (conn )
241225 }
242226 return c
@@ -267,7 +251,7 @@ func (c *Client) SupportedModules() (map[string]string, error) {
267251
268252// Close closes the client, aborting any in-flight requests.
269253func (c * Client ) Close () {
270- if c .isHTTP () {
254+ if c .isHTTP {
271255 return
272256 }
273257 select {
@@ -281,7 +265,7 @@ func (c *Client) Close() {
281265// This method only works for clients using HTTP, it doesn't have
282266// any effect for clients using another transport.
283267func (c * Client ) SetHeader (key , value string ) {
284- if ! c .isHTTP () {
268+ if ! c .isHTTP {
285269 return
286270 }
287271 conn := c .writeConn .(* httpConn )
@@ -315,7 +299,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
315299 }
316300 op := & requestOp {ids : []json.RawMessage {msg .ID }, resp : make (chan * jsonrpcMessage , 1 )}
317301
318- if c .isHTTP () {
302+ if c .isHTTP {
319303 err = c .sendHTTP (ctx , op , msg )
320304 } else {
321305 err = c .send (ctx , op , msg )
@@ -378,7 +362,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
378362 }
379363
380364 var err error
381- if c .isHTTP () {
365+ if c .isHTTP {
382366 err = c .sendBatchHTTP (ctx , op , msgs )
383367 } else {
384368 err = c .send (ctx , op , msgs )
@@ -417,7 +401,7 @@ func (c *Client) Notify(ctx context.Context, method string, args ...interface{})
417401 }
418402 msg .ID = nil
419403
420- if c .isHTTP () {
404+ if c .isHTTP {
421405 return c .sendHTTP (ctx , op , msg )
422406 }
423407 return c .send (ctx , op , msg )
@@ -450,12 +434,12 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf
450434 // Check type of channel first.
451435 chanVal := reflect .ValueOf (channel )
452436 if chanVal .Kind () != reflect .Chan || chanVal .Type ().ChanDir ()& reflect .SendDir == 0 {
453- panic ("first argument to Subscribe must be a writable channel" )
437+ panic (fmt . Sprintf ( "channel argument of Subscribe has type %T, need writable channel", channel ) )
454438 }
455439 if chanVal .IsNil () {
456440 panic ("channel given to Subscribe must not be nil" )
457441 }
458- if c .isHTTP () {
442+ if c .isHTTP {
459443 return nil , ErrNotificationsUnsupported
460444 }
461445
@@ -509,8 +493,8 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error
509493}
510494
511495func (c * Client ) write (ctx context.Context , msg interface {}, retry bool ) error {
512- // The previous write failed. Try to establish a new connection.
513496 if c .writeConn == nil {
497+ // The previous write failed. Try to establish a new connection.
514498 if err := c .reconnect (ctx ); err != nil {
515499 return err
516500 }
@@ -657,7 +641,3 @@ func (c *Client) read(codec ServerCodec) {
657641 c .readOp <- readOp {msgs , batch }
658642 }
659643}
660-
661- func (c * Client ) isHTTP () bool {
662- return c .scheme == httpScheme
663- }
0 commit comments