Skip to content

Commit 9fb7375

Browse files
authored
Refine code
1 parent 8b68241 commit 9fb7375

File tree

5 files changed

+89
-86
lines changed

5 files changed

+89
-86
lines changed

app/dispatcher/default.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,7 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.
483483
handler = h
484484
} else {
485485
errors.LogWarning(ctx, "non existing outTag: ", outTag)
486+
return
486487
}
487488
} else {
488489
errors.LogInfo(ctx, "default route for ", destination)

app/reverse/bridge.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ type BridgeWorker struct {
9797
Tag string
9898
Worker *mux.ServerWorker
9999
Dispatcher routing.Dispatcher
100-
state Control_State
100+
State Control_State
101101
}
102102

103103
func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) {
@@ -141,7 +141,7 @@ func (w *BridgeWorker) Close() error {
141141
}
142142

143143
func (w *BridgeWorker) IsActive() bool {
144-
return w.state == Control_ACTIVE && !w.Worker.Closed()
144+
return w.State == Control_ACTIVE && !w.Worker.Closed()
145145
}
146146

147147
func (w *BridgeWorker) Connections() uint32 {
@@ -161,8 +161,8 @@ func (w *BridgeWorker) handleInternalConn(link *transport.Link) {
161161
errors.LogInfoInner(context.Background(), err, "failed to parse proto message")
162162
break
163163
}
164-
if ctl.State != w.state {
165-
w.state = ctl.State
164+
if ctl.State != w.State {
165+
w.State = ctl.State
166166
}
167167
}
168168
}

app/reverse/reverse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
)
1313

1414
const (
15-
internalDomain = "reverse.internal.v2fly.org" // make reverse proxy compatible with v2fly
15+
internalDomain = "reverse"
1616
)
1717

1818
func isDomain(dest net.Destination, domain string) bool {

proxy/vless/inbound/inbound.go

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"time"
1313
"unsafe"
1414

15-
app_dispatcher "github.com/xtls/xray-core/app/dispatcher"
15+
"github.com/xtls/xray-core/app/dispatcher"
1616
"github.com/xtls/xray-core/app/reverse"
1717
"github.com/xtls/xray-core/common"
1818
"github.com/xtls/xray-core/common/buf"
@@ -71,23 +71,27 @@ func init() {
7171

7272
// Handler is an inbound connection handler that handles messages in VLess protocol.
7373
type Handler struct {
74-
inboundHandlerManager feature_inbound.Manager
75-
policyManager policy.Manager
76-
validator vless.Validator
77-
dns dns.Client
78-
decryption *encryption.ServerInstance
79-
fallbacks map[string]map[string]map[string]*Fallback // or nil
74+
inboundHandlerManager feature_inbound.Manager
75+
policyManager policy.Manager
76+
validator vless.Validator
77+
decryption *encryption.ServerInstance
78+
outboundHandlerManager outbound.Manager
79+
defaultDispatcher *dispatcher.DefaultDispatcher
80+
ctx context.Context
81+
fallbacks map[string]map[string]map[string]*Fallback // or nil
8082
// regexps map[string]*regexp.Regexp // or nil
8183
}
8284

8385
// New creates a new VLess inbound handler.
8486
func New(ctx context.Context, config *Config, dc dns.Client, validator vless.Validator) (*Handler, error) {
8587
v := core.MustFromContext(ctx)
8688
handler := &Handler{
87-
inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager),
88-
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
89-
dns: dc,
90-
validator: validator,
89+
inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager),
90+
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
91+
validator: validator,
92+
outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
93+
defaultDispatcher: v.GetFeature(routing.DispatcherType()).(*dispatcher.DefaultDispatcher),
94+
ctx: ctx,
9195
}
9296

9397
if config.Decryption != "" && config.Decryption != "none" {
@@ -179,12 +183,46 @@ func isMuxAndNotXUDP(request *protocol.RequestHeader, first *buf.Buffer) bool {
179183
firstBytes[6] == 2) // Network type: UDP
180184
}
181185

186+
func (h *Handler) GetReverse(a *vless.MemoryAccount) (*Reverse, error) {
187+
u := h.validator.Get(a.ID.UUID())
188+
if u == nil {
189+
return nil, errors.New("reverse: user " + a.ID.String() + " doesn't exist anymore")
190+
}
191+
a = u.Account.(*vless.MemoryAccount)
192+
if a.Reverse == nil || a.Reverse.Tag == "" {
193+
return nil, errors.New("reverse: user " + a.ID.String() + " is not allowed to create reverse proxy")
194+
}
195+
r := h.outboundHandlerManager.GetHandler(a.Reverse.Tag)
196+
if r == nil {
197+
picker, _ := reverse.NewStaticMuxPicker()
198+
r = &Reverse{tag: a.Reverse.Tag, picker: picker, client: &mux.ClientManager{Picker: picker}}
199+
if err := h.outboundHandlerManager.AddHandler(h.ctx, r); err != nil {
200+
return nil, err
201+
}
202+
}
203+
if r, ok := r.(*Reverse); ok {
204+
return r, nil
205+
}
206+
return nil, errors.New("reverse: outbound " + a.Reverse.Tag + " is not type Reverse")
207+
}
208+
209+
func (h *Handler) RemoveReverse(u *protocol.MemoryUser) {
210+
if u != nil {
211+
a := u.Account.(*vless.MemoryAccount)
212+
if a.Reverse != nil && a.Reverse.Tag != "" {
213+
h.outboundHandlerManager.RemoveHandler(h.ctx, a.Reverse.Tag)
214+
}
215+
}
216+
}
217+
182218
// Close implements common.Closable.Close().
183219
func (h *Handler) Close() error {
184220
if h.decryption != nil {
185221
h.decryption.Close()
186222
}
187-
// TODO: remove reverse's handlers (needs ctx)
223+
for _, u := range h.validator.GetAll() {
224+
h.RemoveReverse(u)
225+
}
188226
return errors.Combine(common.Close(h.validator))
189227
}
190228

@@ -195,19 +233,7 @@ func (h *Handler) AddUser(ctx context.Context, u *protocol.MemoryUser) error {
195233

196234
// RemoveUser implements proxy.UserManager.RemoveUser().
197235
func (h *Handler) RemoveUser(ctx context.Context, e string) error {
198-
u := h.validator.GetByEmail(e)
199-
if u != nil {
200-
a := u.Account.(*vless.MemoryAccount)
201-
if a.Reverse != nil && a.Reverse.Tag != "" {
202-
core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error { // not sure whether it works or not
203-
go func() {
204-
time.Sleep(time.Minute) // TODO: check firstLen
205-
om.RemoveHandler(ctx, a.Reverse.Tag)
206-
}()
207-
return nil
208-
})
209-
}
210-
}
236+
h.RemoveReverse(h.validator.GetByEmail(e))
211237
return h.validator.Del(e)
212238
}
213239

@@ -519,6 +545,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
519545
switch request.Command {
520546
case protocol.RequestCommandUDP:
521547
return errors.New(requestAddons.Flow + " doesn't support UDP").AtWarning()
548+
case protocol.RequestCommandRvs:
549+
inbound.CanSpliceCopy = 3
522550
case protocol.RequestCommandMux:
523551
fallthrough // we will break Mux connections that contain TCP requests
524552
case protocol.RequestCommandTCP:
@@ -585,30 +613,11 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
585613
bufferWriter.SetFlushNext()
586614

587615
if request.Command == protocol.RequestCommandRvs {
588-
if account.Reverse == nil || account.Reverse.Tag == "" {
589-
return errors.New("account " + account.ID.String() + " can not use reverse proxy")
590-
}
591-
var rd routing.Dispatcher
592-
var obm outbound.Manager
593-
if err := core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error {
594-
rd = d
595-
obm = om
596-
return nil
597-
}); err != nil {
616+
r, err := h.GetReverse(account)
617+
if err != nil {
598618
return err
599619
}
600-
r := obm.GetHandler(account.Reverse.Tag)
601-
if r == nil {
602-
picker, _ := reverse.NewStaticMuxPicker()
603-
r = &Reverse{tag: account.Reverse.Tag, picker: picker, client: &mux.ClientManager{Picker: picker}}
604-
if err := obm.AddHandler(ctx, r); err != nil {
605-
return err
606-
}
607-
}
608-
if r, ok := r.(*Reverse); ok {
609-
return r.NewMux(ctx, rd.(*app_dispatcher.DefaultDispatcher).WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter}))
610-
}
611-
return errors.New("mismatched reverse tag")
620+
return r.NewMux(ctx, h.defaultDispatcher.WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter}))
612621
}
613622

614623
if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{
@@ -630,14 +639,14 @@ func (r *Reverse) Tag() string {
630639
return r.tag
631640
}
632641

633-
func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error {
642+
func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error { // XTLS? vnext? users?
634643
muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
635644
if err != nil {
636645
return errors.New("failed to create mux client worker").Base(err).AtWarning()
637646
}
638647
worker, err := reverse.NewPortalWorker(muxClient)
639648
if err != nil {
640-
return errors.New("failed to create portal worker").Base(err)
649+
return errors.New("failed to create portal worker").Base(err).AtWarning()
641650
}
642651
r.picker.AddWorker(worker)
643652
select {

proxy/vless/outbound/outbound.go

Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -89,28 +89,20 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
8989
}
9090

9191
if a.Reverse != nil {
92-
if err := core.RequireFeatures(ctx, func(d routing.Dispatcher) error {
93-
ctx = session.ContextWithInbound(ctx, &session.Inbound{
94-
Tag: a.Reverse.Tag,
95-
})
96-
ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{
97-
Target: net.Destination{Address: net.DomainAddress("v1.rvs.cool")},
98-
}})
99-
handler.reverse = &Reverse{
100-
tag: a.Reverse.Tag,
101-
dispatcher: d,
102-
ctx: ctx,
103-
handler: handler,
104-
}
105-
handler.reverse.monitorTask = &task.Periodic{
106-
Execute: handler.reverse.monitor,
107-
Interval: time.Second * 2,
108-
}
109-
handler.reverse.Start()
110-
return nil
111-
}); err != nil {
112-
return nil, err
92+
handler.reverse = &Reverse{
93+
tag: a.Reverse.Tag,
94+
dispatcher: v.GetFeature(routing.DispatcherType()).(routing.Dispatcher),
95+
ctx: ctx,
96+
handler: handler,
11397
}
98+
handler.reverse.monitorTask = &task.Periodic{
99+
Execute: handler.reverse.monitor,
100+
Interval: time.Second * 2,
101+
}
102+
go func() {
103+
time.Sleep(2 * time.Second)
104+
handler.reverse.Start()
105+
}()
114106
}
115107

116108
return handler, nil
@@ -128,7 +120,7 @@ func (h *Handler) Close() error {
128120
func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
129121
outbounds := session.OutboundsFromContext(ctx)
130122
ob := outbounds[len(outbounds)-1]
131-
if !ob.Target.IsValid() {
123+
if !ob.Target.IsValid() && ob.Target.Address.String() != "v1.rvs.cool" {
132124
return errors.New("target not specified").AtError()
133125
}
134126
ob.Name = "vless"
@@ -400,26 +392,27 @@ func (r *Reverse) monitor() error {
400392
if numWorker == 0 || numConnections/numWorker > 16 {
401393
reader1, writer1 := pipe.New(pipe.WithSizeLimit(2 * buf.Size))
402394
reader2, writer2 := pipe.New(pipe.WithSizeLimit(2 * buf.Size))
403-
link1 := &transport.Link{
404-
Reader: reader1,
405-
Writer: writer2,
406-
}
407-
link2 := &transport.Link{
408-
Reader: reader2,
409-
Writer: writer1,
410-
}
395+
link1 := &transport.Link{Reader: reader1, Writer: writer2}
396+
link2 := &transport.Link{Reader: reader2, Writer: writer1}
411397
w := &reverse.BridgeWorker{
412398
Tag: r.tag,
413399
Dispatcher: r.dispatcher,
414400
}
415401
worker, err := mux.NewServerWorker(r.ctx, w, link1)
416402
if err != nil {
417-
errors.LogWarningInner(context.Background(), err, "failed to create bridge worker")
403+
errors.LogWarningInner(r.ctx, err, "failed to create mux server worker")
418404
return nil
419405
}
420406
w.Worker = worker
421407
r.workers = append(r.workers, w)
422-
go r.handler.Process(r.ctx, link2, session.HandlerFromContext(r.ctx).(*proxyman.Handler))
408+
go func() {
409+
ctx := session.ContextWithOutbounds(r.ctx, []*session.Outbound{{
410+
Target: net.Destination{Address: net.DomainAddress("v1.rvs.cool")},
411+
}})
412+
r.handler.Process(ctx, link2, session.HandlerFromContext(ctx).(*proxyman.Handler))
413+
common.Interrupt(reader1)
414+
common.Interrupt(reader2)
415+
}()
423416
}
424417
return nil
425418
}

0 commit comments

Comments
 (0)