From d2065cbd33aea641c42fe533b57a1edf30cd9099 Mon Sep 17 00:00:00 2001 From: zxl Date: Fri, 17 Sep 2021 14:55:17 +0800 Subject: [PATCH] refactor: write attempt times --- server.go | 23 +++++++++++++++-------- session.go | 15 +++++---------- session_test.go | 2 +- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/server.go b/server.go index 4638afe..03caa80 100644 --- a/server.go +++ b/server.go @@ -34,7 +34,7 @@ type Server struct { printRoutes bool accepting chan struct{} stopped chan struct{} - writeRetryTimes int + writeAttemptTimes int } // ServerOption is the option for Server. @@ -49,16 +49,20 @@ type ServerOption struct { ReqQueueSize int // sets the request channel size of router, DefaultReqQueueSize will be used if < 0. RespQueueSize int // sets the response channel size of session, DefaultRespQueueSize will be used if < 0. DoNotPrintRoutes bool // whether to print registered route handlers to the console. - WriteRetryTimes int // sets the max retry times for packet writing in each session, should >= 0. + + // WriteAttemptTimes sets the max attempt times for packet writing in each session. + // The DefaultWriteAttemptTimes will be used if <= 0. + WriteAttemptTimes int } // ErrServerStopped is returned when server stopped. var ErrServerStopped = fmt.Errorf("server stopped") const ( - DefaultReqQueueSize = 1024 - DefaultRespQueueSize = 1024 - tempErrDelay = time.Millisecond * 5 + DefaultReqQueueSize = 1024 + DefaultRespQueueSize = 1024 + DefaultWriteAttemptTimes = 1 + tempErrDelay = time.Millisecond * 5 ) // NewServer creates a Server according to opt. @@ -72,6 +76,9 @@ func NewServer(opt *ServerOption) *Server { if opt.RespQueueSize < 0 { opt.RespQueueSize = DefaultReqQueueSize } + if opt.WriteAttemptTimes <= 0 { + opt.WriteAttemptTimes = DefaultWriteAttemptTimes + } return &Server{ socketReadBufferSize: opt.SocketReadBufferSize, socketWriteBufferSize: opt.SocketWriteBufferSize, @@ -85,7 +92,7 @@ func NewServer(opt *ServerOption) *Server { router: newRouter(opt.ReqQueueSize), accepting: make(chan struct{}), stopped: make(chan struct{}), - writeRetryTimes: opt.WriteRetryTimes, + writeAttemptTimes: opt.WriteAttemptTimes, } } @@ -168,8 +175,8 @@ func (s *Server) handleConn(conn net.Conn) { go s.OnSessionCreate(sess) } - go sess.readInbound(s.router.reqQueue, s.readTimeout) // start reading message packet from connection. - go sess.writeOutbound(s.writeTimeout, s.writeRetryTimes) // start writing message packet to connection. + go sess.readInbound(s.router.reqQueue, s.readTimeout) // start reading message packet from connection. + go sess.writeOutbound(s.writeTimeout, s.writeAttemptTimes) // start writing message packet to connection. <-sess.closed // wait for session finished. Sessions().Remove(sess.ID()) // session has been closed, remove it. diff --git a/session.go b/session.go index bbae020..aa55314 100644 --- a/session.go +++ b/session.go @@ -123,7 +123,7 @@ func (s *Session) sendReq(ctx *Context, reqQueue chan<- *Context) (ok bool) { // writeOutbound fetches message from respQueue channel and writes to TCP connection in a loop. // Parameter writeTimeout specified the connection writing timeout. // The loop breaks if errors occurred, or the session is closed. -func (s *Session) writeOutbound(writeTimeout time.Duration, retryTimes int) { +func (s *Session) writeOutbound(writeTimeout time.Duration, attemptTimes int) { LOOP: for { select { @@ -152,7 +152,7 @@ LOOP: } } - if err := s.attemptConnWrite(outboundMsg, retryTimes); err != nil { + if err := s.attemptConnWrite(outboundMsg, attemptTimes); err != nil { Log.Errorf("session %s conn write err: %s", s.id, err) break LOOP } @@ -162,12 +162,7 @@ LOOP: Log.Tracef("session %s writeOutbound exit because of error", s.id) } -func (s *Session) attemptConnWrite(outboundMsg []byte, retryTimes int) error { - attemptTimes := 1 - if retryTimes > 0 { - attemptTimes += retryTimes - } - var err error +func (s *Session) attemptConnWrite(outboundMsg []byte, attemptTimes int) (err error) { for i := 0; i < attemptTimes; i++ { time.Sleep(tempErrDelay * time.Duration(i)) _, err = s.conn.Write(outboundMsg) @@ -189,9 +184,9 @@ func (s *Session) attemptConnWrite(outboundMsg []byte, retryTimes int) error { Log.Errorf("session %s conn write err: %s; retrying in %s", s.id, err, tempErrDelay*time.Duration(i+1)) continue } - break + break // if err is not temporary, break the loop. } - return err + return } func (s *Session) pack(ctx *Context) ([]byte, error) { diff --git a/session_test.go b/session_test.go index afa058d..c1d7b1a 100644 --- a/session_test.go +++ b/session_test.go @@ -390,7 +390,7 @@ func TestSession_attemptConnWrite_when_reach_last_try(t *testing.T) { conn.EXPECT().Write(gomock.Any()).Return(0, fmt.Errorf("some err")) s := newSession(conn, &SessionOption{}) - assert.Error(t, s.attemptConnWrite([]byte("whatever"), 0)) + assert.Error(t, s.attemptConnWrite([]byte("whatever"), 1)) } func TestSession_attemptConnWrite_when_err_is_not_temp(t *testing.T) {