Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"errors"
"fmt"
"math/big"
"net"
"net/http"
"regexp"
"runtime"
"strconv"
Expand All @@ -41,7 +41,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/websocket"
"github.com/gorilla/websocket"
)

const (
Expand Down Expand Up @@ -200,21 +200,21 @@ func (s *Service) loop() {
path := fmt.Sprintf("%s/api", s.host)
urls := []string{path}

if !strings.Contains(path, "://") { // url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779)
// url.Parse and url.IsAbs is unsuitable (https://github.com/golang/go/issues/19779)
if !strings.Contains(path, "://") {
urls = []string{"wss://" + path, "ws://" + path}
}
// Establish a websocket connection to the server on any supported URL
var (
conf *websocket.Config
conn *websocket.Conn
err error
)
dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second}
header := make(http.Header)
header.Set("origin", "http://localhost")
for _, url := range urls {
if conf, err = websocket.NewConfig(url, "http://localhost/"); err != nil {
continue
}
conf.Dialer = &net.Dialer{Timeout: 5 * time.Second}
if conn, err = websocket.DialConfig(conf); err == nil {
conn, _, err = dialer.Dial(url, header)
if err == nil {
break
}
}
Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *Service) readLoop(conn *websocket.Conn) {
for {
// Retrieve the next generic network packet and bail out on error
var msg map[string][]interface{}
if err := websocket.JSON.Receive(conn, &msg); err != nil {
if err := conn.ReadJSON(&msg); err != nil {
log.Warn("Failed to decode stats server message", "err", err)
return
}
Expand Down Expand Up @@ -399,12 +399,12 @@ func (s *Service) login(conn *websocket.Conn) error {
login := map[string][]interface{}{
"emit": {"hello", auth},
}
if err := websocket.JSON.Send(conn, login); err != nil {
if err := conn.WriteJSON(login); err != nil {
return err
}
// Retrieve the remote ack or connection termination
var ack map[string][]string
if err := websocket.JSON.Receive(conn, &ack); err != nil || len(ack["emit"]) != 1 || ack["emit"][0] != "ready" {
if err := conn.ReadJSON(&ack); err != nil || len(ack["emit"]) != 1 || ack["emit"][0] != "ready" {
return errors.New("unauthorized")
}
return nil
Expand Down Expand Up @@ -441,7 +441,7 @@ func (s *Service) reportLatency(conn *websocket.Conn) error {
"clientTime": start.String(),
}},
}
if err := websocket.JSON.Send(conn, ping); err != nil {
if err := conn.WriteJSON(ping); err != nil {
return err
}
// Wait for the pong request to arrive back
Expand All @@ -463,7 +463,7 @@ func (s *Service) reportLatency(conn *websocket.Conn) error {
"latency": latency,
}},
}
return websocket.JSON.Send(conn, stats)
return conn.WriteJSON(stats)
}

// blockStats is the information to report about individual blocks.
Expand Down Expand Up @@ -514,7 +514,7 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
report := map[string][]interface{}{
"emit": {"block", stats},
}
return websocket.JSON.Send(conn, report)
return conn.WriteJSON(report)
}

// assembleBlockStats retrieves any required metadata to report a single block
Expand Down Expand Up @@ -628,7 +628,7 @@ func (s *Service) reportHistory(conn *websocket.Conn, list []uint64) error {
report := map[string][]interface{}{
"emit": {"history", stats},
}
return websocket.JSON.Send(conn, report)
return conn.WriteJSON(report)
}

// pendStats is the information to report about pending transactions.
Expand Down Expand Up @@ -658,7 +658,7 @@ func (s *Service) reportPending(conn *websocket.Conn) error {
report := map[string][]interface{}{
"emit": {"pending", stats},
}
return websocket.JSON.Send(conn, report)
return conn.WriteJSON(report)
}

// nodeStats is the information to report about the local node.
Expand Down Expand Up @@ -713,5 +713,5 @@ func (s *Service) reportStats(conn *websocket.Conn) error {
report := map[string][]interface{}{
"emit": {"stats", stats},
}
return websocket.JSON.Send(conn, report)
return conn.WriteJSON(report)
}
5 changes: 2 additions & 3 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ var (

const (
// Timeouts
tcpKeepAliveInterval = 30 * time.Second
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
)

const (
Expand Down
2 changes: 1 addition & 1 deletion rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
)

const (
maxRequestContentLength = 1024 * 512
maxRequestContentLength = 1024 * 1024 * 5
contentType = "application/json"
)

Expand Down
2 changes: 1 addition & 1 deletion rpc/ipc_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ func ipcListen(endpoint string) (net.Listener, error) {

// newIPCConnection will connect to a Unix socket on the given endpoint.
func newIPCConnection(ctx context.Context, endpoint string) (net.Conn, error) {
return dialContext(ctx, "unix", endpoint)
return new(net.Dialer).DialContext(ctx, "unix", endpoint)
}
17 changes: 11 additions & 6 deletions rpc/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ type Conn interface {
SetWriteDeadline(time.Time) error
}

type deadlineCloser interface {
io.Closer
SetWriteDeadline(time.Time) error
}

// ConnRemoteAddr wraps the RemoteAddr operation, which returns a description
// of the peer address of a connection. If a Conn also implements ConnRemoteAddr, this
// description is used in log messages.
Expand All @@ -165,12 +170,10 @@ type jsonCodec struct {
decode func(v interface{}) error // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder
encode func(v interface{}) error // encoder to allow multiple transports
conn Conn
conn deadlineCloser
}

// NewCodec creates a new RPC server codec with support for JSON-RPC 2.0 based
// on explicitly given encoding and decoding methods.
func NewCodec(conn Conn, encode, decode func(v interface{}) error) ServerCodec {
func newCodec(conn deadlineCloser, encode, decode func(v interface{}) error) ServerCodec {
codec := &jsonCodec{
closed: make(chan interface{}),
encode: encode,
Expand All @@ -183,12 +186,14 @@ func NewCodec(conn Conn, encode, decode func(v interface{}) error) ServerCodec {
return codec
}

// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0.
// NewJSONCodec creates a codec that reads from the given connection. If conn implements
// ConnRemoteAddr, log messages will use it to include the remote address of the
// connection.
func NewJSONCodec(conn Conn) ServerCodec {
enc := json.NewEncoder(conn)
dec := json.NewDecoder(conn)
dec.UseNumber()
return NewCodec(conn, enc.Encode, dec.Decode)
return newCodec(conn, enc.Encode, dec.Decode)
}

func (c *jsonCodec) RemoteAddr() string {
Expand Down
Loading