From 0cabf8623d3fe2433e156306ae2510305345e0dd Mon Sep 17 00:00:00 2001 From: zxl Date: Wed, 5 Jan 2022 14:56:05 +0800 Subject: [PATCH 1/3] refactor: remove the built-in session manager --- examples/tcp/broadcast/server/main.go | 52 +++++++++++++---- examples/tcp/simple/server/main.go | 13 +++-- server.go | 21 ++----- session.go | 23 +++++--- session_manager.go | 67 --------------------- session_manager_test.go | 84 --------------------------- session_test.go | 8 +++ 7 files changed, 78 insertions(+), 190 deletions(-) delete mode 100644 session_manager.go delete mode 100644 session_manager_test.go diff --git a/examples/tcp/broadcast/server/main.go b/examples/tcp/broadcast/server/main.go index 5e04bef..d421826 100644 --- a/examples/tcp/broadcast/server/main.go +++ b/examples/tcp/broadcast/server/main.go @@ -9,14 +9,23 @@ import ( "github.com/sirupsen/logrus" "os" "os/signal" + "sync" "syscall" "time" ) var log *logrus.Logger +var sessions *SessionManager func init() { log = logrus.New() + sessions = &SessionManager{nextId: 1, storage: map[int64]easytcp.Session{}} +} + +type SessionManager struct { + nextId int64 + lock sync.Mutex + storage map[int64]easytcp.Session } func main() { @@ -24,23 +33,42 @@ func main() { Packer: easytcp.NewDefaultPacker(), }) + s.OnSessionCreate = func(sess easytcp.Session) { + // store session + sessions.lock.Lock() + defer sessions.lock.Unlock() + sess.SetID(sessions.nextId) + sessions.nextId++ + sessions.storage[sess.ID().(int64)] = sess + } + + s.OnSessionClose = func(sess easytcp.Session) { + // remove session + delete(sessions.storage, sess.ID().(int64)) + } + s.Use(fixture.RecoverMiddleware(log), logMiddleware) s.AddRoute(common.MsgIdBroadCastReq, func(ctx easytcp.Context) { reqData := ctx.Request().Data - // broadcasting - go easytcp.Sessions().Range(func(id string, sess easytcp.Session) (next bool) { - if ctx.Session().ID() == id { - return true // next iteration + // broadcasting to other sessions + currentSession := ctx.Session() + for _, sess := range sessions.storage { + targetSession := sess + if currentSession.ID() == targetSession.ID() { + continue } - respData := fmt.Sprintf("%s (broadcast from %s)", reqData, ctx.Session().ID()) - ctx.Copy().SetResponseMessage(&message.Entry{ - ID: common.MsgIdBroadCastAck, - Data: []byte(respData), - }).SendTo(sess) - return true - }) + respData := fmt.Sprintf("%s (broadcast from %d to %d)", reqData, currentSession.ID(), targetSession.ID()) + respEntry := &message.Entry{ID: common.MsgIdBroadCastAck, Data: []byte(respData)} + go func() { + targetSession.AllocateContext().SetResponseMessage(respEntry).Send() + // can also write like this. + // ctx.Copy().SetResponseMessage(respEntry).SendTo(targetSession) + // or this. + // ctx.Copy().SetSession(targetSession).SetResponseMessage(respEntry).Send() + }() + } ctx.SetResponseMessage(&message.Entry{ ID: common.MsgIdBroadCastAck, @@ -68,7 +96,7 @@ func logMiddleware(next easytcp.HandlerFunc) easytcp.HandlerFunc { log.Infof("recv request | %s", ctx.Request().Data) defer func() { var resp = ctx.Response() - log.Infof("send response | id: %d; size: %d; data: %s", resp.ID, len(resp.Data), resp.Data) + log.Infof("send response |sessId: %d; id: %d; size: %d; data: %s", ctx.Session().ID(), resp.ID, len(resp.Data), resp.Data) }() next(ctx) } diff --git a/examples/tcp/simple/server/main.go b/examples/tcp/simple/server/main.go index 5f8160a..72640db 100644 --- a/examples/tcp/simple/server/main.go +++ b/examples/tcp/simple/server/main.go @@ -18,11 +18,13 @@ var log *logrus.Logger func init() { log = logrus.New() + log.SetLevel(logrus.TraceLevel) } func main() { // go printGoroutineNum() + easytcp.SetLogger(log) s := easytcp.NewServer(&easytcp.ServerOption{ SocketReadBufferSize: 1024 * 1024, SocketWriteBufferSize: 1024 * 1024, @@ -33,10 +35,10 @@ func main() { Codec: nil, }) s.OnSessionCreate = func(sess easytcp.Session) { - log.Infof("session created: %s", sess.ID()) + log.Infof("session created: %v", sess.ID()) } s.OnSessionClose = func(sess easytcp.Session) { - log.Warnf("session closed: %s", sess.ID()) + log.Warnf("session closed: %v", sess.ID()) } // register global middlewares @@ -62,15 +64,16 @@ func main() { if err := s.Stop(); err != nil { log.Errorf("server stopped err: %s", err) } - time.Sleep(time.Second) + time.Sleep(time.Second * 3) } func logMiddleware(next easytcp.HandlerFunc) easytcp.HandlerFunc { return func(c easytcp.Context) { - log.Infof("rec <<< | id:(%d) size:(%d) data: %s", c.Request().ID, len(c.Request().Data), c.Request().Data) + req := c.Request() + log.Infof("rec <<< id:(%d) size:(%d) data: %s", req.ID, len(req.Data), req.Data) defer func() { resp := c.Response() - log.Infof("snd >>> | id:(%d) size:(%d) data: %s", resp.ID, len(resp.Data), resp.Data) + log.Infof("snd >>> id:(%d) size:(%d) data: %s", resp.ID, len(resp.Data), resp.Data) }() next(c) } diff --git a/server.go b/server.go index 5fca3e1..d12682e 100644 --- a/server.go +++ b/server.go @@ -153,14 +153,13 @@ func (s *Server) acceptLoop() error { // handleConn creates a new session with conn, // handles the message through the session in different goroutines, -// and waits until the session's closed. +// and waits until the session's closed, then close the conn. func (s *Server) handleConn(conn net.Conn) { sess := newSession(conn, &sessionOption{ Packer: s.Packer, Codec: s.Codec, respQueueSize: s.respQueueSize, }) - Sessions().Add(sess) if s.OnSessionCreate != nil { go s.OnSessionCreate(sess) } @@ -168,8 +167,10 @@ func (s *Server) handleConn(conn net.Conn) { go sess.readInbound(s.router, s.readTimeout) // start reading message packet from connection. go sess.writeOutbound(s.writeTimeout, s.writeAttemptTimes) // start writing message packet to connection. - <-sess.closed // wait for session finished. - Sessions().Remove(sess.ID()) // session has been closed, remove it. + select { + case <-sess.closed: // wait for session finished. + case <-s.stopped: // or the server is stopped. + } if s.OnSessionClose != nil { go s.OnSessionClose(sess) @@ -179,19 +180,9 @@ func (s *Server) handleConn(conn net.Conn) { } } -// Stop stops server by closing all the TCP sessions, listener and the router. +// Stop stops server. Closing Listener and all connections. func (s *Server) Stop() error { close(s.stopped) - - // close all sessions - closedNum := 0 - Sessions().Range(func(id string, sess Session) (next bool) { - sess.Close() - closedNum++ - return true - }) - Log.Tracef("%d session(s) closed", closedNum) - return s.Listener.Close() } diff --git a/session.go b/session.go index 9036911..e9bfa97 100644 --- a/session.go +++ b/session.go @@ -10,7 +10,10 @@ import ( // Session represents a TCP session. type Session interface { // ID returns current session's id. - ID() string + ID() interface{} + + // SetID sets current session's id. + SetID(id interface{}) // Send sends the ctx to the respQueue. Send(ctx Context) bool @@ -26,10 +29,10 @@ type Session interface { } type session struct { - id string // session's ID. it's a UUID + id interface{} // session's ID. conn net.Conn // tcp connection closed chan struct{} // to close() - closeOne sync.Once // ensure one session only close once + closeOnce sync.Once // ensure one session only close once respQueue chan Context // response queue channel, pushed in Send() and popped in writeOutbound() packer Packer // to pack and unpack message codec Codec // encode/decode message data @@ -49,7 +52,7 @@ type sessionOption struct { // Returns a session pointer. func newSession(conn net.Conn, opt *sessionOption) *session { return &session{ - id: uuid.NewString(), + id: uuid.NewString(), // use uuid as default conn: conn, closed: make(chan struct{}), respQueue: make(chan Context, opt.respQueueSize), @@ -59,11 +62,17 @@ func newSession(conn net.Conn, opt *sessionOption) *session { } } -// ID returns the session's ID. -func (s *session) ID() string { +// ID returns the session's id. +func (s *session) ID() interface{} { return s.id } +// SetID sets session id. +// Can be called in server.OnSessionCreate() callback. +func (s *session) SetID(id interface{}) { + s.id = id +} + // Send pushes response message entry to respQueue. // Returns error if session is closed. func (s *session) Send(ctx Context) (ok bool) { @@ -85,7 +94,7 @@ func (s *session) Codec() Codec { // Close closes the session, but doesn't close the connection. // The connection will be closed in the server once the session's closed. func (s *session) Close() { - s.closeOne.Do(func() { close(s.closed) }) + s.closeOnce.Do(func() { close(s.closed) }) } // AllocateContext gets a Context from pool and reset all but session. diff --git a/session_manager.go b/session_manager.go deleted file mode 100644 index 6dfa1e6..0000000 --- a/session_manager.go +++ /dev/null @@ -1,67 +0,0 @@ -package easytcp - -import ( - "sync" -) - -var ( - managerOnce sync.Once - manager *SessionManager -) - -// SessionManager manages all the sessions in application runtime. -type SessionManager struct { - mu sync.RWMutex - sessions map[string]Session -} - -// Sessions returns a SessionManager pointer in a singleton way. -func Sessions() *SessionManager { - managerOnce.Do(func() { - manager = &SessionManager{} - }) - return manager -} - -// Add adds a session to sessions. -// If the ID of s already existed in sessions, it replaces the value with the s. -func (m *SessionManager) Add(s Session) { - if s == nil { - return - } - m.mu.Lock() - if m.sessions == nil { - m.sessions = make(map[string]Session) - } - m.sessions[s.ID()] = s - m.mu.Unlock() -} - -// Remove removes a session from sessions. -// Parameter id should be the session's id. -func (m *SessionManager) Remove(id string) { - m.mu.Lock() - delete(m.sessions, id) - m.mu.Unlock() -} - -// Get returns a session when found by the id, -// returns nil otherwise. -func (m *SessionManager) Get(id string) Session { - m.mu.RLock() - sess := m.sessions[id] - m.mu.RUnlock() - return sess -} - -// Range calls fn sequentially for each id and sess present in the sessions. -// If fn returns false, range stops the iteration. -func (m *SessionManager) Range(fn func(id string, sess Session) (next bool)) { - m.mu.RLock() - for id, sess := range m.sessions { - if !fn(id, sess) { - break - } - } - m.mu.RUnlock() -} diff --git a/session_manager_test.go b/session_manager_test.go deleted file mode 100644 index feb85d9..0000000 --- a/session_manager_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package easytcp - -import ( - "github.com/stretchr/testify/assert" - "sync" - "testing" -) - -func TestSessions(t *testing.T) { - wg := sync.WaitGroup{} - for i := 0; i < 100; i++ { - wg.Add(1) - go func() { - Sessions() - wg.Done() - }() - } - wg.Wait() - assert.NotNil(t, manager) - assert.Equal(t, manager, Sessions()) -} - -func TestManager_AddGetAndRemove(t *testing.T) { - mg := &SessionManager{} - - // should not add nil - assert.NotPanics(t, func() { mg.Add(nil) }) - - assert.Nil(t, mg.Get("not found")) - - wg := sync.WaitGroup{} - for i := 0; i < 100; i++ { - wg.Add(1) - go func() { - defer wg.Done() - sess := newSession(nil, &sessionOption{}) - mg.Add(sess) - s := mg.Get(sess.id) - assert.NotNil(t, s) - assert.Equal(t, s, sess) - mg.Remove(sess.id) - assert.Nil(t, mg.Get(sess.id)) - }() - } - wg.Wait() -} - -func TestManager_Range(t *testing.T) { - mg := &SessionManager{} - var count int - mg.Range(func(id string, sess Session) (next bool) { - count++ - return true - }) - assert.Zero(t, count) - - sess := newSession(nil, &sessionOption{}) - sess2 := newSession(nil, &sessionOption{}) - mg.Add(sess) - mg.Add(sess2) - - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - defer wg.Done() - count := 0 - mg.Range(func(id string, s Session) (next bool) { - count++ - return false - }) - assert.Equal(t, count, 1) - }() - - go func() { - defer wg.Done() - count := 0 - mg.Range(func(id string, s Session) (next bool) { - count++ - return true - }) - assert.Equal(t, count, 2) - }() - wg.Wait() -} diff --git a/session_test.go b/session_test.go index 2da4b04..2be381d 100644 --- a/session_test.go +++ b/session_test.go @@ -411,3 +411,11 @@ func TestSession_attemptConnWrite_when_err_is_not_temp(t *testing.T) { s := newSession(conn, &sessionOption{}) assert.ErrorIs(t, s.attemptConnWrite([]byte("whatever"), 10), netErr) } + +func Test_session_SetID(t *testing.T) { + sess := newSession(nil, &sessionOption{}) + _, ok := sess.ID().(string) + assert.True(t, ok) + sess.SetID(123) + assert.Equal(t, sess.ID(), 123) +} From d8a82275b3a78c030e69c749589eed3895fa4411 Mon Sep 17 00:00:00 2001 From: zxl Date: Wed, 5 Jan 2022 15:03:28 +0800 Subject: [PATCH 2/3] docs: update benchmark result --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 0878ee3..ddccbe5 100644 --- a/README.md +++ b/README.md @@ -150,10 +150,10 @@ goos: darwin goarch: amd64 pkg: github.com/DarthPestilane/easytcp cpu: Intel(R) Core(TM) i5-8279U CPU @ 2.40GHz -Benchmark_NoHandler-8 250000 5375 ns/op 125 B/op 3 allocs/op -Benchmark_OneHandler-8 250000 5692 ns/op 133 B/op 4 allocs/op -Benchmark_DefaultPacker_Pack-8 250000 33.63 ns/op 16 B/op 1 allocs/op -Benchmark_DefaultPacker_Unpack-8 250000 109.3 ns/op 96 B/op 3 allocs/op +Benchmark_NoHandler-8 250000 5170 ns/op 129 B/op 4 allocs/op +Benchmark_OneHandler-8 250000 5137 ns/op 128 B/op 4 allocs/op +Benchmark_DefaultPacker_Pack-8 250000 35.87 ns/op 16 B/op 1 allocs/op +Benchmark_DefaultPacker_Unpack-8 250000 102.5 ns/op 96 B/op 3 allocs/op ``` *since easytcp is built on the top of golang `net` library, the benchmark of networks does not make much sense.* From 6e7f96911f29a5a625c4b22aa47f2a0f5cada785 Mon Sep 17 00:00:00 2001 From: zxl Date: Thu, 6 Jan 2022 09:46:46 +0800 Subject: [PATCH 3/3] refactor: minor improvement --- server.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index d12682e..8e3b87b 100644 --- a/server.go +++ b/server.go @@ -155,6 +155,8 @@ func (s *Server) acceptLoop() error { // handles the message through the session in different goroutines, // and waits until the session's closed, then close the conn. func (s *Server) handleConn(conn net.Conn) { + defer conn.Close() // nolint + sess := newSession(conn, &sessionOption{ Packer: s.Packer, Codec: s.Codec, @@ -175,9 +177,6 @@ func (s *Server) handleConn(conn net.Conn) { if s.OnSessionClose != nil { go s.OnSessionClose(sess) } - if err := conn.Close(); err != nil { - Log.Errorf("connection close err: %s", err) - } } // Stop stops server. Closing Listener and all connections.