Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 37 additions & 14 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ const (
maxClientSubscriptionBuffer = 20000
)

const (
HTTPConn = "http"
WSConn = "ws"
IPCConn = "ipc"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaict, this is only used in the rpc package

Suggested change
HTTPConn = "http"
WSConn = "ws"
IPCConn = "ipc"
httpConn = "http"
wsConn = "ws"
ipcConn = "ipc"

)

// BatchElem is an element in a batch request.
type BatchElem struct {
Method string
Expand All @@ -75,7 +81,7 @@ type BatchElem struct {
// Client represents a connection to an RPC server.
type Client struct {
idgen func() ID // for subscriptions
isHTTP bool
scheme string // connection type: http, ws or ipc
services *serviceRegistry

idCounter uint32
Expand Down Expand Up @@ -111,6 +117,10 @@ type clientConn struct {

func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.WithValue(context.Background(), clientContextKey{}, c)
// Http connections have already set the scheme
if c.scheme != HTTPConn {
ctx = context.WithValue(ctx, "scheme", c.scheme)
}
handler := newHandler(ctx, conn, c.idgen, c.services)
return &clientConn{conn, handler}
}
Expand All @@ -136,7 +146,7 @@ func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, erro
select {
case <-ctx.Done():
// Send the timeout to dispatch so it can remove the request IDs.
if !c.isHTTP {
if c.scheme != HTTPConn {
select {
case c.reqTimeout <- op:
case <-c.closing:
Expand Down Expand Up @@ -197,16 +207,29 @@ func newClient(initctx context.Context, connect reconnectFunc) (*Client, error)
if err != nil {
return nil, err
}
c := initClient(conn, randomIDGenerator(), new(serviceRegistry))
c, err := initClient(conn, randomIDGenerator(), new(serviceRegistry))
if err != nil {
return nil, err
}
c.reconnectFunc = connect
return c, nil
}

func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
_, isHTTP := conn.(*httpConn)
func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) (*Client, error) {
var scheme string
switch conn.(type) {
case *httpConn:
scheme = HTTPConn
case *websocketCodec:
scheme = WSConn
case *jsonCodec:
scheme = IPCConn
default:
return nil, errors.New("Unknown connection scheme")
}
c := &Client{
idgen: idgen,
isHTTP: isHTTP,
scheme: scheme,
services: services,
writeConn: conn,
close: make(chan struct{}),
Expand All @@ -219,10 +242,10 @@ func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *C
reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp),
}
if !isHTTP {
if scheme != HTTPConn {
go c.dispatch(conn)
}
return c
return c, nil
}

// RegisterName creates a service for the given receiver type under the given name. When no
Expand Down Expand Up @@ -250,7 +273,7 @@ func (c *Client) SupportedModules() (map[string]string, error) {

// Close closes the client, aborting any in-flight requests.
func (c *Client) Close() {
if c.isHTTP {
if c.scheme == HTTPConn {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative change would have been to hide this internaly, so instead of c.isHTTP {, you would have c.isHTTP() {

return
}
select {
Expand All @@ -264,7 +287,7 @@ func (c *Client) Close() {
// This method only works for clients using HTTP, it doesn't have
// any effect for clients using another transport.
func (c *Client) SetHeader(key, value string) {
if !c.isHTTP {
if c.scheme != HTTPConn {
return
}
conn := c.writeConn.(*httpConn)
Expand Down Expand Up @@ -298,7 +321,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
}
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}

if c.isHTTP {
if c.scheme == HTTPConn {
err = c.sendHTTP(ctx, op, msg)
} else {
err = c.send(ctx, op, msg)
Expand Down Expand Up @@ -357,7 +380,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
}

var err error
if c.isHTTP {
if c.scheme == HTTPConn {
err = c.sendBatchHTTP(ctx, op, msgs)
} else {
err = c.send(ctx, op, msgs)
Expand Down Expand Up @@ -402,7 +425,7 @@ func (c *Client) Notify(ctx context.Context, method string, args ...interface{})
}
msg.ID = nil

if c.isHTTP {
if c.scheme == HTTPConn {
return c.sendHTTP(ctx, op, msg)
}
return c.send(ctx, op, msg)
Expand Down Expand Up @@ -440,7 +463,7 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf
if chanVal.IsNil() {
panic("channel given to Subscribe must not be nil")
}
if c.isHTTP {
if c.scheme == HTTPConn {
return nil, ErrNotificationsUnsupported
}

Expand Down
5 changes: 4 additions & 1 deletion rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
s.codecs.Add(codec)
defer s.codecs.Remove(codec)

c := initClient(codec, s.idgen, &s.services)
c, err := initClient(codec, s.idgen, &s.services)
if err != nil {
panic("failed to serve codec")
}
<-codec.closed()
c.Close()
}
Expand Down