Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/zinx_server/conf/zinx.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"Host":"127.0.0.1",
"TCPPort":8999,
"MaxConn":3,
"WorkerMode": "",
"WorkerPoolSize":10,
"LogDir": "./mylog",
"LogFile":"zinx.log",
Expand Down
7 changes: 7 additions & 0 deletions zconf/globalobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ const (
ServerModeWebsocket = "websocket"
)

const (
WorkerModeHash = "Hash" //By default, the round-robin average allocation rule is used.(默认使用取余的方式)
WorkerModeBind = "Bind" //Bind a worker to each connection.(为每个连接分配一个worker)
)

/*
Store all global parameters related to the Zinx framework for use by other modules.
Some parameters can also be configured by the user based on the zinx.json file.
Expand All @@ -47,6 +52,7 @@ type Config struct {
MaxConn int //The maximum number of connections that the server can handle.(当前服务器主机允许的最大链接个数)
WorkerPoolSize uint32 //The number of worker pools in the business logic.(业务工作Worker池的数量)
MaxWorkerTaskLen uint32 //The maximum number of tasks that a worker pool can handle.(业务工作Worker对应负责的任务队列最大任务存储数量)
WorkerMode string //The way to assign workers to connections.(为链接分配worker的方式)
MaxMsgChanLen uint32 //The maximum length of the send buffer message queue.(SendBuffMsg发送消息的缓冲最大长度)
IOReadBuffSize uint32 //The maximum size of the read buffer for each IO operation.(每次IO最大的读取长度)

Expand Down Expand Up @@ -203,6 +209,7 @@ func init() {
MaxPacketSize: 4096,
WorkerPoolSize: 10,
MaxWorkerTaskLen: 1024,
WorkerMode: "",
MaxMsgChanLen: 1024,
LogDir: pwd + "/log",
LogFile: "", //if set "", print to Stderr(默认日志文件为空,打印到stderr)
Expand Down
4 changes: 4 additions & 0 deletions zconf/userconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func UserConfToGlobal(config *Config) {
if config.MaxWorkerTaskLen != 0 {
GlobalObject.MaxWorkerTaskLen = config.MaxWorkerTaskLen
}
if config.WorkerMode != "" {
GlobalObject.WorkerMode = config.WorkerMode
}

if config.MaxMsgChanLen != 0 {
GlobalObject.MaxMsgChanLen = config.MaxMsgChanLen
}
Expand Down
1 change: 1 addition & 0 deletions ziface/iconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type IConnection interface {
GetTCPConnection() net.Conn // Get the original socket TCPConn from the current connection (从当前连接获取原始的socket TCPConn)
GetConnID() uint64 // Get the current connection ID (获取当前连接ID)
GetMsgHandler() IMsgHandle // Get the message handler (获取消息处理器)
GetWorkerID() uint32 // Get Worker ID(获取workerid)
RemoteAddr() net.Addr // Get the remote address information of the connection (获取链接远程地址信息)
LocalAddr() net.Addr // Get the local address information of the connection (获取链接本地地址信息)
LocalAddrString() string // Get the local address information of the connection as a string
Expand Down
2 changes: 1 addition & 1 deletion zinterceptor/framedocder.go
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

其他都review哈,没问题 👍🏻! 但是这个decode的转换还不确定。

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里删除了类型转换,是因为变量本身就是int64类型:
frameLength := d.getUnadjustedFrameLength

然后,.golangci.yaml中有一行:
unconvert // 禁止冗余的类型转换

这导致PR无法通过GIthub的自动检测。

作者辛苦了😄

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LI-GUOJIE 妥,已merge

Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (d *FrameDecoder) decode(buf []byte) []byte {

//跳过的字节数是否大于数据包长度
if d.InitialBytesToStrip > frameLengthInt {
d.failOnFrameLengthLessThanInitialBytesToStrip(in, int64(frameLength), d.InitialBytesToStrip)
d.failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, d.InitialBytesToStrip)
}
//跳过initialBytesToStrip个字节
in.Next(d.InitialBytesToStrip)
Expand Down
14 changes: 14 additions & 0 deletions znet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type Connection struct {
// 这个是理论支持的进程connID的最大数量)
connID uint64

// The workerid responsible for handling the link
// 负责处理该链接的workerid
workerID uint32

// The message management module that manages MsgID and the corresponding processing method
// (消息管理MsgID和对应处理方法的消息管理模块)
msgHandler ziface.IMsgHandle
Expand Down Expand Up @@ -276,13 +280,19 @@ func (c *Connection) Start() {
c.updateActivity()
}

// 占用workerid
c.workerID = useWorker(c)

// Start the Goroutine for reading data from the client
// (开启用户从客户端读取数据流程的Goroutine)
go c.StartReader()

select {
case <-c.ctx.Done():
c.finalizer()

// 归还workerid
freeWorker(c)
return
}
}
Expand Down Expand Up @@ -310,6 +320,10 @@ func (c *Connection) GetConnID() uint64 {
return c.connID
}

func (c *Connection) GetWorkerID() uint32 {
return c.workerID
}

func (c *Connection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
Expand Down
80 changes: 67 additions & 13 deletions znet/msghandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package znet
import (
"encoding/hex"
"fmt"
"sync"

"github.com/aceld/zinx/zconf"
"github.com/aceld/zinx/ziface"
Expand All @@ -28,6 +29,11 @@ type MsgHandle struct {
// (业务工作Worker池的数量)
WorkerPoolSize uint32

// A collection of idle workers, used for zconf.WorkerModeBind
// 空闲worker集合,用于zconf.WorkerModeBind
freeWorkers map[uint32]struct{}
freeWorkerMu sync.Mutex

// A message queue for workers to take tasks
// (Worker负责取任务的消息队列)
TaskQueue []chan ziface.IRequest
Expand All @@ -41,13 +47,27 @@ type MsgHandle struct {
// newMsgHandle creates MsgHandle
// zinxRole: IServer/IClient
func newMsgHandle() *MsgHandle {
var freeWorkers map[uint32]struct{}
if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind {
// Assign a workder to each link, avoid interactions when multiple links are processed by the same worker
// MaxWorkerTaskLen can also be reduced, for example, 50
// 为每个链接分配一个workder,避免同一worker处理多个链接时的互相影响
// 同时可以减小MaxWorkerTaskLen,比如50,因为每个worker的负担减轻了
zconf.GlobalObject.WorkerPoolSize = uint32(zconf.GlobalObject.MaxConn)
freeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.WorkerPoolSize)
for i := uint32(0); i < zconf.GlobalObject.WorkerPoolSize; i++ {
freeWorkers[i] = struct{}{}
}
}

handle := &MsgHandle{
Apis: make(map[uint32]ziface.IRouter),
RouterSlices: NewRouterSlices(),
WorkerPoolSize: zconf.GlobalObject.WorkerPoolSize,
// One worker corresponds to one queue (一个worker对应一个queue)
TaskQueue: make([]chan ziface.IRequest, zconf.GlobalObject.WorkerPoolSize),
builder: newChainBuilder(),
TaskQueue: make([]chan ziface.IRequest, zconf.GlobalObject.WorkerPoolSize),
freeWorkers: freeWorkers,
builder: newChainBuilder(),
}

// It is necessary to add the MsgHandle to the responsibility chain here, and it is the last link in the responsibility chain. After decoding in the MsgHandle, data distribution is done by router
Expand All @@ -56,6 +76,50 @@ func newMsgHandle() *MsgHandle {
return handle
}

// Use worker ID
// 占用workerID
func useWorker(conn ziface.IConnection) uint32 {
mh, _ := conn.GetMsgHandler().(*MsgHandle)
if mh == nil {
zlog.Ins().ErrorF("useWorker failed, mh is nil")
return 0
}

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind {
mh.freeWorkerMu.Lock()
defer mh.freeWorkerMu.Unlock()

for k := range mh.freeWorkers {
delete(mh.freeWorkers, k)
return k
}
}

// Assign the worker responsible for processing the current connection based on the ConnID
// Using a round-robin average allocation rule to get the workerID that needs to process this connection
// (根据ConnID来分配当前的连接应该由哪个worker负责处理
// 轮询的平均分配法则
// 得到需要处理此条连接的workerID)
return uint32(conn.GetConnID() % uint64(mh.WorkerPoolSize))
}

// Free worker ID
// 释放workerid
func freeWorker(conn ziface.IConnection) {
mh, _ := conn.GetMsgHandler().(*MsgHandle)
if mh == nil {
zlog.Ins().ErrorF("useWorker failed, mh is nil")
return
}

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind {
mh.freeWorkerMu.Lock()
defer mh.freeWorkerMu.Unlock()

mh.freeWorkers[conn.GetWorkerID()] = struct{}{}
}
}

// Data processing interceptor that is necessary by default in Zinx
// (Zinx默认必经的数据处理拦截器)
func (mh *MsgHandle) Intercept(chain ziface.IChain) ziface.IcResp {
Expand Down Expand Up @@ -91,20 +155,10 @@ func (mh *MsgHandle) AddInterceptor(interceptor ziface.IInterceptor) {
}
}

func (mh *MsgHandle) GetTaskQueueWorkerId(request ziface.IRequest) uint64 {
// Assign the worker responsible for processing the current connection based on the ConnID
// Using a round-robin average allocation rule to get the workerID that needs to process this connection
// (根据ConnID来分配当前的连接应该由哪个worker负责处理
// 轮询的平均分配法则
// 得到需要处理此条连接的workerID)
workerID := request.GetConnection().GetConnID() % uint64(mh.WorkerPoolSize)
return workerID
}

// SendMsgToTaskQueue sends the message to the TaskQueue for processing by the worker
// (将消息交给TaskQueue,由worker进行处理)
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
workerID := mh.GetTaskQueueWorkerId(request)
workerID := request.GetConnection().GetWorkerID()
// zlog.Ins().DebugF("Add ConnID=%d request msgID=%d to workerID=%d", request.GetConnection().GetConnID(), request.GetMsgID(), workerID)
// Send the request message to the task queue
mh.TaskQueue[workerID] <- request
Expand Down
14 changes: 14 additions & 0 deletions znet/ws_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type WsConnection struct {
// 这个是理论支持的进程connID的最大数量)
connID uint64

// The workerid responsible for handling the link
// 负责处理该链接的workerid
workerID uint32

// msgHandler is the message management module for MsgID and the corresponding message handling method.
// (消息管理MsgID和对应处理方法的消息管理模块)
msgHandler ziface.IMsgHandle
Expand Down Expand Up @@ -264,13 +268,19 @@ func (c *WsConnection) Start() {
c.updateActivity()
}

// 占用workerid
c.workerID = useWorker(c)

// Start the Goroutine for users to read data from the client.
// (开启用户从客户端读取数据流程的Goroutine)
go c.StartReader()

select {
case <-c.ctx.Done():
c.finalizer()

// 归还workerid
freeWorker(c)
return
}
}
Expand Down Expand Up @@ -298,6 +308,10 @@ func (c *WsConnection) GetConnID() uint64 {
return c.connID
}

func (c *WsConnection) GetWorkerID() uint32 {
return c.workerID
}

func (c *WsConnection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
Expand Down