Skip to content

Commit 1854db5

Browse files
authored
Reverse: portal-worker should not be closed before making sure there is at least one other active worker (XTLS#4869)
1 parent 2f5cb80 commit 1854db5

3 files changed

Lines changed: 21 additions & 9 deletions

File tree

app/reverse/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
func (c *Control) FillInRandom() {
1111
randomLength := dice.Roll(64)
12+
randomLength++
1213
c.Random = make([]byte, randomLength)
1314
io.ReadFull(rand.Reader, c.Random)
1415
}

app/reverse/portal.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
170170
if w.draining {
171171
continue
172172
}
173-
if w.client.Closed() {
173+
if w.IsFull() {
174174
continue
175175
}
176176
if w.client.ActiveConnections() < minConn {
@@ -211,6 +211,7 @@ type PortalWorker struct {
211211
writer buf.Writer
212212
reader buf.Reader
213213
draining bool
214+
counter uint32
214215
}
215216

216217
func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
@@ -244,7 +245,7 @@ func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
244245
}
245246

246247
func (w *PortalWorker) heartbeat() error {
247-
if w.client.Closed() {
248+
if w.Closed() {
248249
return errors.New("client worker stopped")
249250
}
250251

@@ -260,16 +261,21 @@ func (w *PortalWorker) heartbeat() error {
260261
msg.State = Control_DRAIN
261262

262263
defer func() {
264+
w.client.GetTimer().Reset(time.Second * 16)
263265
common.Close(w.writer)
264266
common.Interrupt(w.reader)
265267
w.writer = nil
266268
}()
267269
}
268270

269-
b, err := proto.Marshal(msg)
270-
common.Must(err)
271-
mb := buf.MergeBytes(nil, b)
272-
return w.writer.WriteMultiBuffer(mb)
271+
w.counter = (w.counter + 1) % 5
272+
if w.draining || w.counter == 1 {
273+
b, err := proto.Marshal(msg)
274+
common.Must(err)
275+
mb := buf.MergeBytes(nil, b)
276+
return w.writer.WriteMultiBuffer(mb)
277+
}
278+
return nil
273279
}
274280

275281
func (w *PortalWorker) IsFull() bool {

common/mux/client.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ type ClientWorker struct {
173173
sessionManager *SessionManager
174174
link transport.Link
175175
done *done.Instance
176+
timer *time.Ticker
176177
strategy ClientStrategy
177178
}
178179

@@ -187,6 +188,7 @@ func NewClientWorker(stream transport.Link, s ClientStrategy) (*ClientWorker, er
187188
sessionManager: NewSessionManager(),
188189
link: stream,
189190
done: done.New(),
191+
timer: time.NewTicker(time.Second * 16),
190192
strategy: s,
191193
}
192194

@@ -209,9 +211,12 @@ func (m *ClientWorker) Closed() bool {
209211
return m.done.Done()
210212
}
211213

214+
func (m *ClientWorker) GetTimer() *time.Ticker {
215+
return m.timer
216+
}
217+
212218
func (m *ClientWorker) monitor() {
213-
timer := time.NewTicker(time.Second * 16)
214-
defer timer.Stop()
219+
defer m.timer.Stop()
215220

216221
for {
217222
select {
@@ -220,7 +225,7 @@ func (m *ClientWorker) monitor() {
220225
common.Close(m.link.Writer)
221226
common.Interrupt(m.link.Reader)
222227
return
223-
case <-timer.C:
228+
case <-m.timer.C:
224229
size := m.sessionManager.Size()
225230
if size == 0 && m.sessionManager.CloseIfNoSession() {
226231
common.Must(m.done.Close())

0 commit comments

Comments
 (0)