Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Server struct {
printRoutes bool
accepting chan struct{}
stopped chan struct{}
writeRetryTimes int
writeAttemptTimes int
}

// ServerOption is the option for Server.
Expand All @@ -49,16 +49,20 @@ type ServerOption struct {
ReqQueueSize int // sets the request channel size of router, DefaultReqQueueSize will be used if < 0.
RespQueueSize int // sets the response channel size of session, DefaultRespQueueSize will be used if < 0.
DoNotPrintRoutes bool // whether to print registered route handlers to the console.
WriteRetryTimes int // sets the max retry times for packet writing in each session, should >= 0.

// WriteAttemptTimes sets the max attempt times for packet writing in each session.
// The DefaultWriteAttemptTimes will be used if <= 0.
WriteAttemptTimes int
}

// ErrServerStopped is returned when server stopped.
var ErrServerStopped = fmt.Errorf("server stopped")

const (
DefaultReqQueueSize = 1024
DefaultRespQueueSize = 1024
tempErrDelay = time.Millisecond * 5
DefaultReqQueueSize = 1024
DefaultRespQueueSize = 1024
DefaultWriteAttemptTimes = 1
tempErrDelay = time.Millisecond * 5
)

// NewServer creates a Server according to opt.
Expand All @@ -72,6 +76,9 @@ func NewServer(opt *ServerOption) *Server {
if opt.RespQueueSize < 0 {
opt.RespQueueSize = DefaultReqQueueSize
}
if opt.WriteAttemptTimes <= 0 {
opt.WriteAttemptTimes = DefaultWriteAttemptTimes
}
return &Server{
socketReadBufferSize: opt.SocketReadBufferSize,
socketWriteBufferSize: opt.SocketWriteBufferSize,
Expand All @@ -85,7 +92,7 @@ func NewServer(opt *ServerOption) *Server {
router: newRouter(opt.ReqQueueSize),
accepting: make(chan struct{}),
stopped: make(chan struct{}),
writeRetryTimes: opt.WriteRetryTimes,
writeAttemptTimes: opt.WriteAttemptTimes,
}
}

Expand Down Expand Up @@ -168,8 +175,8 @@ func (s *Server) handleConn(conn net.Conn) {
go s.OnSessionCreate(sess)
}

go sess.readInbound(s.router.reqQueue, s.readTimeout) // start reading message packet from connection.
go sess.writeOutbound(s.writeTimeout, s.writeRetryTimes) // start writing message packet to connection.
go sess.readInbound(s.router.reqQueue, s.readTimeout) // start reading message packet from connection.
go sess.writeOutbound(s.writeTimeout, s.writeAttemptTimes) // start writing message packet to connection.

<-sess.closed // wait for session finished.
Sessions().Remove(sess.ID()) // session has been closed, remove it.
Expand Down
15 changes: 5 additions & 10 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *Session) sendReq(ctx *Context, reqQueue chan<- *Context) (ok bool) {
// writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop.
// Parameter writeTimeout specified the connection writing timeout.
// The loop breaks if errors occurred, or the session is closed.
func (s *Session) writeOutbound(writeTimeout time.Duration, retryTimes int) {
func (s *Session) writeOutbound(writeTimeout time.Duration, attemptTimes int) {
LOOP:
for {
select {
Expand Down Expand Up @@ -152,7 +152,7 @@ LOOP:
}
}

if err := s.attemptConnWrite(outboundMsg, retryTimes); err != nil {
if err := s.attemptConnWrite(outboundMsg, attemptTimes); err != nil {
Log.Errorf("session %s conn write err: %s", s.id, err)
break LOOP
}
Expand All @@ -162,12 +162,7 @@ LOOP:
Log.Tracef("session %s writeOutbound exit because of error", s.id)
}

func (s *Session) attemptConnWrite(outboundMsg []byte, retryTimes int) error {
attemptTimes := 1
if retryTimes > 0 {
attemptTimes += retryTimes
}
var err error
func (s *Session) attemptConnWrite(outboundMsg []byte, attemptTimes int) (err error) {
for i := 0; i < attemptTimes; i++ {
time.Sleep(tempErrDelay * time.Duration(i))
_, err = s.conn.Write(outboundMsg)
Expand All @@ -189,9 +184,9 @@ func (s *Session) attemptConnWrite(outboundMsg []byte, retryTimes int) error {
Log.Errorf("session %s conn write err: %s; retrying in %s", s.id, err, tempErrDelay*time.Duration(i+1))
continue
}
break
break // if err is not temporary, break the loop.
}
return err
return
}

func (s *Session) pack(ctx *Context) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func TestSession_attemptConnWrite_when_reach_last_try(t *testing.T) {
conn.EXPECT().Write(gomock.Any()).Return(0, fmt.Errorf("some err"))

s := newSession(conn, &SessionOption{})
assert.Error(t, s.attemptConnWrite([]byte("whatever"), 0))
assert.Error(t, s.attemptConnWrite([]byte("whatever"), 1))
}

func TestSession_attemptConnWrite_when_err_is_not_temp(t *testing.T) {
Expand Down