66 "runtime"
77 "strings"
88 "sync"
9- "sync/atomic"
109 "time"
1110)
1211
@@ -22,57 +21,29 @@ type workerPool struct {
2221
2322 // Function for serving server connections.
2423 // It must leave c unclosed.
25- ready workerChanStack
2624 WorkerFunc ServeHandler
2725
2826 stopCh chan struct {}
2927
3028 connState func (net.Conn , ConnState )
3129
30+ ready []* workerChan
31+
3232 MaxWorkersCount int
3333
3434 MaxIdleWorkerDuration time.Duration
3535
36- workersCount int32
36+ workersCount int
3737
38- mustStop atomic. Bool
38+ lock sync. Mutex
3939
4040 LogAllErrors bool
41+ mustStop bool
4142}
4243
4344type workerChan struct {
44- next * workerChan
45-
46- ch chan net.Conn
47-
48- lastUseTime int64
49- }
50-
51- type workerChanStack struct {
52- head atomic.Pointer [workerChan ]
53- }
54-
55- func (s * workerChanStack ) push (ch * workerChan ) {
56- for {
57- oldHead := s .head .Load ()
58- ch .next = oldHead
59- if s .head .CompareAndSwap (oldHead , ch ) {
60- break
61- }
62- }
63- }
64-
65- func (s * workerChanStack ) pop () * workerChan {
66- for {
67- oldHead := s .head .Load ()
68- if oldHead == nil {
69- return nil
70- }
71-
72- if s .head .CompareAndSwap (oldHead , oldHead .next ) {
73- return oldHead
74- }
75- }
45+ lastUseTime time.Time
46+ ch chan net.Conn
7647}
7748
7849func (wp * workerPool ) Start () {
@@ -87,8 +58,9 @@ func (wp *workerPool) Start() {
8758 }
8859 }
8960 go func () {
61+ var scratch []* workerChan
9062 for {
91- wp .clean ()
63+ wp .clean (& scratch )
9264 select {
9365 case <- stopCh :
9466 return
@@ -109,15 +81,15 @@ func (wp *workerPool) Stop() {
10981 // Stop all the workers waiting for incoming connections.
11082 // Do not wait for busy workers - they will stop after
11183 // serving the connection and noticing wp.mustStop = true.
112-
113- for {
114- ch := wp .ready .pop ()
115- if ch == nil {
116- break
117- }
118- ch .ch <- nil
84+ wp .lock .Lock ()
85+ ready := wp .ready
86+ for i := range ready {
87+ ready [i ].ch <- nil
88+ ready [i ] = nil
11989 }
120- wp .mustStop .Store (true )
90+ wp .ready = ready [:0 ]
91+ wp .mustStop = true
92+ wp .lock .Unlock ()
12193}
12294
12395func (wp * workerPool ) getMaxIdleWorkerDuration () time.Duration {
@@ -127,22 +99,50 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
12799 return wp .MaxIdleWorkerDuration
128100}
129101
130- func (wp * workerPool ) clean () {
102+ func (wp * workerPool ) clean (scratch * [] * workerChan ) {
131103 maxIdleWorkerDuration := wp .getMaxIdleWorkerDuration ()
132- criticalTime := time .Now ().Add (- maxIdleWorkerDuration ).UnixNano ()
133104
134- for {
135- current := wp .ready .head .Load ()
136- if current == nil || atomic .LoadInt64 (& current .lastUseTime ) >= criticalTime {
137- break
138- }
105+ // Clean least recently used workers if they didn't serve connections
106+ // for more than maxIdleWorkerDuration.
107+ criticalTime := time .Now ().Add (- maxIdleWorkerDuration )
108+
109+ wp .lock .Lock ()
110+ ready := wp .ready
111+ n := len (ready )
139112
140- next := current .next
141- if wp .ready .head .CompareAndSwap (current , next ) {
142- current .ch <- nil
143- wp .workerChanPool .Put (current )
113+ // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
114+ l , r := 0 , n - 1
115+ for l <= r {
116+ mid := (l + r ) / 2
117+ if criticalTime .After (wp .ready [mid ].lastUseTime ) {
118+ l = mid + 1
119+ } else {
120+ r = mid - 1
144121 }
145122 }
123+ i := r
124+ if i == - 1 {
125+ wp .lock .Unlock ()
126+ return
127+ }
128+
129+ * scratch = append ((* scratch )[:0 ], ready [:i + 1 ]... )
130+ m := copy (ready , ready [i + 1 :])
131+ for i = m ; i < n ; i ++ {
132+ ready [i ] = nil
133+ }
134+ wp .ready = ready [:m ]
135+ wp .lock .Unlock ()
136+
137+ // Notify obsolete workers to stop.
138+ // This notification must be outside the wp.lock, since ch.ch
139+ // may be blocking and may consume a lot of time if many workers
140+ // are located on non-local CPUs.
141+ tmp := * scratch
142+ for i := range tmp {
143+ tmp [i ].ch <- nil
144+ tmp [i ] = nil
145+ }
146146}
147147
148148func (wp * workerPool ) Serve (c net.Conn ) bool {
@@ -169,32 +169,47 @@ var workerChanCap = func() int {
169169}()
170170
171171func (wp * workerPool ) getCh () * workerChan {
172- for {
173- ch := wp .ready .pop ()
174- if ch != nil {
175- return ch
172+ var ch * workerChan
173+ createWorker := false
174+
175+ wp .lock .Lock ()
176+ ready := wp .ready
177+ n := len (ready ) - 1
178+ if n < 0 {
179+ if wp .workersCount < wp .MaxWorkersCount {
180+ createWorker = true
181+ wp .workersCount ++
176182 }
183+ } else {
184+ ch = ready [n ]
185+ ready [n ] = nil
186+ wp .ready = ready [:n ]
187+ }
188+ wp .lock .Unlock ()
177189
178- currentWorkers := atomic .LoadInt32 (& wp .workersCount )
179- if int (currentWorkers ) < wp .MaxWorkersCount {
180- if atomic .CompareAndSwapInt32 (& wp .workersCount , currentWorkers , currentWorkers + 1 ) {
181- ch = wp .workerChanPool .Get ().(* workerChan )
182- go wp .workerFunc (ch )
183- return ch
184- }
185- } else {
186- break
190+ if ch == nil {
191+ if ! createWorker {
192+ return nil
187193 }
194+ vch := wp .workerChanPool .Get ()
195+ ch = vch .(* workerChan )
196+ go func () {
197+ wp .workerFunc (ch )
198+ wp .workerChanPool .Put (vch )
199+ }()
188200 }
189- return nil
201+ return ch
190202}
191203
192204func (wp * workerPool ) release (ch * workerChan ) bool {
193- atomic .StoreInt64 (& ch .lastUseTime , time .Now ().UnixNano ())
194- if wp .mustStop .Load () {
205+ ch .lastUseTime = time .Now ()
206+ wp .lock .Lock ()
207+ if wp .mustStop {
208+ wp .lock .Unlock ()
195209 return false
196210 }
197- wp .ready .push (ch )
211+ wp .ready = append (wp .ready , ch )
212+ wp .lock .Unlock ()
198213 return true
199214}
200215
@@ -230,5 +245,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
230245 }
231246 }
232247
233- atomic .AddInt32 (& wp .workersCount , - 1 )
248+ wp .lock .Lock ()
249+ wp .workersCount --
250+ wp .lock .Unlock ()
234251}
0 commit comments