Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions app/router/balancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
)

type BalancingStrategy interface {
PickOutbound([]string) string
PickOutbound(outbound.Manager, []string) string
}

type RandomStrategy struct {
}

func (s *RandomStrategy) PickOutbound(tags []string) string {
// PickOutbound implement BalancingStrategy interface
func (s *RandomStrategy) PickOutbound(_ outbound.Manager, tags []string) string {
n := len(tags)
if n == 0 {
panic("0 tags")
Expand All @@ -36,7 +37,7 @@ func (b *Balancer) PickOutbound() (string, error) {
if len(tags) == 0 {
return "", newError("no available outbounds selected")
}
tag := b.strategy.PickOutbound(tags)
tag := b.strategy.PickOutbound(b.ohm, tags)
if tag == "" {
return "", newError("balancing strategy returns empty tag")
}
Expand Down
246 changes: 246 additions & 0 deletions app/router/balancing_optimal_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// +build !confonly

package router

import (
"bytes"
"context"
"fmt"
"math"
"net/http"
"net/url"
"sort"
"sync"
"time"

"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/net/cnc"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/pipe"

"github.com/xtls/xray-core/common/task"
"github.com/xtls/xray-core/features/outbound"
)

type TagWeight struct {
tag string
weight float64
}

// OptimalStrategy pick outbound by net speed
type OptimalStrategy struct {
timeout time.Duration
interval time.Duration
url *url.URL
count uint32
obm outbound.Manager
tag string
tags []string
weights map[string]uint32
periodic *task.Periodic
periodicMutex sync.Mutex
}

// NewOptimalStrategy create new strategy
func NewOptimalStrategy(config *BalancingOptimalStrategyConfig) *OptimalStrategy {
s := &OptimalStrategy{}
if config.Timeout == 0 {
s.timeout = time.Second * 5
} else {
s.timeout = time.Millisecond * time.Duration(config.Timeout)
}
if config.Interval == 0 {
s.interval = time.Second * 60 * 10
} else {
s.interval = time.Millisecond * time.Duration(config.Interval)
}
if config.Url == "" {
s.url, _ = url.Parse("https://www.google.com")
} else {
var err error
s.url, err = url.Parse(config.Url)
if err != nil {
panic(err)
}
if s.url.Scheme != "http" && s.url.Scheme != "https" {
panic("Only http/https url support")
}
}
if config.Count == 0 {
s.count = 1
} else {
s.count = config.Count
}
s.weights = make(map[string]uint32)
if config.Weights != nil {
for _, w := range config.Weights {
s.weights[w.Tag] = w.Weight
}
}
return s
}

// PickOutbound implement BalancingStrategy interface
func (s *OptimalStrategy) PickOutbound(obm outbound.Manager, tags []string) string {
if len(tags) == 0 {
panic("0 tags")
} else if len(tags) == 1 {
return s.tag
}

s.obm = obm
s.tags = tags

if s.periodic == nil {
s.periodicMutex.Lock()
if s.periodic == nil {
s.tag = s.tags[0]
s.periodic = &task.Periodic{
Interval: s.interval,
Execute: s.run,
}
go s.periodic.Start()
}
s.periodicMutex.Unlock()
}

return s.tag
}

type optimalStrategyTestResult struct {
tag string
score float64
}

// periodic execute function
func (s *OptimalStrategy) run() error {
tags := s.tags
count := s.count

results := make([]optimalStrategyTestResult, len(tags))

var wg sync.WaitGroup
wg.Add(len(tags))
for i, tag := range tags {
result := &results[i]
result.tag = tag
go s.testOutboud(tag, result, count, &wg)
}
wg.Wait()

sort.Slice(results, func(i, j int) bool {
// score scores in desc order
return results[i].score > results[j].score
})

if results[0].tag != s.tag {
newError(fmt.Sprintf("The balanced optimal strategy changes the outbound from [%s] to [%s] in %s", s.tag, results[0].tag, s.tags)).AtWarning().WriteToLog()
s.tag = results[0].tag
}
return nil
}

// Test outbound's network state with multi-round
func (s *OptimalStrategy) testOutboud(tag string, result *optimalStrategyTestResult, count uint32, wg *sync.WaitGroup) {
// test outbound by fetch url
defer wg.Done()
newError(fmt.Sprintf("s.obm.GetHandler %s", tag)).AtDebug().WriteToLog()
oh := s.obm.GetHandler(tag)
if oh == nil {
newError("Wrong OptimalStrategy tag").AtError().WriteToLog()
return
}

scores := make([]float64, count)
for i := uint32(0); i < count; i++ {
client := s.buildClient(oh)
// send http request though this outbound
req, _ := http.NewRequest("GET", s.url.String(), nil)
startAt := time.Now()
resp, err := client.Do(req)
// use http response speed or time(no http content) as score
score := 0.0
if err != nil {
newError(fmt.Sprintf("Balance OptimalStrategy tag %s error: %s", tag, err)).AtInfo().WriteToLog()
} else {
defer resp.Body.Close()
bodybuff := new(bytes.Buffer)
contentSize, err := bodybuff.ReadFrom(resp.Body)
if err != nil {
newError(fmt.Sprintf("Balance OptimalStrategy tag %s error: %s", tag, err)).AtInfo().WriteToLog()
} else {
finishAt := time.Now()
usetime := float64(finishAt.UnixNano()-startAt.UnixNano()) / float64(time.Second)
newError(fmt.Sprintf("Balance OptimalStrategy tag %s get contentSize: %d", tag, contentSize)).AtDebug().WriteToLog()
newError(fmt.Sprintf("Balance OptimalStrategy tag %s usetime: %f", tag, usetime)).AtDebug().WriteToLog()
var weight uint32 = 100.00
if _, ok := s.weights[tag]; ok {
weight = s.weights[tag]
}
if contentSize != 0 {
score = float64(weight) * float64(contentSize) / usetime
} else {
// assert http header's Byte size is 100B
score = float64(weight) * 100 / usetime
}
}
}
scores[i] = score
// next test round
client.CloseIdleConnections()
}

// calculate average score and end test round
var minScore float64 = float64(math.MaxInt64)
var maxScore float64 = float64(math.MinInt64)
var sumScore float64
var score float64

for _, score := range scores {
if score < minScore {
minScore = score
}
if score > maxScore {
maxScore = score
}
sumScore += score
}
if len(scores) < 3 {
score = sumScore / float64(len(scores))
} else {
score = (sumScore - minScore - maxScore) / float64(s.count-2)
}
newError(fmt.Sprintf("Balance OptimalStrategy get %s's score: %.2f", tag, score)).AtDebug().WriteToLog()
result.score = score
}

func (s *OptimalStrategy) buildClient(oh outbound.Handler) *http.Client {
return &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
netDestination, err := net.ParseDestination(fmt.Sprintf("%s:%s", network, addr))
if err != nil {
return nil, err
}

uplinkReader, uplinkWriter := pipe.New()
downlinkReader, downlinkWriter := pipe.New()
ctx = session.ContextWithOutbound(
ctx,
&session.Outbound{
Target: netDestination,
})
go oh.Dispatch(ctx, &transport.Link{Reader: uplinkReader, Writer: downlinkWriter})

return cnc.NewConnection(cnc.ConnectionInputMulti(uplinkWriter), cnc.ConnectionOutputMulti(downlinkReader)), nil
},
MaxConnsPerHost: 1,
MaxIdleConns: 1,
DisableCompression: true,
DisableKeepAlives: true,
ForceAttemptHTTP2: true,
},
Timeout: s.timeout,
}
}
10 changes: 9 additions & 1 deletion app/router/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,17 @@ func (rr *RoutingRule) BuildCondition() (Condition, error) {
}

func (br *BalancingRule) Build(ohm outbound.Manager) (*Balancer, error) {
var strategy BalancingStrategy

if br.Strategy == "optimal" {
strategy = NewOptimalStrategy(br.OptimalStrategyConfig)
} else if br.Strategy == "" || br.Strategy == "random" {
strategy = &RandomStrategy{}
}

return &Balancer{
selectors: br.OutboundSelector,
strategy: &RandomStrategy{},
strategy: strategy,
ohm: ohm,
}, nil
}
Loading