Skip to content

Commit 6bfce87

Browse files
committed
replace SSE streams with single unified one
1 parent 749731d commit 6bfce87

File tree

7 files changed

+181
-193
lines changed

7 files changed

+181
-193
lines changed

proxy/logMonitor_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,38 @@ package proxy
33
import (
44
"bytes"
55
"io"
6+
"sync"
67
"testing"
78
)
89

910
func TestLogMonitor(t *testing.T) {
1011
logMonitor := NewLogMonitorWriter(io.Discard)
1112

13+
// A WaitGroup is used to wait for all the expected writes to complete
14+
var wg sync.WaitGroup
15+
1216
client1Messages := make([]byte, 0)
1317
client2Messages := make([]byte, 0)
1418

1519
defer logMonitor.OnLogData(func(data []byte) {
1620
client1Messages = append(client1Messages, data...)
21+
wg.Done()
1722
})()
1823

1924
defer logMonitor.OnLogData(func(data []byte) {
2025
client2Messages = append(client2Messages, data...)
26+
wg.Done()
2127
})()
2228

29+
wg.Add(6) // 2 x 3 writes
30+
2331
logMonitor.Write([]byte("1"))
2432
logMonitor.Write([]byte("2"))
2533
logMonitor.Write([]byte("3"))
2634

35+
// wait for all writes to complete
36+
wg.Wait()
37+
2738
// Check the buffer
2839
expectedHistory := "123"
2940
history := string(logMonitor.GetHistory())

proxy/proxymanager.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,7 @@ func (pm *ProxyManager) setupGinEngine() {
167167
// in proxymanager_loghandlers.go
168168
pm.ginEngine.GET("/logs", pm.sendLogsHandlers)
169169
pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler)
170-
pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE)
171170
pm.ginEngine.GET("/logs/stream/:logMonitorID", pm.streamLogsHandler)
172-
pm.ginEngine.GET("/logs/streamSSE/:logMonitorID", pm.streamLogsHandlerSSE)
173171

174172
/**
175173
* User Interface Endpoints

proxy/proxymanager_api.go

Lines changed: 77 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package proxy
22

33
import (
4+
"context"
5+
"encoding/json"
46
"net/http"
57
"sort"
68

@@ -17,9 +19,8 @@ func addApiHandlers(pm *ProxyManager) {
1719
// Add API endpoints for React to consume
1820
apiGroup := pm.ginEngine.Group("/api")
1921
{
20-
apiGroup.GET("/models", pm.apiListModels)
21-
apiGroup.GET("/modelsSSE", pm.apiListModelsSSE)
2222
apiGroup.POST("/models/unload", pm.apiUnloadAllModels)
23+
apiGroup.GET("/events", pm.apiSendEvents)
2324
}
2425
}
2526

@@ -73,42 +74,92 @@ func (pm *ProxyManager) getModelStatus() []Model {
7374
return models
7475
}
7576

76-
func (pm *ProxyManager) apiListModels(c *gin.Context) {
77-
c.JSON(http.StatusOK, pm.getModelStatus())
77+
type messageType string
78+
79+
const (
80+
msgTypeModelStatus messageType = "modelStatus"
81+
msgTypeLogData messageType = "logData"
82+
)
83+
84+
type messageEnvelope struct {
85+
Type messageType `json:"type"`
86+
Data string `json:"data"`
7887
}
7988

80-
// stream the models as a SSE
81-
func (pm *ProxyManager) apiListModelsSSE(c *gin.Context) {
89+
// sends a stream of different message types that happen on the server
90+
func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
8291
c.Header("Content-Type", "text/event-stream")
8392
c.Header("Cache-Control", "no-cache")
8493
c.Header("Connection", "keep-alive")
8594
c.Header("X-Content-Type-Options", "nosniff")
8695

87-
// flush the first one
88-
c.SSEvent("message", pm.getModelStatus())
89-
c.Writer.Flush()
96+
sendBuffer := make(chan messageEnvelope, 25)
97+
ctx, cancel := context.WithCancel(c.Request.Context())
98+
sendModels := func() {
99+
data, err := json.Marshal(pm.getModelStatus())
100+
if err == nil {
101+
msg := messageEnvelope{Type: msgTypeModelStatus, Data: string(data)}
102+
select {
103+
case sendBuffer <- msg:
104+
case <-ctx.Done():
105+
return
106+
default:
107+
}
90108

91-
// send whenever the any process state
92-
defer event.On(func(e ProcessStateChangeEvent) {
93-
if c != nil && c.Writer != nil {
94-
models := pm.getModelStatus()
95-
c.SSEvent("message", models)
96-
c.Writer.Flush()
97109
}
98-
})()
110+
}
99111

100-
// resend the models when the config is reloaded
101-
defer event.On(func(e ConfigFileChangedEvent) {
102-
if c != nil && c.Writer != nil && e.ReloadingState == "end" {
103-
models := pm.getModelStatus()
104-
c.SSEvent("message", models)
105-
c.Writer.Flush()
112+
sendLogData := func(source string, data []byte) {
113+
data, err := json.Marshal(gin.H{
114+
"source": source,
115+
"data": string(data),
116+
})
117+
if err == nil {
118+
select {
119+
case sendBuffer <- messageEnvelope{Type: msgTypeLogData, Data: string(data)}:
120+
case <-ctx.Done():
121+
return
122+
default:
123+
}
106124
}
125+
}
126+
127+
/**
128+
* Send updated models list
129+
*/
130+
defer event.On(func(e ProcessStateChangeEvent) {
131+
sendModels()
132+
})()
133+
defer event.On(func(e ConfigFileChangedEvent) {
134+
sendModels()
107135
})()
108136

109-
select {
110-
case <-c.Request.Context().Done():
111-
case <-pm.shutdownCtx.Done():
112-
}
137+
/**
138+
* Send Log data
139+
*/
140+
defer pm.proxyLogger.OnLogData(func(data []byte) {
141+
sendLogData("proxy", data)
142+
})()
143+
defer pm.upstreamLogger.OnLogData(func(data []byte) {
144+
sendLogData("upstream", data)
145+
})()
113146

147+
// send initial batch of data
148+
sendLogData("proxy", pm.proxyLogger.GetHistory())
149+
sendLogData("upstream", pm.upstreamLogger.GetHistory())
150+
sendModels()
151+
152+
for {
153+
select {
154+
case <-c.Request.Context().Done():
155+
cancel()
156+
return
157+
case <-pm.shutdownCtx.Done():
158+
cancel()
159+
return
160+
case msg := <-sendBuffer:
161+
c.SSEvent("message", msg)
162+
c.Writer.Flush()
163+
}
164+
}
114165
}

proxy/proxymanager_loghandlers.go

Lines changed: 19 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package proxy
22

33
import (
4+
"context"
45
"fmt"
56
"net/http"
67
"strings"
@@ -52,53 +53,29 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
5253
}
5354
}
5455

56+
sendChan := make(chan []byte, 10)
57+
ctx, cancel := context.WithCancel(c.Request.Context())
5558
defer logger.OnLogData(func(data []byte) {
56-
if c != nil && c.Writer != nil {
57-
c.Writer.Write(data)
58-
flusher.Flush()
59+
select {
60+
case sendChan <- data:
61+
case <-ctx.Done():
62+
return
63+
default:
5964
}
6065
})()
6166

62-
select {
63-
case <-c.Request.Context().Done():
64-
case <-pm.shutdownCtx.Done():
65-
}
66-
67-
}
68-
69-
func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) {
70-
c.Header("Content-Type", "text/event-stream")
71-
c.Header("Cache-Control", "no-cache")
72-
c.Header("Connection", "keep-alive")
73-
c.Header("X-Content-Type-Options", "nosniff")
74-
75-
logMonitorId := c.Param("logMonitorID")
76-
logger, err := pm.getLogger(logMonitorId)
77-
if err != nil {
78-
c.String(http.StatusBadRequest, err.Error())
79-
return
80-
}
81-
82-
// Send history first if not skipped
83-
_, skipHistory := c.GetQuery("no-history")
84-
if !skipHistory {
85-
history := logger.GetHistory()
86-
if len(history) != 0 {
87-
c.SSEvent("message", string(history))
88-
c.Writer.Flush()
89-
}
90-
}
91-
92-
defer logger.OnLogData(func(data []byte) {
93-
if c != nil && c.Writer != nil {
94-
c.SSEvent("message", string(data))
95-
c.Writer.Flush()
67+
for {
68+
select {
69+
case <-c.Request.Context().Done():
70+
cancel()
71+
return
72+
case <-pm.shutdownCtx.Done():
73+
cancel()
74+
return
75+
case data := <-sendChan:
76+
c.Writer.Write(data)
77+
flusher.Flush()
9678
}
97-
})()
98-
99-
select {
100-
case <-c.Request.Context().Done():
101-
case <-pm.shutdownCtx.Done():
10279
}
10380
}
10481

0 commit comments

Comments
 (0)