Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ goos: darwin
goarch: amd64
pkg: github.com/DarthPestilane/easytcp
cpu: Intel(R) Core(TM) i5-8279U CPU @ 2.40GHz
Benchmark_NoHandler-8 250000 5375 ns/op 125 B/op 3 allocs/op
Benchmark_OneHandler-8 250000 5692 ns/op 133 B/op 4 allocs/op
Benchmark_DefaultPacker_Pack-8 250000 33.63 ns/op 16 B/op 1 allocs/op
Benchmark_DefaultPacker_Unpack-8 250000 109.3 ns/op 96 B/op 3 allocs/op
Benchmark_NoHandler-8 250000 5170 ns/op 129 B/op 4 allocs/op
Benchmark_OneHandler-8 250000 5137 ns/op 128 B/op 4 allocs/op
Benchmark_DefaultPacker_Pack-8 250000 35.87 ns/op 16 B/op 1 allocs/op
Benchmark_DefaultPacker_Unpack-8 250000 102.5 ns/op 96 B/op 3 allocs/op
```

*since easytcp is built on the top of golang `net` library, the benchmark of networks does not make much sense.*
Expand Down
52 changes: 40 additions & 12 deletions examples/tcp/broadcast/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,66 @@ import (
"github.com/sirupsen/logrus"
"os"
"os/signal"
"sync"
"syscall"
"time"
)

var log *logrus.Logger
var sessions *SessionManager

func init() {
log = logrus.New()
sessions = &SessionManager{nextId: 1, storage: map[int64]easytcp.Session{}}
}

type SessionManager struct {
nextId int64
lock sync.Mutex
storage map[int64]easytcp.Session
}

func main() {
s := easytcp.NewServer(&easytcp.ServerOption{
Packer: easytcp.NewDefaultPacker(),
})

s.OnSessionCreate = func(sess easytcp.Session) {
// store session
sessions.lock.Lock()
defer sessions.lock.Unlock()
sess.SetID(sessions.nextId)
sessions.nextId++
sessions.storage[sess.ID().(int64)] = sess
}

s.OnSessionClose = func(sess easytcp.Session) {
// remove session
delete(sessions.storage, sess.ID().(int64))
}

s.Use(fixture.RecoverMiddleware(log), logMiddleware)

s.AddRoute(common.MsgIdBroadCastReq, func(ctx easytcp.Context) {
reqData := ctx.Request().Data

// broadcasting
go easytcp.Sessions().Range(func(id string, sess easytcp.Session) (next bool) {
if ctx.Session().ID() == id {
return true // next iteration
// broadcasting to other sessions
currentSession := ctx.Session()
for _, sess := range sessions.storage {
targetSession := sess
if currentSession.ID() == targetSession.ID() {
continue
}
respData := fmt.Sprintf("%s (broadcast from %s)", reqData, ctx.Session().ID())
ctx.Copy().SetResponseMessage(&message.Entry{
ID: common.MsgIdBroadCastAck,
Data: []byte(respData),
}).SendTo(sess)
return true
})
respData := fmt.Sprintf("%s (broadcast from %d to %d)", reqData, currentSession.ID(), targetSession.ID())
respEntry := &message.Entry{ID: common.MsgIdBroadCastAck, Data: []byte(respData)}
go func() {
targetSession.AllocateContext().SetResponseMessage(respEntry).Send()
// can also write like this.
// ctx.Copy().SetResponseMessage(respEntry).SendTo(targetSession)
// or this.
// ctx.Copy().SetSession(targetSession).SetResponseMessage(respEntry).Send()
}()
}

ctx.SetResponseMessage(&message.Entry{
ID: common.MsgIdBroadCastAck,
Expand Down Expand Up @@ -68,7 +96,7 @@ func logMiddleware(next easytcp.HandlerFunc) easytcp.HandlerFunc {
log.Infof("recv request | %s", ctx.Request().Data)
defer func() {
var resp = ctx.Response()
log.Infof("send response | id: %d; size: %d; data: %s", resp.ID, len(resp.Data), resp.Data)
log.Infof("send response |sessId: %d; id: %d; size: %d; data: %s", ctx.Session().ID(), resp.ID, len(resp.Data), resp.Data)
}()
next(ctx)
}
Expand Down
13 changes: 8 additions & 5 deletions examples/tcp/simple/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ var log *logrus.Logger

func init() {
log = logrus.New()
log.SetLevel(logrus.TraceLevel)
}

func main() {
// go printGoroutineNum()

easytcp.SetLogger(log)
s := easytcp.NewServer(&easytcp.ServerOption{
SocketReadBufferSize: 1024 * 1024,
SocketWriteBufferSize: 1024 * 1024,
Expand All @@ -33,10 +35,10 @@ func main() {
Codec: nil,
})
s.OnSessionCreate = func(sess easytcp.Session) {
log.Infof("session created: %s", sess.ID())
log.Infof("session created: %v", sess.ID())
}
s.OnSessionClose = func(sess easytcp.Session) {
log.Warnf("session closed: %s", sess.ID())
log.Warnf("session closed: %v", sess.ID())
}

// register global middlewares
Expand All @@ -62,15 +64,16 @@ func main() {
if err := s.Stop(); err != nil {
log.Errorf("server stopped err: %s", err)
}
time.Sleep(time.Second)
time.Sleep(time.Second * 3)
}

func logMiddleware(next easytcp.HandlerFunc) easytcp.HandlerFunc {
return func(c easytcp.Context) {
log.Infof("rec <<< | id:(%d) size:(%d) data: %s", c.Request().ID, len(c.Request().Data), c.Request().Data)
req := c.Request()
log.Infof("rec <<< id:(%d) size:(%d) data: %s", req.ID, len(req.Data), req.Data)
defer func() {
resp := c.Response()
log.Infof("snd >>> | id:(%d) size:(%d) data: %s", resp.ID, len(resp.Data), resp.Data)
log.Infof("snd >>> id:(%d) size:(%d) data: %s", resp.ID, len(resp.Data), resp.Data)
}()
next(c)
}
Expand Down
26 changes: 8 additions & 18 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,45 +153,35 @@ func (s *Server) acceptLoop() error {

// handleConn creates a new session with conn,
// handles the message through the session in different goroutines,
// and waits until the session's closed.
// and waits until the session's closed, then close the conn.
func (s *Server) handleConn(conn net.Conn) {
defer conn.Close() // nolint

sess := newSession(conn, &sessionOption{
Packer: s.Packer,
Codec: s.Codec,
respQueueSize: s.respQueueSize,
})
Sessions().Add(sess)
if s.OnSessionCreate != nil {
go s.OnSessionCreate(sess)
}

go sess.readInbound(s.router, s.readTimeout) // start reading message packet from connection.
go sess.writeOutbound(s.writeTimeout, s.writeAttemptTimes) // start writing message packet to connection.

<-sess.closed // wait for session finished.
Sessions().Remove(sess.ID()) // session has been closed, remove it.
select {
case <-sess.closed: // wait for session finished.
case <-s.stopped: // or the server is stopped.
}

if s.OnSessionClose != nil {
go s.OnSessionClose(sess)
}
if err := conn.Close(); err != nil {
Log.Errorf("connection close err: %s", err)
}
}

// Stop stops server by closing all the TCP sessions, listener and the router.
// Stop stops server. Closing Listener and all connections.
func (s *Server) Stop() error {
close(s.stopped)

// close all sessions
closedNum := 0
Sessions().Range(func(id string, sess Session) (next bool) {
sess.Close()
closedNum++
return true
})
Log.Tracef("%d session(s) closed", closedNum)

return s.Listener.Close()
}

Expand Down
23 changes: 16 additions & 7 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
// Session represents a TCP session.
type Session interface {
// ID returns current session's id.
ID() string
ID() interface{}

// SetID sets current session's id.
SetID(id interface{})

// Send sends the ctx to the respQueue.
Send(ctx Context) bool
Expand All @@ -26,10 +29,10 @@ type Session interface {
}

type session struct {
id string // session's ID. it's a UUID
id interface{} // session's ID.
conn net.Conn // tcp connection
closed chan struct{} // to close()
closeOne sync.Once // ensure one session only close once
closeOnce sync.Once // ensure one session only close once
respQueue chan Context // response queue channel, pushed in Send() and popped in writeOutbound()
packer Packer // to pack and unpack message
codec Codec // encode/decode message data
Expand All @@ -49,7 +52,7 @@ type sessionOption struct {
// Returns a session pointer.
func newSession(conn net.Conn, opt *sessionOption) *session {
return &session{
id: uuid.NewString(),
id: uuid.NewString(), // use uuid as default
conn: conn,
closed: make(chan struct{}),
respQueue: make(chan Context, opt.respQueueSize),
Expand All @@ -59,11 +62,17 @@ func newSession(conn net.Conn, opt *sessionOption) *session {
}
}

// ID returns the session's ID.
func (s *session) ID() string {
// ID returns the session's id.
func (s *session) ID() interface{} {
return s.id
}

// SetID sets session id.
// Can be called in server.OnSessionCreate() callback.
func (s *session) SetID(id interface{}) {
s.id = id
}

// Send pushes response message entry to respQueue.
// Returns error if session is closed.
func (s *session) Send(ctx Context) (ok bool) {
Expand All @@ -85,7 +94,7 @@ func (s *session) Codec() Codec {
// Close closes the session, but doesn't close the connection.
// The connection will be closed in the server once the session's closed.
func (s *session) Close() {
s.closeOne.Do(func() { close(s.closed) })
s.closeOnce.Do(func() { close(s.closed) })
}

// AllocateContext gets a Context from pool and reset all but session.
Expand Down
67 changes: 0 additions & 67 deletions session_manager.go

This file was deleted.

Loading