Skip to content

Commit e1a269d

Browse files
committed
cmd/sync: add global traffic control for sync
1 parent aced471 commit e1a269d

File tree

3 files changed

+222
-81
lines changed

3 files changed

+222
-81
lines changed

cmd/sync.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ func syncStorageFlags() []cli.Flag {
265265
Name: "bwlimit",
266266
Usage: "limit bandwidth in Mbps (0 means unlimited)",
267267
},
268+
&cli.StringFlag{
269+
Name: "traffic-control-addr",
270+
Usage: "the address of the traffic control",
271+
},
268272
})
269273
}
270274

pkg/sync/config.go

Lines changed: 81 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -30,45 +30,46 @@ import (
3030
)
3131

3232
type Config struct {
33-
StorageClass string
34-
Start string
35-
End string
36-
Threads int
37-
Update bool
38-
ForceUpdate bool
39-
Perms bool
40-
MaxFailure int64
41-
Dry bool
42-
DeleteSrc bool
43-
DeleteDst bool
44-
MatchFullPath bool
45-
Dirs bool
46-
Exclude []string
47-
Include []string
48-
Existing bool
49-
IgnoreExisting bool
50-
Links bool
51-
Inplace bool
52-
Limit int64
53-
Manager string
54-
Workers []string
55-
ManagerAddr string
56-
ListThreads int
57-
ListDepth int
58-
BWLimit int64
59-
NoHTTPS bool
60-
Verbose bool
61-
Quiet bool
62-
CheckAll bool
63-
CheckNew bool
64-
CheckChange bool
65-
MaxSize int64
66-
MinSize int64
67-
MaxAge time.Duration
68-
MinAge time.Duration
69-
StartTime time.Time
70-
EndTime time.Time
71-
Env map[string]string
33+
StorageClass string
34+
Start string
35+
End string
36+
Threads int
37+
Update bool
38+
ForceUpdate bool
39+
Perms bool
40+
MaxFailure int64
41+
Dry bool
42+
DeleteSrc bool
43+
DeleteDst bool
44+
MatchFullPath bool
45+
Dirs bool
46+
Exclude []string
47+
Include []string
48+
Existing bool
49+
IgnoreExisting bool
50+
Links bool
51+
Inplace bool
52+
Limit int64
53+
Manager string
54+
Workers []string
55+
ManagerAddr string
56+
ListThreads int
57+
ListDepth int
58+
BWLimit int64
59+
TrafficControlAddr string
60+
NoHTTPS bool
61+
Verbose bool
62+
Quiet bool
63+
CheckAll bool
64+
CheckNew bool
65+
CheckChange bool
66+
MaxSize int64
67+
MinSize int64
68+
MaxAge time.Duration
69+
MinAge time.Duration
70+
StartTime time.Time
71+
EndTime time.Time
72+
Env map[string]string
7273

7374
FilesFrom string
7475

@@ -171,46 +172,47 @@ func NewConfigFromCli(c *cli.Context) *Config {
171172
}
172173
}
173174
cfg := &Config{
174-
StorageClass: c.String("storage-class"),
175-
Start: c.String("start"),
176-
End: c.String("end"),
177-
Threads: c.Int("threads"),
178-
ListThreads: c.Int("list-threads"),
179-
ListDepth: c.Int("list-depth"),
180-
Update: c.Bool("update"),
181-
ForceUpdate: c.Bool("force-update"),
182-
Perms: c.Bool("perms"),
183-
Dirs: c.Bool("dirs"),
184-
Dry: c.Bool("dry"),
185-
MaxFailure: c.Int64("max-failure"),
186-
DeleteSrc: c.Bool("delete-src"),
187-
DeleteDst: c.Bool("delete-dst"),
188-
Exclude: c.StringSlice("exclude"),
189-
Include: c.StringSlice("include"),
190-
MatchFullPath: c.Bool("match-full-path"),
191-
Existing: c.Bool("existing"),
192-
IgnoreExisting: c.Bool("ignore-existing"),
193-
Links: c.Bool("links"),
194-
Inplace: c.Bool("inplace"),
195-
Limit: c.Int64("limit"),
196-
Workers: c.StringSlice("worker"),
197-
ManagerAddr: c.String("manager-addr"),
198-
Manager: c.String("manager"),
199-
BWLimit: utils.ParseMbps(c, "bwlimit"),
200-
NoHTTPS: c.Bool("no-https"),
201-
Verbose: c.Bool("verbose"),
202-
Quiet: c.Bool("quiet"),
203-
CheckAll: c.Bool("check-all"),
204-
CheckNew: c.Bool("check-new"),
205-
CheckChange: c.Bool("check-change"),
206-
MaxSize: int64(utils.ParseBytes(c, "max-size", 'B')),
207-
MinSize: int64(utils.ParseBytes(c, "min-size", 'B')),
208-
MaxAge: utils.Duration(c.String("max-age")),
209-
MinAge: utils.Duration(c.String("min-age")),
210-
StartTime: startTime,
211-
EndTime: endTime,
212-
FilesFrom: c.String("files-from"),
213-
Env: make(map[string]string),
175+
StorageClass: c.String("storage-class"),
176+
Start: c.String("start"),
177+
End: c.String("end"),
178+
Threads: c.Int("threads"),
179+
ListThreads: c.Int("list-threads"),
180+
ListDepth: c.Int("list-depth"),
181+
Update: c.Bool("update"),
182+
ForceUpdate: c.Bool("force-update"),
183+
Perms: c.Bool("perms"),
184+
Dirs: c.Bool("dirs"),
185+
Dry: c.Bool("dry"),
186+
MaxFailure: c.Int64("max-failure"),
187+
DeleteSrc: c.Bool("delete-src"),
188+
DeleteDst: c.Bool("delete-dst"),
189+
Exclude: c.StringSlice("exclude"),
190+
Include: c.StringSlice("include"),
191+
MatchFullPath: c.Bool("match-full-path"),
192+
Existing: c.Bool("existing"),
193+
IgnoreExisting: c.Bool("ignore-existing"),
194+
Links: c.Bool("links"),
195+
Inplace: c.Bool("inplace"),
196+
Limit: c.Int64("limit"),
197+
Workers: c.StringSlice("worker"),
198+
ManagerAddr: c.String("manager-addr"),
199+
Manager: c.String("manager"),
200+
BWLimit: utils.ParseMbps(c, "bwlimit"),
201+
TrafficControlAddr: c.String("traffic-control-addr"),
202+
NoHTTPS: c.Bool("no-https"),
203+
Verbose: c.Bool("verbose"),
204+
Quiet: c.Bool("quiet"),
205+
CheckAll: c.Bool("check-all"),
206+
CheckNew: c.Bool("check-new"),
207+
CheckChange: c.Bool("check-change"),
208+
MaxSize: int64(utils.ParseBytes(c, "max-size", 'B')),
209+
MinSize: int64(utils.ParseBytes(c, "min-size", 'B')),
210+
MaxAge: utils.Duration(c.String("max-age")),
211+
MinAge: utils.Duration(c.String("min-age")),
212+
StartTime: startTime,
213+
EndTime: endTime,
214+
FilesFrom: c.String("files-from"),
215+
Env: make(map[string]string),
214216
}
215217
if !c.IsSet("max-size") {
216218
cfg.MaxSize = math.MaxInt64

pkg/sync/sync.go

Lines changed: 137 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
"bufio"
2121
"bytes"
2222
"context"
23+
"encoding/json"
2324
"errors"
2425
"fmt"
2526
"hash/crc32"
2627
"io"
2728
"math"
29+
"net/http"
2830
"os"
2931
"path"
3032
"runtime"
@@ -66,9 +68,125 @@ var (
6668
deleted, failed *utils.Bar
6769
listedPrefix *utils.Bar
6870
concurrent chan int
69-
limiter *ratelimit.Bucket
71+
limiter *mixedLimiter
7072
totalHandled atomic.Int64
7173
)
74+
75+
type mixedLimiter struct {
76+
global *globalLimit
77+
local *ratelimit.Bucket
78+
}
79+
80+
func (l *mixedLimiter) Wait(count int64) {
81+
if l.local != nil {
82+
l.local.Wait(count)
83+
}
84+
if l.global != nil {
85+
l.global.wait(count)
86+
}
87+
}
88+
89+
type globalLimit struct {
90+
sync.Mutex
91+
balance int64
92+
due time.Time
93+
need int64
94+
waiters []*sync.Cond
95+
96+
address string
97+
}
98+
type req struct {
99+
// Positive numbers indicate a request, negative numbers indicate a payback.
100+
Bytes int64 `json:"bytes"`
101+
}
102+
103+
type resp struct {
104+
Granted int64 `json:"granted"` // bytes
105+
Expired int64 `json:"expired"` // Millisecond
106+
}
107+
108+
func (l *globalLimit) request(ask int64) (int64, int64, error) {
109+
r := req{Bytes: ask}
110+
data, err := json.Marshal(r)
111+
if err != nil {
112+
return 0, 0, err
113+
}
114+
result, err := http.Post(l.address, "application/json", bytes.NewReader(data))
115+
if err != nil || result.StatusCode != http.StatusOK {
116+
var status string
117+
if result != nil {
118+
status = http.StatusText(result.StatusCode)
119+
}
120+
logger.Errorf("request traffic control %s failed: %s, http status: %s", l.address, err, status)
121+
return 0, 0, err
122+
}
123+
content, err := io.ReadAll(result.Body)
124+
if err != nil {
125+
return 0, 0, err
126+
}
127+
res := resp{}
128+
if err := json.Unmarshal(content, &res); err != nil {
129+
return 0, 0, err
130+
}
131+
return res.Granted, res.Expired, nil
132+
}
133+
134+
func (l *globalLimit) wait(bytes int64) {
135+
l.Lock()
136+
defer l.Unlock()
137+
if bytes <= 0 || l.balance >= bytes && len(l.waiters) == 0 {
138+
l.balance -= bytes
139+
return
140+
}
141+
l.need += bytes
142+
143+
var me = sync.NewCond(l)
144+
l.waiters = append(l.waiters, me)
145+
for l.waiters[0] != me {
146+
me.Wait()
147+
}
148+
149+
if l.balance < bytes {
150+
// request credit for other waiters together
151+
ask := l.need - l.balance
152+
if ask >= bytes*10 {
153+
// don't wait for too long
154+
ask = bytes * 10
155+
}
156+
l.Unlock()
157+
granted, expire, err := l.request(ask)
158+
l.Lock()
159+
if err == nil {
160+
l.balance += granted
161+
l.due = time.Now().Add(time.Millisecond * time.Duration(expire))
162+
logger.Debugf("grant %d from %s until %s", granted, l.address, l.due)
163+
}
164+
}
165+
166+
l.balance -= bytes
167+
l.need -= bytes
168+
l.waiters = l.waiters[1:]
169+
if len(l.waiters) > 0 {
170+
l.waiters[0].Signal()
171+
}
172+
}
173+
174+
func (l *globalLimit) checkBalance() {
175+
now := time.Now()
176+
l.Lock()
177+
if l.balance > 0 && l.need == 0 && l.due.Before(now) {
178+
payback := l.balance
179+
if payback > 1<<30 {
180+
payback = 1 << 30
181+
}
182+
l.balance -= payback
183+
l.Unlock()
184+
_, _, _ = l.request(-payback)
185+
} else {
186+
l.Unlock()
187+
}
188+
}
189+
72190
var crcTable = crc32.MakeTable(crc32.Castagnoli)
73191
var logger = utils.GetLogger("juicefs")
74192
var ctx = context.Background()
@@ -1711,9 +1829,26 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {
17111829
tasks := make(chan object.Object, bufferSize)
17121830
wg := sync.WaitGroup{}
17131831
concurrent = make(chan int, config.Threads)
1832+
var localLimit *ratelimit.Bucket
17141833
if config.BWLimit > 0 {
17151834
bps := float64(config.BWLimit*1e6/8) * 0.85 // 15% overhead
1716-
limiter = ratelimit.NewBucketWithRate(bps, int64(bps)/10)
1835+
localLimit = ratelimit.NewBucketWithRate(bps, int64(bps)/10)
1836+
}
1837+
var gLimit *globalLimit
1838+
if config.TrafficControlAddr != "" {
1839+
gLimit = &globalLimit{address: config.TrafficControlAddr}
1840+
go func() {
1841+
for {
1842+
time.Sleep(time.Millisecond * 10)
1843+
gLimit.checkBalance()
1844+
}
1845+
}()
1846+
}
1847+
if localLimit != nil || gLimit != nil {
1848+
limiter = &mixedLimiter{
1849+
global: gLimit,
1850+
local: localLimit,
1851+
}
17171852
}
17181853

17191854
progress := utils.NewProgress(config.Verbose || config.Quiet || config.Manager != "")

0 commit comments

Comments
 (0)