From 2d5105529869c845db17c6d2ee2f67790eb1644c Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Thu, 11 May 2023 11:21:18 +0800 Subject: [PATCH 01/12] =?UTF-8?q?Optimize=20broadcast=20messages=20to=20re?= =?UTF-8?q?duce=20the=20number=20of=20messages=20serialized=20and=20packag?= =?UTF-8?q?ed=20during=20massive=20online=20player=20interaction=20from=20?= =?UTF-8?q?N^2=20to=20N=20=EF=BC=88=E4=BC=98=E5=8C=96=E5=B9=BF=E6=92=AD?= =?UTF-8?q?=E6=B6=88=E6=81=AF=EF=BC=8C=E5=B0=86=E5=A4=A7=E8=A7=84=E6=A8=A1?= =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E7=8E=A9=E5=AE=B6=E4=BA=92=E5=8A=A8=E6=97=B6?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=BA=8F=E5=88=97=E5=8C=96=E4=B8=8E=E6=89=93?= =?UTF-8?q?=E5=8C=85=E7=9A=84=E6=AC=A1=E6=95=B0=E4=BB=8EN^2=E9=99=8D?= =?UTF-8?q?=E5=88=B0N=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zinx_app_demo/mmo_game/core/player.go | 62 +++++++++++++++++-------- zinx_app_demo/mmo_game/datapack/pack.go | 36 ++++++++++++++ 2 files changed, 78 insertions(+), 20 deletions(-) create mode 100644 zinx_app_demo/mmo_game/datapack/pack.go diff --git a/zinx_app_demo/mmo_game/core/player.go b/zinx_app_demo/mmo_game/core/player.go index e2248fac..3c2870c2 100644 --- a/zinx_app_demo/mmo_game/core/player.go +++ b/zinx_app_demo/mmo_game/core/player.go @@ -7,11 +7,12 @@ import ( "time" "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zinx_app_demo/mmo_game/datapack" "github.com/aceld/zinx/zinx_app_demo/mmo_game/pb" "github.com/golang/protobuf/proto" ) -//玩家对象 +// 玩家对象 type Player struct { PID int32 //玩家ID Conn ziface.IConnection //当前玩家的连接 @@ -22,12 +23,12 @@ type Player struct { } /* - Player ID 生成器 +Player ID 生成器 */ var PIDGen int32 = 1 //用来生成玩家ID的计数器 var IDLock sync.Mutex //保护PIDGen的互斥机制 -//创建一个玩家对象 +// 创建一个玩家对象 func NewPlayer(conn ziface.IConnection) *Player { //生成一个PID IDLock.Lock() @@ -47,7 +48,7 @@ func NewPlayer(conn ziface.IConnection) *Player { return p } -//告知客户端pID,同步已经生成的玩家ID给客户端 +// 告知客户端pID,同步已经生成的玩家ID给客户端 func (p *Player) SyncPID() { //组建MsgID0 proto数据 data := &pb.SyncPID{ @@ -58,7 +59,7 @@ func (p *Player) SyncPID() { p.SendMsg(1, data) } -//广播玩家自己的出生地点 +// 广播玩家自己的出生地点 func (p *Player) BroadCastStartPosition() { //组建MsgID200 proto数据 @@ -79,7 +80,7 @@ func (p *Player) BroadCastStartPosition() { p.SendMsg(200, msg) } -//给当前玩家周边的(九宫格内)玩家广播自己的位置,让他们显示自己 +// 给当前玩家周边的(九宫格内)玩家广播自己的位置,让他们显示自己 func (p *Player) SyncSurrounding() { //1 根据自己的位置,获取周围九宫格内的玩家pID pIDs := WorldMgrObj.AoiMgr.GetPIDsByPos(p.X, p.Z) @@ -102,9 +103,11 @@ func (p *Player) SyncSurrounding() { }, }, } - //3.2 每个玩家分别给对应的客户端发送200消息,显示人物 + //3.2 打包广播消息 + data := datapack.SerializeMsg2Bytes(200, msg) + //3.3 每个玩家分别给对应的客户端发送200消息,显示人物 for _, player := range players { - player.SendMsg(200, msg) + player.SendBytes(data) } //4 让周围九宫格内的玩家出现在自己的视野中 //4.1 制作Message SyncPlayers 数据 @@ -131,7 +134,7 @@ func (p *Player) SyncSurrounding() { p.SendMsg(202, SyncPlayersMsg) } -//广播玩家聊天 +// 广播玩家聊天 func (p *Player) Talk(content string) { //1. 组建MsgID200 proto数据 msg := &pb.BroadCast{ @@ -145,13 +148,16 @@ func (p *Player) Talk(content string) { //2. 得到当前世界所有的在线玩家 players := WorldMgrObj.GetAllPlayers() - //3. 向所有的玩家发送MsgID:200消息 + //3. 打包广播消息 + data := datapack.SerializeMsg2Bytes(200, msg) + + //4. 向所有的玩家发送MsgID:200消息 for _, player := range players { - player.SendMsg(200, msg) + player.SendBytes(data) } } -//广播玩家位置移动 +// 广播玩家位置移动 func (p *Player) UpdatePos(x float32, y float32, z float32, v float32) { //触发消失视野和添加视野业务 @@ -192,9 +198,11 @@ func (p *Player) UpdatePos(x float32, y float32, z float32, v float32) { //获取当前玩家周边全部玩家 players := p.GetSurroundingPlayers() + // 打包广播消息 + data := datapack.SerializeMsg2Bytes(200, msg) //向周边的每个玩家发送MsgID:200消息,移动位置更新消息 for _, player := range players { - player.SendMsg(200, msg) + player.SendBytes(data) } } @@ -220,6 +228,7 @@ func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error { offlineMsg := &pb.SyncPID{ PID: p.PID, } + dataOffline := datapack.SerializeMsg2Bytes(201, offlineMsg) //找到在旧的九宫格中出现,但是在新的九宫格中没有出现的格子 leavingGrIDs := make([]*GrID, 0) @@ -234,7 +243,7 @@ func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error { players := WorldMgrObj.GetPlayersByGID(grID.GID) for _, player := range players { //让自己在其他玩家的客户端中消失 - player.SendMsg(201, offlineMsg) + player.SendBytes(dataOffline) //将其他玩家信息 在自己的客户端中消失 anotherOfflineMsg := &pb.SyncPID{ @@ -267,6 +276,7 @@ func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error { }, }, } + dataOnline := datapack.SerializeMsg2Bytes(200, onlineMsg) //获取需要显示格子的全部玩家 for _, grID := range enteringGrIDs { @@ -274,7 +284,7 @@ func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error { for _, player := range players { //让自己出现在其他人视野中 - player.SendMsg(200, onlineMsg) + player.SendBytes(dataOnline) //让其他人出现在自己的视野中 anotherOnlineMsg := &pb.BroadCast{ @@ -298,7 +308,7 @@ func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error { return nil } -//获得当前玩家的AOI周边玩家信息 +// 获得当前玩家的AOI周边玩家信息 func (p *Player) GetSurroundingPlayers() []*Player { //得到当前AOI区域的所有pID pIDs := WorldMgrObj.AoiMgr.GetPIDsByPos(p.X, p.Z) @@ -312,7 +322,7 @@ func (p *Player) GetSurroundingPlayers() []*Player { return players } -//玩家下线 +// 玩家下线 func (p *Player) LostConnection() { //1 获取周围AOI九宫格内的玩家 players := p.GetSurroundingPlayers() @@ -321,10 +331,11 @@ func (p *Player) LostConnection() { msg := &pb.SyncPID{ PID: p.PID, } + data := datapack.SerializeMsg2Bytes(201, msg) //3 向周围玩家发送消息 for _, player := range players { - player.SendMsg(201, msg) + player.SendBytes(data) } //4 世界管理器将当前玩家从AOI中摘除 @@ -333,8 +344,7 @@ func (p *Player) LostConnection() { } /* - 发送消息给客户端, - 主要是将pb的protobuf数据序列化之后发送 +发送消息给客户端 */ func (p *Player) SendMsg(msgID uint32, data proto.Message) { if p.Conn == nil { @@ -358,3 +368,15 @@ func (p *Player) SendMsg(msgID uint32, data proto.Message) { return } + +/* +added by LI +Send serialized bytes to the client for use in broadcast functions to significantly reduce cpu usage. +发送已序列化的字符串给客户端,用于各类广播方法,减少消息序列化次数,显著降低大规模广播时的CPU消耗。 +*/ +func (p *Player) SendBytes(data []byte) { + if err := p.Conn.SendToQueue(data); err != nil { + fmt.Println("failed to send bytes with err: ", err) + return + } +} diff --git a/zinx_app_demo/mmo_game/datapack/pack.go b/zinx_app_demo/mmo_game/datapack/pack.go new file mode 100644 index 00000000..4ec62b95 --- /dev/null +++ b/zinx_app_demo/mmo_game/datapack/pack.go @@ -0,0 +1,36 @@ +package datapack + +import ( + "fmt" + + "github.com/aceld/zinx/ziface" + "github.com/aceld/zinx/zpack" + "github.com/golang/protobuf/proto" +) + +// Replace here with your custom packer (if one exists) +// 将这里替换为你自定义的打包器(如果存在的话) +var DefaultPack = zpack.Factory().NewPack(ziface.ZinxDataPack) + +// Serialize message +// 将消息序列化 +func SerializeMsg2Bytes(msgId uint32, msgData proto.Message) []byte { + + // Marshal message + // 将proto Message结构体序列化 + msg, err := proto.Marshal(msgData) + if err != nil { + fmt.Printf("SerializeMsg2Bytes, marshal failed, msgId:%v, msgData:%v, err: %v\n", msgId, msgData, err) + return nil + } + + // Pack data + // 封包 + data, err := DefaultPack.Pack(zpack.NewMsgPackage(msgId, msg)) + if err != nil { + fmt.Printf("SerializeMsg2Bytes, pack failed, msgId:%v, msg:%v, err: %v\n", msgId, msg, err) + return nil + } + + return data +} From 52e599ffc557568c1149b01579667c3dca67cec6 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Thu, 11 May 2023 11:41:45 +0800 Subject: [PATCH 02/12] Simplify code. --- zinx_app_demo/mmo_game/core/player.go | 43 +++++++++++++-------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/zinx_app_demo/mmo_game/core/player.go b/zinx_app_demo/mmo_game/core/player.go index 3c2870c2..a1f073d0 100644 --- a/zinx_app_demo/mmo_game/core/player.go +++ b/zinx_app_demo/mmo_game/core/player.go @@ -103,12 +103,8 @@ func (p *Player) SyncSurrounding() { }, }, } - //3.2 打包广播消息 - data := datapack.SerializeMsg2Bytes(200, msg) - //3.3 每个玩家分别给对应的客户端发送200消息,显示人物 - for _, player := range players { - player.SendBytes(data) - } + //3.2 每个玩家分别给对应的客户端发送200消息,显示人物 + BroadCast(players, 200, msg) //4 让周围九宫格内的玩家出现在自己的视野中 //4.1 制作Message SyncPlayers 数据 playersData := make([]*pb.Player, 0, len(players)) @@ -148,13 +144,8 @@ func (p *Player) Talk(content string) { //2. 得到当前世界所有的在线玩家 players := WorldMgrObj.GetAllPlayers() - //3. 打包广播消息 - data := datapack.SerializeMsg2Bytes(200, msg) - - //4. 向所有的玩家发送MsgID:200消息 - for _, player := range players { - player.SendBytes(data) - } + //3. 向所有的玩家发送MsgID:200消息 + BroadCast(players, 200, msg) } // 广播玩家位置移动 @@ -198,12 +189,9 @@ func (p *Player) UpdatePos(x float32, y float32, z float32, v float32) { //获取当前玩家周边全部玩家 players := p.GetSurroundingPlayers() - // 打包广播消息 - data := datapack.SerializeMsg2Bytes(200, msg) + //向周边的每个玩家发送MsgID:200消息,移动位置更新消息 - for _, player := range players { - player.SendBytes(data) - } + BroadCast(players, 200, msg) } func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error { @@ -331,12 +319,9 @@ func (p *Player) LostConnection() { msg := &pb.SyncPID{ PID: p.PID, } - data := datapack.SerializeMsg2Bytes(201, msg) //3 向周围玩家发送消息 - for _, player := range players { - player.SendBytes(data) - } + BroadCast(players, 201, msg) //4 世界管理器将当前玩家从AOI中摘除 WorldMgrObj.AoiMgr.RemoveFromGrIDByPos(int(p.PID), p.X, p.Z) @@ -375,8 +360,22 @@ Send serialized bytes to the client for use in broadcast functions to significan 发送已序列化的字符串给客户端,用于各类广播方法,减少消息序列化次数,显著降低大规模广播时的CPU消耗。 */ func (p *Player) SendBytes(data []byte) { + if p.Conn == nil { + fmt.Println("failed to send bytes, connection in player is nil") + return + } + if err := p.Conn.SendToQueue(data); err != nil { fmt.Println("failed to send bytes with err: ", err) return } } + +// Send Message to multiple players +// 将消息广播给一组玩家 +func BroadCast(players []*Player, msgID uint32, msgData proto.Message) { + data := datapack.SerializeMsg2Bytes(msgID, msgData) + for _, player := range players { + player.SendBytes(data) + } +} From 35c65e003731bcae9e475f056caa8c87c0be9b46 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Thu, 11 May 2023 11:44:43 +0800 Subject: [PATCH 03/12] Delete redundant time.Sleep --- zinx_app_demo/mmo_game/core/player.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/zinx_app_demo/mmo_game/core/player.go b/zinx_app_demo/mmo_game/core/player.go index a1f073d0..0a705768 100644 --- a/zinx_app_demo/mmo_game/core/player.go +++ b/zinx_app_demo/mmo_game/core/player.go @@ -4,7 +4,6 @@ import ( "fmt" "math/rand" "sync" - "time" "github.com/aceld/zinx/ziface" "github.com/aceld/zinx/zinx_app_demo/mmo_game/datapack" @@ -238,7 +237,6 @@ func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error { PID: player.PID, } p.SendMsg(201, anotherOfflineMsg) - time.Sleep(200 * time.Millisecond) } } @@ -288,7 +286,6 @@ func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error { }, } - time.Sleep(200 * time.Millisecond) p.SendMsg(200, anotherOnlineMsg) } } From d8e0e71a2f3d2e83a0fb23cc75374eb5f5b73993 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Sat, 13 May 2023 11:47:11 +0800 Subject: [PATCH 04/12] =?UTF-8?q?Assign=20a=20workder=20to=20each=20link,?= =?UTF-8?q?=20avoid=20interactions=20when=20multiple=20links=20are=20proce?= =?UTF-8?q?ssed=20by=20the=20same=20worker=20=EF=BC=88=E4=B8=BA=E6=AF=8F?= =?UTF-8?q?=E4=B8=AA=E9=93=BE=E6=8E=A5=E5=88=86=E9=85=8D=E4=B8=80=E4=B8=AA?= =?UTF-8?q?workder=EF=BC=8C=E9=81=BF=E5=85=8D=E5=90=8C=E4=B8=80worker?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=A4=9A=E4=B8=AA=E9=93=BE=E6=8E=A5=E6=97=B6?= =?UTF-8?q?=E7=9A=84=E4=BA=92=E7=9B=B8=E5=BD=B1=E5=93=8D=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zconf/globalobj.go | 61 +++++++++++++++++++++++-------------------- zconf/userconf.go | 11 ++++++++ ziface/iconnection.go | 1 + ziface/imsghandler.go | 8 ++++++ znet/connection.go | 14 ++++++++++ znet/msghandler.go | 61 +++++++++++++++++++++++++++++++++++-------- znet/ws_connection.go | 14 ++++++++++ 7 files changed, 130 insertions(+), 40 deletions(-) diff --git a/zconf/globalobj.go b/zconf/globalobj.go index 8bacaba9..37de198f 100644 --- a/zconf/globalobj.go +++ b/zconf/globalobj.go @@ -8,14 +8,15 @@ package zconf import ( "encoding/json" "fmt" - "github.com/aceld/zinx/zlog" - "github.com/aceld/zinx/zutils/commandline/args" - "github.com/aceld/zinx/zutils/commandline/uflag" "io/ioutil" "os" "reflect" "testing" "time" + + "github.com/aceld/zinx/zlog" + "github.com/aceld/zinx/zutils/commandline/args" + "github.com/aceld/zinx/zutils/commandline/uflag" ) const ( @@ -41,13 +42,14 @@ type Config struct { /* Zinx */ - Version string //The version of the Zinx framework.(当前Zinx版本号) - MaxPacketSize uint32 //The maximum size of the packets that can be sent or received.(读写数据包的最大值) - MaxConn int //The maximum number of connections that the server can handle.(当前服务器主机允许的最大链接个数) - WorkerPoolSize uint32 //The number of worker pools in the business logic.(业务工作Worker池的数量) - MaxWorkerTaskLen uint32 //The maximum number of tasks that a worker pool can handle.(业务工作Worker对应负责的任务队列最大任务存储数量) - MaxMsgChanLen uint32 //The maximum length of the send buffer message queue.(SendBuffMsg发送消息的缓冲最大长度) - IOReadBuffSize uint32 //The maximum size of the read buffer for each IO operation.(每次IO最大的读取长度) + Version string //The version of the Zinx framework.(当前Zinx版本号) + MaxPacketSize uint32 //The maximum size of the packets that can be sent or received.(读写数据包的最大值) + MaxConn int //The maximum number of connections that the server can handle.(当前服务器主机允许的最大链接个数) + WorkerPoolSize uint32 //The number of worker pools in the business logic.(业务工作Worker池的数量) + MaxWorkerTaskLen uint32 //The maximum number of tasks that a worker pool can handle.(业务工作Worker对应负责的任务队列最大任务存储数量) + BalanceWorkderTaskLen uint32 //Assign a workder to each link.(为每个链接分配一个workder) + MaxMsgChanLen uint32 //The maximum length of the send buffer message queue.(SendBuffMsg发送消息的缓冲最大长度) + IOReadBuffSize uint32 //The maximum size of the read buffer for each IO operation.(每次IO最大的读取长度) //The server mode, which can be "tcp" or "websocket". If it is empty, both modes are enabled. //"tcp":tcp监听, "websocket":websocket 监听 为空时同时开启 @@ -193,25 +195,26 @@ func init() { // Initialize the GlobalObject variable and set some default values. // (初始化GlobalObject变量,设置一些默认值) GlobalObject = &Config{ - Name: "ZinxServerApp", - Version: "V1.0", - TCPPort: 8999, - WsPort: 9000, - Host: "0.0.0.0", - MaxConn: 12000, - MaxPacketSize: 4096, - WorkerPoolSize: 10, - MaxWorkerTaskLen: 1024, - MaxMsgChanLen: 1024, - LogDir: pwd + "/log", - LogFile: "", //if set "", print to Stderr(默认日志文件为空,打印到stderr) - LogIsolationLevel: 0, - HeartbeatMax: 10, //The default maximum interval for heartbeat detection is 10 seconds. (默认心跳检测最长间隔为10秒) - IOReadBuffSize: 1024, - CertFile: "", - PrivateKeyFile: "", - Mode: ServerModeTcp, - RouterSlicesMode: false, + Name: "ZinxServerApp", + Version: "V1.0", + TCPPort: 8999, + WsPort: 9000, + Host: "0.0.0.0", + MaxConn: 12000, + MaxPacketSize: 4096, + WorkerPoolSize: 10, + MaxWorkerTaskLen: 1024, + BalanceWorkderTaskLen: 0, // Recommended value:50.(推荐值:50) + MaxMsgChanLen: 1024, + LogDir: pwd + "/log", + LogFile: "", //if set "", print to Stderr(默认日志文件为空,打印到stderr) + LogIsolationLevel: 0, + HeartbeatMax: 10, //The default maximum interval for heartbeat detection is 10 seconds. (默认心跳检测最长间隔为10秒) + IOReadBuffSize: 1024, + CertFile: "", + PrivateKeyFile: "", + Mode: ServerModeTcp, + RouterSlicesMode: false, } // Note: Load some user-configured parameters from the configuration file. diff --git a/zconf/userconf.go b/zconf/userconf.go index ff4d659c..80b899d7 100644 --- a/zconf/userconf.go +++ b/zconf/userconf.go @@ -35,6 +35,17 @@ func UserConfToGlobal(config *Config) { if config.MaxWorkerTaskLen != 0 { GlobalObject.MaxWorkerTaskLen = config.MaxWorkerTaskLen } + + // Assign a workder to each link and use this value instead of MaxWorkerTaskLen, Avoid interactions when multiple links are processed by the same worker + // 为每个链接分配一个workder,同时使用这个值替代MaxWorkerTaskLen,避免同一worker处理多个链接时的互相影响 + if config.BalanceWorkderTaskLen != 0 { + if config.WorkerPoolSize != 0 || config.MaxWorkerTaskLen != 0 { + panic("BalanceWorkderTaskLen and WorkerPoolSize+MaxWorkerTaskLen do not work together") + } + GlobalObject.WorkerPoolSize = uint32(GlobalObject.MaxConn) + GlobalObject.MaxWorkerTaskLen = config.BalanceWorkderTaskLen + } + if config.MaxMsgChanLen != 0 { GlobalObject.MaxMsgChanLen = config.MaxMsgChanLen } diff --git a/ziface/iconnection.go b/ziface/iconnection.go index 545d5359..987889c6 100644 --- a/ziface/iconnection.go +++ b/ziface/iconnection.go @@ -30,6 +30,7 @@ type IConnection interface { GetTCPConnection() net.Conn // Get the original socket TCPConn from the current connection (从当前连接获取原始的socket TCPConn) GetConnID() uint64 // Get the current connection ID (获取当前连接ID) GetMsgHandler() IMsgHandle // Get the message handler (获取消息处理器) + GetWorkerID() uint32 // Get Worker ID(获取workerid) RemoteAddr() net.Addr // Get the remote address information of the connection (获取链接远程地址信息) LocalAddr() net.Addr // Get the local address information of the connection (获取链接本地地址信息) LocalAddrString() string // Get the local address information of the connection as a string diff --git a/ziface/imsghandler.go b/ziface/imsghandler.go index 0ce01f74..ec7546bd 100644 --- a/ziface/imsghandler.go +++ b/ziface/imsghandler.go @@ -24,4 +24,12 @@ type IMsgHandle interface { // the order depends on the registration order // (注册责任链任务入口,每个拦截器处理完后,数据都会传递至下一个拦截器,使得消息可以层层处理层层传递,顺序取决于注册顺序) AddInterceptor(interceptor IInterceptor) + + // Use worker ID + // 占用workerID + UseWorker(IConnection) uint32 + + // Free worker ID + // 释放workerid + FreeWorker(workerID uint32) } diff --git a/znet/connection.go b/znet/connection.go index e705f760..3ae7f8dd 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -32,6 +32,10 @@ type Connection struct { // 这个是理论支持的进程connID的最大数量) connID uint64 + // The workerid responsible for handling the link + // 负责处理该链接的workerid + workerID uint32 + // The message management module that manages MsgID and the corresponding processing method // (消息管理MsgID和对应处理方法的消息管理模块) msgHandler ziface.IMsgHandle @@ -276,6 +280,9 @@ func (c *Connection) Start() { c.updateActivity() } + // 占用workerid + c.workerID = c.msgHandler.UseWorker(c) + // Start the Goroutine for reading data from the client // (开启用户从客户端读取数据流程的Goroutine) go c.StartReader() @@ -283,6 +290,9 @@ func (c *Connection) Start() { select { case <-c.ctx.Done(): c.finalizer() + + // 归还workerid + c.msgHandler.FreeWorker(c.workerID) return } } @@ -310,6 +320,10 @@ func (c *Connection) GetConnID() uint64 { return c.connID } +func (c *Connection) GetWorkerID() uint32 { + return c.workerID +} + func (c *Connection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } diff --git a/znet/msghandler.go b/znet/msghandler.go index 33920bc0..0a64f80b 100644 --- a/znet/msghandler.go +++ b/znet/msghandler.go @@ -3,6 +3,8 @@ package znet import ( "encoding/hex" "fmt" + "sync" + "github.com/aceld/zinx/zconf" "github.com/aceld/zinx/ziface" "github.com/aceld/zinx/zlog" @@ -27,6 +29,11 @@ type MsgHandle struct { // (业务工作Worker池的数量) WorkerPoolSize uint32 + // A collection of idle workers for balancing versions + // 空闲worker集合,用于平衡版本 + freeWorkers map[uint32]struct{} + freeWorkerMu sync.Mutex + // A message queue for workers to take tasks // (Worker负责取任务的消息队列) TaskQueue []chan ziface.IRequest @@ -49,12 +56,54 @@ func newMsgHandle() *MsgHandle { builder: newChainBuilder(), } + if int(zconf.GlobalObject.WorkerPoolSize) == zconf.GlobalObject.MaxConn { + handle.freeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.WorkerPoolSize) + for i := uint32(0); i < zconf.GlobalObject.WorkerPoolSize; i++ { + handle.freeWorkers[i] = struct{}{} + } + } + // It is necessary to add the MsgHandle to the responsibility chain here, and it is the last link in the responsibility chain. After decoding in the MsgHandle, data distribution is done by router // (此处必须把 msghandler 添加到责任链中,并且是责任链最后一环,在msghandler中进行解码后由router做数据分发) handle.builder.Tail(handle) return handle } +// Use worker ID +// 占用workerID +func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { + if int(zconf.GlobalObject.WorkerPoolSize) == zconf.GlobalObject.MaxConn { + // 改成空闲hashmap,以进行绝对的负载均衡,仅适用于新算法,因为新算法不会有重叠 + // 新算法应该有两个封装函数,务必确保释放和回收的数量一致(如果不一致,最严重结果是会发生相互影响的阻塞) + mh.freeWorkerMu.Lock() + defer mh.freeWorkerMu.Lock() + + for k := range mh.freeWorkers { + delete(mh.freeWorkers, k) + return k + } + } else { + // Assign the worker responsible for processing the current connection based on the ConnID + // Using a round-robin average allocation rule to get the workerID that needs to process this connection + // (根据ConnID来分配当前的连接应该由哪个worker负责处理 + // 轮询的平均分配法则 + // 得到需要处理此条连接的workerID) + return uint32(conn.GetConnID() % uint64(mh.WorkerPoolSize)) + } + return 0 +} + +// Free worker ID +// 释放workerid +func (mh *MsgHandle) FreeWorker(workerID uint32) { + if int(zconf.GlobalObject.WorkerPoolSize) == zconf.GlobalObject.MaxConn { + mh.freeWorkerMu.Lock() + defer mh.freeWorkerMu.Lock() + + mh.freeWorkers[workerID] = struct{}{} + } +} + // Data processing interceptor that is necessary by default in Zinx // (Zinx默认必经的数据处理拦截器) func (mh *MsgHandle) Intercept(chain ziface.IChain) ziface.IcResp { @@ -90,20 +139,10 @@ func (mh *MsgHandle) AddInterceptor(interceptor ziface.IInterceptor) { } } -func (mh *MsgHandle) GetTaskQueueWorkerId(request ziface.IRequest) uint64 { - // Assign the worker responsible for processing the current connection based on the ConnID - // Using a round-robin average allocation rule to get the workerID that needs to process this connection - // (根据ConnID来分配当前的连接应该由哪个worker负责处理 - // 轮询的平均分配法则 - // 得到需要处理此条连接的workerID) - workerID := request.GetConnection().GetConnID() % uint64(mh.WorkerPoolSize) - return workerID -} - // SendMsgToTaskQueue sends the message to the TaskQueue for processing by the worker // (将消息交给TaskQueue,由worker进行处理) func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) { - workerID := mh.GetTaskQueueWorkerId(request) + workerID := request.GetConnection().GetWorkerID() // zlog.Ins().DebugF("Add ConnID=%d request msgID=%d to workerID=%d", request.GetConnection().GetConnID(), request.GetMsgID(), workerID) // Send the request message to the task queue mh.TaskQueue[workerID] <- request diff --git a/znet/ws_connection.go b/znet/ws_connection.go index d234213e..f118f3f1 100644 --- a/znet/ws_connection.go +++ b/znet/ws_connection.go @@ -30,6 +30,10 @@ type WsConnection struct { // 这个是理论支持的进程connID的最大数量) connID uint64 + // The workerid responsible for handling the link + // 负责处理该链接的workerid + workerID uint32 + // msgHandler is the message management module for MsgID and the corresponding message handling method. // (消息管理MsgID和对应处理方法的消息管理模块) msgHandler ziface.IMsgHandle @@ -265,6 +269,9 @@ func (c *WsConnection) Start() { c.updateActivity() } + // 占用workerid + c.workerID = c.msgHandler.UseWorker(c) + // Start the Goroutine for users to read data from the client. // (开启用户从客户端读取数据流程的Goroutine) go c.StartReader() @@ -272,6 +279,9 @@ func (c *WsConnection) Start() { select { case <-c.ctx.Done(): c.finalizer() + + // 归还workerid + c.msgHandler.FreeWorker(c.workerID) return } } @@ -299,6 +309,10 @@ func (c *WsConnection) GetConnID() uint64 { return c.connID } +func (c *WsConnection) GetWorkerID() uint32 { + return c.workerID +} + func (c *WsConnection) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } From f0111b322e7484f2b2d4485638ee0c78fbd76f29 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Sat, 13 May 2023 13:17:54 +0800 Subject: [PATCH 05/12] =?UTF-8?q?Assign=20a=20workder=20to=20each=20link,?= =?UTF-8?q?=20avoid=20interactions=20when=20multiple=20links=20are=20proce?= =?UTF-8?q?ssed=20by=20the=20same=20worker=20=EF=BC=88=E4=B8=BA=E6=AF=8F?= =?UTF-8?q?=E4=B8=AA=E9=93=BE=E6=8E=A5=E5=88=86=E9=85=8D=E4=B8=80=E4=B8=AA?= =?UTF-8?q?workder=EF=BC=8C=E9=81=BF=E5=85=8D=E5=90=8C=E4=B8=80worker?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=A4=9A=E4=B8=AA=E9=93=BE=E6=8E=A5=E6=97=B6?= =?UTF-8?q?=E7=9A=84=E4=BA=92=E7=9B=B8=E5=BD=B1=E5=93=8D=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zconf/globalobj.go | 11 +++++++++++ zconf/userconf.go | 9 +-------- znet/msghandler.go | 26 +++++++++++++++----------- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/zconf/globalobj.go b/zconf/globalobj.go index 37de198f..6f944610 100644 --- a/zconf/globalobj.go +++ b/zconf/globalobj.go @@ -173,6 +173,17 @@ func (g *Config) InitLogConfig() { } } +// Assign a workder to each link and use this value instead of MaxWorkerTaskLen, Avoid interactions when multiple links are processed by the same worker +// 为每个链接分配一个workder,同时使用这个值替代MaxWorkerTaskLen,避免同一worker处理多个链接时的互相影响 +func (g *Config) ChangeWorkerSize(balanceWorkderTaskLen uint32) { + if g.WorkerPoolSize != 0 || g.MaxWorkerTaskLen != 0 { + panic("If you want to use BalanceWorkderTaskLen, explicitly set WorkerPoolSize and MaxWorkerTaskLen to 0.") + } + + GlobalObject.WorkerPoolSize = uint32(GlobalObject.MaxConn) + GlobalObject.MaxWorkerTaskLen = balanceWorkderTaskLen +} + /* init, set default value */ diff --git a/zconf/userconf.go b/zconf/userconf.go index 80b899d7..c712edd8 100644 --- a/zconf/userconf.go +++ b/zconf/userconf.go @@ -35,15 +35,8 @@ func UserConfToGlobal(config *Config) { if config.MaxWorkerTaskLen != 0 { GlobalObject.MaxWorkerTaskLen = config.MaxWorkerTaskLen } - - // Assign a workder to each link and use this value instead of MaxWorkerTaskLen, Avoid interactions when multiple links are processed by the same worker - // 为每个链接分配一个workder,同时使用这个值替代MaxWorkerTaskLen,避免同一worker处理多个链接时的互相影响 if config.BalanceWorkderTaskLen != 0 { - if config.WorkerPoolSize != 0 || config.MaxWorkerTaskLen != 0 { - panic("BalanceWorkderTaskLen and WorkerPoolSize+MaxWorkerTaskLen do not work together") - } - GlobalObject.WorkerPoolSize = uint32(GlobalObject.MaxConn) - GlobalObject.MaxWorkerTaskLen = config.BalanceWorkderTaskLen + GlobalObject.ChangeWorkerSize(config.BalanceWorkderTaskLen) } if config.MaxMsgChanLen != 0 { diff --git a/znet/msghandler.go b/znet/msghandler.go index 0a64f80b..5b14a4ff 100644 --- a/znet/msghandler.go +++ b/znet/msghandler.go @@ -47,20 +47,24 @@ type MsgHandle struct { // newMsgHandle creates MsgHandle // zinxRole: IServer/IClient func newMsgHandle() *MsgHandle { + var freeWorkers map[uint32]struct{} + if zconf.GlobalObject.BalanceWorkderTaskLen != 0 { + zconf.GlobalObject.ChangeWorkerSize(zconf.GlobalObject.BalanceWorkderTaskLen) + + freeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.WorkerPoolSize) + for i := uint32(0); i < zconf.GlobalObject.WorkerPoolSize; i++ { + freeWorkers[i] = struct{}{} + } + } + handle := &MsgHandle{ Apis: make(map[uint32]ziface.IRouter), RouterSlices: NewRouterSlices(), WorkerPoolSize: zconf.GlobalObject.WorkerPoolSize, // One worker corresponds to one queue (一个worker对应一个queue) - TaskQueue: make([]chan ziface.IRequest, zconf.GlobalObject.WorkerPoolSize), - builder: newChainBuilder(), - } - - if int(zconf.GlobalObject.WorkerPoolSize) == zconf.GlobalObject.MaxConn { - handle.freeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.WorkerPoolSize) - for i := uint32(0); i < zconf.GlobalObject.WorkerPoolSize; i++ { - handle.freeWorkers[i] = struct{}{} - } + TaskQueue: make([]chan ziface.IRequest, zconf.GlobalObject.WorkerPoolSize), + freeWorkers: freeWorkers, + builder: newChainBuilder(), } // It is necessary to add the MsgHandle to the responsibility chain here, and it is the last link in the responsibility chain. After decoding in the MsgHandle, data distribution is done by router @@ -76,7 +80,7 @@ func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { // 改成空闲hashmap,以进行绝对的负载均衡,仅适用于新算法,因为新算法不会有重叠 // 新算法应该有两个封装函数,务必确保释放和回收的数量一致(如果不一致,最严重结果是会发生相互影响的阻塞) mh.freeWorkerMu.Lock() - defer mh.freeWorkerMu.Lock() + defer mh.freeWorkerMu.Unlock() for k := range mh.freeWorkers { delete(mh.freeWorkers, k) @@ -98,7 +102,7 @@ func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { func (mh *MsgHandle) FreeWorker(workerID uint32) { if int(zconf.GlobalObject.WorkerPoolSize) == zconf.GlobalObject.MaxConn { mh.freeWorkerMu.Lock() - defer mh.freeWorkerMu.Lock() + defer mh.freeWorkerMu.Unlock() mh.freeWorkers[workerID] = struct{}{} } From c3b3094fbfa70403502260aab8857996b16d4511 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Sat, 13 May 2023 18:10:00 +0800 Subject: [PATCH 06/12] =?UTF-8?q?Assign=20a=20workder=20to=20each=20link,?= =?UTF-8?q?=20avoid=20interactions=20when=20multiple=20links=20are=20proce?= =?UTF-8?q?ssed=20by=20the=20same=20worker=20=EF=BC=88=E4=B8=BA=E6=AF=8F?= =?UTF-8?q?=E4=B8=AA=E9=93=BE=E6=8E=A5=E5=88=86=E9=85=8D=E4=B8=80=E4=B8=AA?= =?UTF-8?q?workder=EF=BC=8C=E9=81=BF=E5=85=8D=E5=90=8C=E4=B8=80worker?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=A4=9A=E4=B8=AA=E9=93=BE=E6=8E=A5=E6=97=B6?= =?UTF-8?q?=E7=9A=84=E4=BA=92=E7=9B=B8=E5=BD=B1=E5=93=8D=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/zinx_server/conf/zinx.json | 1 + zconf/globalobj.go | 67 ++++++++++++----------------- zconf/userconf.go | 4 +- znet/msghandler.go | 12 ++++-- 4 files changed, 39 insertions(+), 45 deletions(-) diff --git a/examples/zinx_server/conf/zinx.json b/examples/zinx_server/conf/zinx.json index 077e18f4..43042248 100644 --- a/examples/zinx_server/conf/zinx.json +++ b/examples/zinx_server/conf/zinx.json @@ -3,6 +3,7 @@ "Host":"127.0.0.1", "TCPPort":8999, "MaxConn":3, + "WorkerMode": "", "WorkerPoolSize":10, "LogDir": "./mylog", "LogFile":"zinx.log", diff --git a/zconf/globalobj.go b/zconf/globalobj.go index 6f944610..1f7e080f 100644 --- a/zconf/globalobj.go +++ b/zconf/globalobj.go @@ -42,14 +42,14 @@ type Config struct { /* Zinx */ - Version string //The version of the Zinx framework.(当前Zinx版本号) - MaxPacketSize uint32 //The maximum size of the packets that can be sent or received.(读写数据包的最大值) - MaxConn int //The maximum number of connections that the server can handle.(当前服务器主机允许的最大链接个数) - WorkerPoolSize uint32 //The number of worker pools in the business logic.(业务工作Worker池的数量) - MaxWorkerTaskLen uint32 //The maximum number of tasks that a worker pool can handle.(业务工作Worker对应负责的任务队列最大任务存储数量) - BalanceWorkderTaskLen uint32 //Assign a workder to each link.(为每个链接分配一个workder) - MaxMsgChanLen uint32 //The maximum length of the send buffer message queue.(SendBuffMsg发送消息的缓冲最大长度) - IOReadBuffSize uint32 //The maximum size of the read buffer for each IO operation.(每次IO最大的读取长度) + Version string //The version of the Zinx framework.(当前Zinx版本号) + MaxPacketSize uint32 //The maximum size of the packets that can be sent or received.(读写数据包的最大值) + MaxConn int //The maximum number of connections that the server can handle.(当前服务器主机允许的最大链接个数) + WorkerPoolSize uint32 //The number of worker pools in the business logic.(业务工作Worker池的数量) + MaxWorkerTaskLen uint32 //The maximum number of tasks that a worker pool can handle.(业务工作Worker对应负责的任务队列最大任务存储数量) + WorkerMode string //The way to assign workers to connections.(为链接分配worker的方式) + MaxMsgChanLen uint32 //The maximum length of the send buffer message queue.(SendBuffMsg发送消息的缓冲最大长度) + IOReadBuffSize uint32 //The maximum size of the read buffer for each IO operation.(每次IO最大的读取长度) //The server mode, which can be "tcp" or "websocket". If it is empty, both modes are enabled. //"tcp":tcp监听, "websocket":websocket 监听 为空时同时开启 @@ -173,17 +173,6 @@ func (g *Config) InitLogConfig() { } } -// Assign a workder to each link and use this value instead of MaxWorkerTaskLen, Avoid interactions when multiple links are processed by the same worker -// 为每个链接分配一个workder,同时使用这个值替代MaxWorkerTaskLen,避免同一worker处理多个链接时的互相影响 -func (g *Config) ChangeWorkerSize(balanceWorkderTaskLen uint32) { - if g.WorkerPoolSize != 0 || g.MaxWorkerTaskLen != 0 { - panic("If you want to use BalanceWorkderTaskLen, explicitly set WorkerPoolSize and MaxWorkerTaskLen to 0.") - } - - GlobalObject.WorkerPoolSize = uint32(GlobalObject.MaxConn) - GlobalObject.MaxWorkerTaskLen = balanceWorkderTaskLen -} - /* init, set default value */ @@ -206,26 +195,26 @@ func init() { // Initialize the GlobalObject variable and set some default values. // (初始化GlobalObject变量,设置一些默认值) GlobalObject = &Config{ - Name: "ZinxServerApp", - Version: "V1.0", - TCPPort: 8999, - WsPort: 9000, - Host: "0.0.0.0", - MaxConn: 12000, - MaxPacketSize: 4096, - WorkerPoolSize: 10, - MaxWorkerTaskLen: 1024, - BalanceWorkderTaskLen: 0, // Recommended value:50.(推荐值:50) - MaxMsgChanLen: 1024, - LogDir: pwd + "/log", - LogFile: "", //if set "", print to Stderr(默认日志文件为空,打印到stderr) - LogIsolationLevel: 0, - HeartbeatMax: 10, //The default maximum interval for heartbeat detection is 10 seconds. (默认心跳检测最长间隔为10秒) - IOReadBuffSize: 1024, - CertFile: "", - PrivateKeyFile: "", - Mode: ServerModeTcp, - RouterSlicesMode: false, + Name: "ZinxServerApp", + Version: "V1.0", + TCPPort: 8999, + WsPort: 9000, + Host: "0.0.0.0", + MaxConn: 12000, + MaxPacketSize: 4096, + WorkerPoolSize: 10, + MaxWorkerTaskLen: 1024, + WorkerMode: "", + MaxMsgChanLen: 1024, + LogDir: pwd + "/log", + LogFile: "", //if set "", print to Stderr(默认日志文件为空,打印到stderr) + LogIsolationLevel: 0, + HeartbeatMax: 10, //The default maximum interval for heartbeat detection is 10 seconds. (默认心跳检测最长间隔为10秒) + IOReadBuffSize: 1024, + CertFile: "", + PrivateKeyFile: "", + Mode: ServerModeTcp, + RouterSlicesMode: false, } // Note: Load some user-configured parameters from the configuration file. diff --git a/zconf/userconf.go b/zconf/userconf.go index c712edd8..8779feb3 100644 --- a/zconf/userconf.go +++ b/zconf/userconf.go @@ -35,8 +35,8 @@ func UserConfToGlobal(config *Config) { if config.MaxWorkerTaskLen != 0 { GlobalObject.MaxWorkerTaskLen = config.MaxWorkerTaskLen } - if config.BalanceWorkderTaskLen != 0 { - GlobalObject.ChangeWorkerSize(config.BalanceWorkderTaskLen) + if config.WorkerMode != "" { + GlobalObject.WorkerMode = config.WorkerMode } if config.MaxMsgChanLen != 0 { diff --git a/znet/msghandler.go b/znet/msghandler.go index 5b14a4ff..c0bf9130 100644 --- a/znet/msghandler.go +++ b/znet/msghandler.go @@ -29,8 +29,8 @@ type MsgHandle struct { // (业务工作Worker池的数量) WorkerPoolSize uint32 - // A collection of idle workers for balancing versions - // 空闲worker集合,用于平衡版本 + // A collection of idle workers, used for zconf.{WorkMode:OneWorkerEachConn} + // 空闲worker集合,用于zconf.{WorkMode:OneWorkerEachConn} freeWorkers map[uint32]struct{} freeWorkerMu sync.Mutex @@ -48,8 +48,12 @@ type MsgHandle struct { // zinxRole: IServer/IClient func newMsgHandle() *MsgHandle { var freeWorkers map[uint32]struct{} - if zconf.GlobalObject.BalanceWorkderTaskLen != 0 { - zconf.GlobalObject.ChangeWorkerSize(zconf.GlobalObject.BalanceWorkderTaskLen) + if zconf.GlobalObject.WorkerMode == "OneWorkerEachConn" { + // Assign a workder to each link, avoid interactions when multiple links are processed by the same worker + // MaxWorkerTaskLen can also be reduced, for example, 50 + // 为每个链接分配一个workder,避免同一worker处理多个链接时的互相影响 + // 同时可以减小MaxWorkerTaskLen,比如50,因为每个worker的负担减轻了 + zconf.GlobalObject.WorkerPoolSize = uint32(zconf.GlobalObject.MaxConn) freeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.WorkerPoolSize) for i := uint32(0); i < zconf.GlobalObject.WorkerPoolSize; i++ { From 0be593976c61c03620c0ad90ab244cd8602a9309 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Tue, 23 May 2023 11:55:19 +0800 Subject: [PATCH 07/12] Simplify code --- znet/msghandler.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/znet/msghandler.go b/znet/msghandler.go index c0bf9130..5ac9da7a 100644 --- a/znet/msghandler.go +++ b/znet/msghandler.go @@ -54,7 +54,6 @@ func newMsgHandle() *MsgHandle { // 为每个链接分配一个workder,避免同一worker处理多个链接时的互相影响 // 同时可以减小MaxWorkerTaskLen,比如50,因为每个worker的负担减轻了 zconf.GlobalObject.WorkerPoolSize = uint32(zconf.GlobalObject.MaxConn) - freeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.WorkerPoolSize) for i := uint32(0); i < zconf.GlobalObject.WorkerPoolSize; i++ { freeWorkers[i] = struct{}{} @@ -80,9 +79,7 @@ func newMsgHandle() *MsgHandle { // Use worker ID // 占用workerID func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { - if int(zconf.GlobalObject.WorkerPoolSize) == zconf.GlobalObject.MaxConn { - // 改成空闲hashmap,以进行绝对的负载均衡,仅适用于新算法,因为新算法不会有重叠 - // 新算法应该有两个封装函数,务必确保释放和回收的数量一致(如果不一致,最严重结果是会发生相互影响的阻塞) + if zconf.GlobalObject.WorkerMode == "OneWorkerEachConn" { mh.freeWorkerMu.Lock() defer mh.freeWorkerMu.Unlock() @@ -104,7 +101,7 @@ func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { // Free worker ID // 释放workerid func (mh *MsgHandle) FreeWorker(workerID uint32) { - if int(zconf.GlobalObject.WorkerPoolSize) == zconf.GlobalObject.MaxConn { + if zconf.GlobalObject.WorkerMode == "OneWorkerEachConn" { mh.freeWorkerMu.Lock() defer mh.freeWorkerMu.Unlock() From 6614a9112c3f46889a20464263e8878272f9c5d9 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Mon, 12 Jun 2023 18:35:18 +0800 Subject: [PATCH 08/12] Revert mmo_example, make this PR clean. --- zinx_app_demo/mmo_game/datapack/pack.go | 36 ------------------------- 1 file changed, 36 deletions(-) delete mode 100644 zinx_app_demo/mmo_game/datapack/pack.go diff --git a/zinx_app_demo/mmo_game/datapack/pack.go b/zinx_app_demo/mmo_game/datapack/pack.go deleted file mode 100644 index 4ec62b95..00000000 --- a/zinx_app_demo/mmo_game/datapack/pack.go +++ /dev/null @@ -1,36 +0,0 @@ -package datapack - -import ( - "fmt" - - "github.com/aceld/zinx/ziface" - "github.com/aceld/zinx/zpack" - "github.com/golang/protobuf/proto" -) - -// Replace here with your custom packer (if one exists) -// 将这里替换为你自定义的打包器(如果存在的话) -var DefaultPack = zpack.Factory().NewPack(ziface.ZinxDataPack) - -// Serialize message -// 将消息序列化 -func SerializeMsg2Bytes(msgId uint32, msgData proto.Message) []byte { - - // Marshal message - // 将proto Message结构体序列化 - msg, err := proto.Marshal(msgData) - if err != nil { - fmt.Printf("SerializeMsg2Bytes, marshal failed, msgId:%v, msgData:%v, err: %v\n", msgId, msgData, err) - return nil - } - - // Pack data - // 封包 - data, err := DefaultPack.Pack(zpack.NewMsgPackage(msgId, msg)) - if err != nil { - fmt.Printf("SerializeMsg2Bytes, pack failed, msgId:%v, msg:%v, err: %v\n", msgId, msg, err) - return nil - } - - return data -} From 4cc89bba66d98ac9b3eb66c299bab17a71c8f67a Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Mon, 12 Jun 2023 19:08:44 +0800 Subject: [PATCH 09/12] Revert .golangci.yaml. --- .golangci.yaml | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 .golangci.yaml diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 00000000..a7b1968a --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,28 @@ +run: + timeout: 30m + skip-dirs: + - examples + +linters: + disable-all: true + enable: + #- unused + - ineffassign + - goimports + - gofmt + - misspell + - unparam + - unconvert + - govet + # - errcheck + - staticcheck + +linters-settings: + staticcheck: + go: "1.16" + checks: + - "all" + - "-SA1019" + + unused: + go: "1.16" From 87984f2fa70da5241ab4d5e82562fdb416feaa9b Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Mon, 12 Jun 2023 19:16:34 +0800 Subject: [PATCH 10/12] Fix lint error. --- zinterceptor/framedocder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zinterceptor/framedocder.go b/zinterceptor/framedocder.go index 2ef2820f..231b21d8 100644 --- a/zinterceptor/framedocder.go +++ b/zinterceptor/framedocder.go @@ -526,7 +526,7 @@ func (d *FrameDecoder) decode(buf []byte) []byte { //跳过的字节数是否大于数据包长度 if d.InitialBytesToStrip > frameLengthInt { - d.failOnFrameLengthLessThanInitialBytesToStrip(in, int64(frameLength), d.InitialBytesToStrip) + d.failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, d.InitialBytesToStrip) } //跳过initialBytesToStrip个字节 in.Next(d.InitialBytesToStrip) From 46ad8eaf2add193eaba9269ff1b5fedacc2562b0 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Tue, 13 Jun 2023 21:08:20 +0800 Subject: [PATCH 11/12] =?UTF-8?q?Optimize=20the=20work=20mode=20name=20and?= =?UTF-8?q?=20optimize=20the=20assignment=20of=20worker=20in=20case=20of?= =?UTF-8?q?=20exception.=20=EF=BC=88=E4=BC=98=E5=8C=96=E5=B7=A5=E4=BD=9C?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=E5=90=8D=E7=A7=B0=EF=BC=8C=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=83=85=E5=86=B5=E4=B8=8Bworker=E7=9A=84?= =?UTF-8?q?=E5=88=86=E9=85=8D=E3=80=82=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zconf/globalobj.go | 5 +++++ znet/msghandler.go | 25 ++++++++++++------------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/zconf/globalobj.go b/zconf/globalobj.go index 1f7e080f..d95e84a1 100644 --- a/zconf/globalobj.go +++ b/zconf/globalobj.go @@ -24,6 +24,11 @@ const ( ServerModeWebsocket = "websocket" ) +const ( + WorkerModeHash = "Hash" //By default, the round-robin average allocation rule is used.(默认使用取余的方式) + WorkerModeBind = "Bind" //Bind a worker to each connection.(为每个连接分配一个worker) +) + /* Store all global parameters related to the Zinx framework for use by other modules. Some parameters can also be configured by the user based on the zinx.json file. diff --git a/znet/msghandler.go b/znet/msghandler.go index 7222ad94..a2bc07c9 100644 --- a/znet/msghandler.go +++ b/znet/msghandler.go @@ -29,8 +29,8 @@ type MsgHandle struct { // (业务工作Worker池的数量) WorkerPoolSize uint32 - // A collection of idle workers, used for zconf.{WorkMode:OneWorkerEachConn} - // 空闲worker集合,用于zconf.{WorkMode:OneWorkerEachConn} + // A collection of idle workers, used for zconf.WorkerModeBind + // 空闲worker集合,用于zconf.WorkerModeBind freeWorkers map[uint32]struct{} freeWorkerMu sync.Mutex @@ -48,7 +48,7 @@ type MsgHandle struct { // zinxRole: IServer/IClient func newMsgHandle() *MsgHandle { var freeWorkers map[uint32]struct{} - if zconf.GlobalObject.WorkerMode == "OneWorkerEachConn" { + if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind { // Assign a workder to each link, avoid interactions when multiple links are processed by the same worker // MaxWorkerTaskLen can also be reduced, for example, 50 // 为每个链接分配一个workder,避免同一worker处理多个链接时的互相影响 @@ -79,7 +79,7 @@ func newMsgHandle() *MsgHandle { // Use worker ID // 占用workerID func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { - if zconf.GlobalObject.WorkerMode == "OneWorkerEachConn" { + if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind { mh.freeWorkerMu.Lock() defer mh.freeWorkerMu.Unlock() @@ -87,21 +87,20 @@ func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { delete(mh.freeWorkers, k) return k } - } else { - // Assign the worker responsible for processing the current connection based on the ConnID - // Using a round-robin average allocation rule to get the workerID that needs to process this connection - // (根据ConnID来分配当前的连接应该由哪个worker负责处理 - // 轮询的平均分配法则 - // 得到需要处理此条连接的workerID) - return uint32(conn.GetConnID() % uint64(mh.WorkerPoolSize)) } - return 0 + + // Assign the worker responsible for processing the current connection based on the ConnID + // Using a round-robin average allocation rule to get the workerID that needs to process this connection + // (根据ConnID来分配当前的连接应该由哪个worker负责处理 + // 轮询的平均分配法则 + // 得到需要处理此条连接的workerID) + return uint32(conn.GetConnID() % uint64(mh.WorkerPoolSize)) } // Free worker ID // 释放workerid func (mh *MsgHandle) FreeWorker(workerID uint32) { - if zconf.GlobalObject.WorkerMode == "OneWorkerEachConn" { + if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind { mh.freeWorkerMu.Lock() defer mh.freeWorkerMu.Unlock() From d1efca969d13f9e23f4577304404530b5526d164 Mon Sep 17 00:00:00 2001 From: "EGG_PC\\egg" Date: Tue, 13 Jun 2023 21:31:25 +0800 Subject: [PATCH 12/12] =?UTF-8?q?Privatizing=20internal=20methods.=20(?= =?UTF-8?q?=E5=B0=86=E5=86=85=E9=83=A8=E6=96=B9=E6=B3=95=E7=A7=81=E6=9C=89?= =?UTF-8?q?=E5=8C=96)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ziface/imsghandler.go | 8 -------- znet/connection.go | 4 ++-- znet/msghandler.go | 18 +++++++++++++++--- znet/ws_connection.go | 4 ++-- 4 files changed, 19 insertions(+), 15 deletions(-) diff --git a/ziface/imsghandler.go b/ziface/imsghandler.go index 19e7e78c..df853032 100644 --- a/ziface/imsghandler.go +++ b/ziface/imsghandler.go @@ -22,12 +22,4 @@ type IMsgHandle interface { // the order depends on the registration order // (注册责任链任务入口,每个拦截器处理完后,数据都会传递至下一个拦截器,使得消息可以层层处理层层传递,顺序取决于注册顺序) AddInterceptor(interceptor IInterceptor) - - // Use worker ID - // 占用workerID - UseWorker(IConnection) uint32 - - // Free worker ID - // 释放workerid - FreeWorker(workerID uint32) } diff --git a/znet/connection.go b/znet/connection.go index c862d0ff..bd0d8058 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -281,7 +281,7 @@ func (c *Connection) Start() { } // 占用workerid - c.workerID = c.msgHandler.UseWorker(c) + c.workerID = useWorker(c) // Start the Goroutine for reading data from the client // (开启用户从客户端读取数据流程的Goroutine) @@ -292,7 +292,7 @@ func (c *Connection) Start() { c.finalizer() // 归还workerid - c.msgHandler.FreeWorker(c.workerID) + freeWorker(c) return } } diff --git a/znet/msghandler.go b/znet/msghandler.go index a2bc07c9..07af0f09 100644 --- a/znet/msghandler.go +++ b/znet/msghandler.go @@ -78,7 +78,13 @@ func newMsgHandle() *MsgHandle { // Use worker ID // 占用workerID -func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { +func useWorker(conn ziface.IConnection) uint32 { + mh, _ := conn.GetMsgHandler().(*MsgHandle) + if mh == nil { + zlog.Ins().ErrorF("useWorker failed, mh is nil") + return 0 + } + if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind { mh.freeWorkerMu.Lock() defer mh.freeWorkerMu.Unlock() @@ -99,12 +105,18 @@ func (mh *MsgHandle) UseWorker(conn ziface.IConnection) uint32 { // Free worker ID // 释放workerid -func (mh *MsgHandle) FreeWorker(workerID uint32) { +func freeWorker(conn ziface.IConnection) { + mh, _ := conn.GetMsgHandler().(*MsgHandle) + if mh == nil { + zlog.Ins().ErrorF("useWorker failed, mh is nil") + return + } + if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind { mh.freeWorkerMu.Lock() defer mh.freeWorkerMu.Unlock() - mh.freeWorkers[workerID] = struct{}{} + mh.freeWorkers[conn.GetWorkerID()] = struct{}{} } } diff --git a/znet/ws_connection.go b/znet/ws_connection.go index d3477b46..b89ce388 100644 --- a/znet/ws_connection.go +++ b/znet/ws_connection.go @@ -269,7 +269,7 @@ func (c *WsConnection) Start() { } // 占用workerid - c.workerID = c.msgHandler.UseWorker(c) + c.workerID = useWorker(c) // Start the Goroutine for users to read data from the client. // (开启用户从客户端读取数据流程的Goroutine) @@ -280,7 +280,7 @@ func (c *WsConnection) Start() { c.finalizer() // 归还workerid - c.msgHandler.FreeWorker(c.workerID) + freeWorker(c) return } }