From 1f8ffd34ea625278f020c59fca7aeba99162c3e3 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Mon, 22 Dec 2025 22:25:58 -0500 Subject: [PATCH 1/4] Revert "Refactor WrapLink logic (#5288)" This reverts commit f9dd3aef72f013fca94e6b0a604c636df62285d0. --- app/reverse/bridge.go | 5 ++--- common/mux/server.go | 5 ++--- features/routing/dispatcher.go | 6 ------ proxy/vless/inbound/inbound.go | 14 ++++---------- 4 files changed, 8 insertions(+), 22 deletions(-) diff --git a/app/reverse/bridge.go b/app/reverse/bridge.go index 324fea59ef8d..fc83a7405dd7 100644 --- a/app/reverse/bridge.go +++ b/app/reverse/bridge.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/mux" "github.com/xtls/xray-core/common/net" @@ -230,9 +231,7 @@ func (w *BridgeWorker) DispatchLink(ctx context.Context, dest net.Destination, l return w.Dispatcher.DispatchLink(ctx, dest, link) } - if d, ok := w.Dispatcher.(routing.WrapLinkDispatcher); ok { - link = d.WrapLink(ctx, link) - } + link = w.Dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link) w.handleInternalConn(link) return nil diff --git a/common/mux/server.go b/common/mux/server.go index 1c090185360b..f01c325d08dc 100644 --- a/common/mux/server.go +++ b/common/mux/server.go @@ -5,6 +5,7 @@ import ( "io" "time" + "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/errors" @@ -63,9 +64,7 @@ func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *t if dest.Address != muxCoolAddress { return s.dispatcher.DispatchLink(ctx, dest, link) } - if d, ok := s.dispatcher.(routing.WrapLinkDispatcher); ok { - link = d.WrapLink(ctx, link) - } + link = s.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link) worker, err := NewServerWorker(ctx, s.dispatcher, link) if err != nil { return err diff --git a/features/routing/dispatcher.go b/features/routing/dispatcher.go index c8354446c5a0..53d3bf900f15 100644 --- a/features/routing/dispatcher.go +++ b/features/routing/dispatcher.go @@ -26,9 +26,3 @@ type Dispatcher interface { func DispatcherType() interface{} { return (*Dispatcher)(nil) } - -// Just for type assertion -type WrapLinkDispatcher interface { - Dispatcher - WrapLink(ctx context.Context, link *transport.Link) *transport.Link -} diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index eeb1a25f79dc..bce4e9385700 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -12,6 +12,7 @@ import ( "time" "unsafe" + "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/app/reverse" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" @@ -75,7 +76,7 @@ type Handler struct { validator vless.Validator decryption *encryption.ServerInstance outboundHandlerManager outbound.Manager - wrapLink func(ctx context.Context, link *transport.Link) *transport.Link + defaultDispatcher *dispatcher.DefaultDispatcher ctx context.Context fallbacks map[string]map[string]map[string]*Fallback // or nil // regexps map[string]*regexp.Regexp // or nil @@ -84,16 +85,12 @@ type Handler struct { // New creates a new VLess inbound handler. func New(ctx context.Context, config *Config, dc dns.Client, validator vless.Validator) (*Handler, error) { v := core.MustFromContext(ctx) - var wrapLinkFunc func(ctx context.Context, link *transport.Link) *transport.Link - if dispatcher, ok := v.GetFeature(routing.DispatcherType()).(routing.WrapLinkDispatcher); ok { - wrapLinkFunc = dispatcher.WrapLink - } handler := &Handler{ inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager), policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager), validator: validator, outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager), - wrapLink: wrapLinkFunc, + defaultDispatcher: v.GetFeature(routing.DispatcherType()).(*dispatcher.DefaultDispatcher), ctx: ctx, } @@ -623,10 +620,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s if err != nil { return err } - if h.wrapLink == nil { - return errors.New("VLESS reverse must have a dispatcher that implemented routing.WrapLinkDispatcher") - } - return r.NewMux(ctx, h.wrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter})) + return r.NewMux(ctx, h.defaultDispatcher.WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter})) } if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{ From 86fd8a7136082b6ce006adfc4edbbb4a7eb75ab2 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Mon, 22 Dec 2025 22:33:51 -0500 Subject: [PATCH 2/4] Remove redundant stats in mux and bridge dispatcher Since mux will add traffic stats in sub connection's dispatch function Adding additional stats result to double counting --- app/reverse/bridge.go | 3 --- common/mux/server.go | 2 -- 2 files changed, 5 deletions(-) diff --git a/app/reverse/bridge.go b/app/reverse/bridge.go index fc83a7405dd7..f6dfec48af37 100644 --- a/app/reverse/bridge.go +++ b/app/reverse/bridge.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/mux" "github.com/xtls/xray-core/common/net" @@ -230,8 +229,6 @@ func (w *BridgeWorker) DispatchLink(ctx context.Context, dest net.Destination, l } return w.Dispatcher.DispatchLink(ctx, dest, link) } - - link = w.Dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link) w.handleInternalConn(link) return nil diff --git a/common/mux/server.go b/common/mux/server.go index f01c325d08dc..d1cdac113e44 100644 --- a/common/mux/server.go +++ b/common/mux/server.go @@ -5,7 +5,6 @@ import ( "io" "time" - "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/errors" @@ -64,7 +63,6 @@ func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *t if dest.Address != muxCoolAddress { return s.dispatcher.DispatchLink(ctx, dest, link) } - link = s.dispatcher.(*dispatcher.DefaultDispatcher).WrapLink(ctx, link) worker, err := NewServerWorker(ctx, s.dispatcher, link) if err != nil { return err From 2df123a76b137cf2f9c03e6feb9742a4e1975799 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Wed, 24 Dec 2025 22:54:15 -0500 Subject: [PATCH 3/4] Inject dispatcher as interface --- proxy/vless/inbound/inbound.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index bce4e9385700..69439ffdeecc 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -76,7 +76,7 @@ type Handler struct { validator vless.Validator decryption *encryption.ServerInstance outboundHandlerManager outbound.Manager - defaultDispatcher *dispatcher.DefaultDispatcher + defaultDispatcher routing.Dispatcher ctx context.Context fallbacks map[string]map[string]map[string]*Fallback // or nil // regexps map[string]*regexp.Regexp // or nil @@ -90,7 +90,7 @@ func New(ctx context.Context, config *Config, dc dns.Client, validator vless.Val policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager), validator: validator, outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager), - defaultDispatcher: v.GetFeature(routing.DispatcherType()).(*dispatcher.DefaultDispatcher), + defaultDispatcher: v.GetFeature(routing.DispatcherType()).(routing.Dispatcher), ctx: ctx, } @@ -261,7 +261,7 @@ func (*Handler) Network() []net.Network { } // Process implements proxy.Inbound.Process(). -func (h *Handler) Process(ctx context.Context, network net.Network, connection stat.Connection, dispatcher routing.Dispatcher) error { +func (h *Handler) Process(ctx context.Context, network net.Network, connection stat.Connection, dispatch routing.Dispatcher) error { iConn := stat.TryUnwrapStatsConn(connection) if h.decryption != nil { @@ -620,10 +620,14 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s if err != nil { return err } - return r.NewMux(ctx, h.defaultDispatcher.WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter})) + defaultDispatcher, ok := h.defaultDispatcher.(*dispatcher.DefaultDispatcher) + if !ok { + return errors.New("VLESS reverse must have a dispatcher that implemented routing.WrapLinkDispatcher") + } + return r.NewMux(ctx, defaultDispatcher.WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter})) } - if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{ + if err := dispatch.DispatchLink(ctx, request.Destination(), &transport.Link{ Reader: clientReader, Writer: clientWriter}, ); err != nil { From 96a55dff52a802df972d70fcd11a6625074e8e25 Mon Sep 17 00:00:00 2001 From: yuhan6665 <1588741+yuhan6665@users.noreply.github.com> Date: Wed, 24 Dec 2025 23:19:26 -0500 Subject: [PATCH 4/4] Remove dispatcher cast --- app/dispatcher/default.go | 13 +++++++------ proxy/vless/inbound/inbound.go | 9 ++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index 2ae649029625..03722e8a89a8 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -196,7 +196,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *tran return inboundLink, outboundLink } -func (d *DefaultDispatcher) WrapLink(ctx context.Context, link *transport.Link) *transport.Link { +func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link { sessionInbound := session.InboundFromContext(ctx) var user *protocol.MemoryUser if sessionInbound != nil { @@ -206,16 +206,16 @@ func (d *DefaultDispatcher) WrapLink(ctx context.Context, link *transport.Link) link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader} if user != nil && len(user.Email) > 0 { - p := d.policy.ForLevel(user.Level) + p := policyManager.ForLevel(user.Level) if p.Stats.UserUplink { name := "user>>>" + user.Email + ">>>traffic>>>uplink" - if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil { + if c, _ := stats.GetOrRegisterCounter(statsManager, name); c != nil { link.Reader.(*buf.TimeoutWrapperReader).Counter = c } } if p.Stats.UserDownlink { name := "user>>>" + user.Email + ">>>traffic>>>downlink" - if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil { + if c, _ := stats.GetOrRegisterCounter(statsManager, name); c != nil { link.Writer = &SizeStatWriter{ Counter: c, Writer: link.Writer, @@ -224,7 +224,7 @@ func (d *DefaultDispatcher) WrapLink(ctx context.Context, link *transport.Link) } if p.Stats.UserOnline { name := "user>>>" + user.Email + ">>>online" - if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil { + if om, _ := stats.GetOrRegisterOnlineMap(statsManager, name); om != nil { sessionInbounds := session.InboundFromContext(ctx) userIP := sessionInbounds.Source.Address.String() om.AddIP(userIP) @@ -357,7 +357,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De content = new(session.Content) ctx = session.ContextWithContent(ctx, content) } - outbound = d.WrapLink(ctx, outbound) + outbound = WrapLink(ctx, d.policy, d.stats, outbound) sniffingRequest := content.SniffingRequest if !sniffingRequest.Enabled { d.routedDispatch(ctx, outbound, destination) @@ -449,6 +449,7 @@ func sniffer(ctx context.Context, cReader *cachedReader, metadataOnly bool, netw } return contentResult, contentErr } + func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination) { outbounds := session.OutboundsFromContext(ctx) ob := outbounds[len(outbounds)-1] diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index 69439ffdeecc..d12495b407bd 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -32,6 +32,7 @@ import ( "github.com/xtls/xray-core/features/outbound" "github.com/xtls/xray-core/features/policy" "github.com/xtls/xray-core/features/routing" + "github.com/xtls/xray-core/features/stats" "github.com/xtls/xray-core/proxy" "github.com/xtls/xray-core/proxy/vless" "github.com/xtls/xray-core/proxy/vless/encoding" @@ -73,6 +74,7 @@ func init() { type Handler struct { inboundHandlerManager feature_inbound.Manager policyManager policy.Manager + stats stats.Manager validator vless.Validator decryption *encryption.ServerInstance outboundHandlerManager outbound.Manager @@ -88,6 +90,7 @@ func New(ctx context.Context, config *Config, dc dns.Client, validator vless.Val handler := &Handler{ inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager), policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager), + stats: v.GetFeature(stats.ManagerType()).(stats.Manager), validator: validator, outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager), defaultDispatcher: v.GetFeature(routing.DispatcherType()).(routing.Dispatcher), @@ -620,11 +623,7 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s if err != nil { return err } - defaultDispatcher, ok := h.defaultDispatcher.(*dispatcher.DefaultDispatcher) - if !ok { - return errors.New("VLESS reverse must have a dispatcher that implemented routing.WrapLinkDispatcher") - } - return r.NewMux(ctx, defaultDispatcher.WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter})) + return r.NewMux(ctx, dispatcher.WrapLink(ctx, h.policyManager, h.stats, &transport.Link{Reader: clientReader, Writer: clientWriter})) } if err := dispatch.DispatchLink(ctx, request.Destination(), &transport.Link{