11package main
22
33import (
4+ "bufio"
45 "context"
56 "errors"
67 "flag"
@@ -13,6 +14,7 @@ import (
1314 "net/url"
1415 "os"
1516 "os/signal"
17+ "strings"
1618 "sync"
1719 "time"
1820)
@@ -92,14 +94,9 @@ func main() {
9294 }()
9395
9496 // graceful shutdown
95- ctx , stop := signal .NotifyContext (context .Background (), os .Interrupt )
96- defer stop ()
97+ ctx , _ := signal .NotifyContext (context .Background (), os .Interrupt )
9798 <- ctx .Done ()
98- shutdownCtx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
99- defer cancel ()
100- if err := server .Shutdown (shutdownCtx ); err != nil {
101- slog .Error ("server shutdown error" , "error" , err )
102- }
99+ server .Close ()
103100}
104101
105102type upstreamStatus string
@@ -124,37 +121,86 @@ func newProxy(url *url.URL) *proxyServer {
124121 failCount : 0 ,
125122 }
126123
127- // start a goroutien to check upstream status
124+ // start a goroutine to monitor upstream status via SSE
128125 go func () {
129- checkUrl := url .Scheme + "://" + url .Host + "/wol-health"
130- client := & http.Client {Timeout : time .Second }
131- ticker := time .NewTicker (2 * time .Second )
132- defer ticker .Stop ()
133- for range ticker .C {
126+ eventsUrl := url .Scheme + "://" + url .Host + "/api/events"
127+ client := & http.Client {
128+ Timeout : 0 , // No timeout for SSE connection
129+ }
134130
135- slog .Debug ("checking upstream status at" , "url" , checkUrl )
136- resp , err := client .Get (checkUrl )
131+ waitDuration := 10 * time .Second
137132
138- // drain the body
139- if err == nil && resp != nil {
133+ for {
134+ slog .Debug ("connecting to SSE endpoint" , "url" , eventsUrl )
135+
136+ req , err := http .NewRequest ("GET" , eventsUrl , nil )
137+ if err != nil {
138+ slog .Warn ("failed to create SSE request" , "error" , err )
139+ proxy .setStatus (notready )
140+ proxy .incFail (1 )
141+ time .Sleep (waitDuration )
142+ continue
143+ }
144+
145+ req .Header .Set ("Accept" , "text/event-stream" )
146+ req .Header .Set ("Cache-Control" , "no-cache" )
147+ req .Header .Set ("Connection" , "keep-alive" )
148+
149+ resp , err := client .Do (req )
150+ if err != nil {
151+ slog .Error ("failed to connect to SSE endpoint" , "error" , err )
152+ proxy .setStatus (notready )
153+ proxy .incFail (1 )
154+ time .Sleep (10 * time .Second )
155+ continue
156+ }
157+
158+ if resp .StatusCode != http .StatusOK {
159+ slog .Warn ("SSE endpoint returned non-OK status" , "status" , resp .StatusCode )
140160 _ , _ = io .Copy (io .Discard , resp .Body )
141161 _ = resp .Body .Close ()
162+ proxy .setStatus (notready )
163+ proxy .incFail (1 )
164+ time .Sleep (10 * time .Second )
165+ continue
142166 }
143167
144- if err == nil && resp != nil && resp .StatusCode == http .StatusOK {
145- slog .Debug ("upstream status: ready" )
146- proxy .setStatus (ready )
147- proxy .statusMutex .Lock ()
148- proxy .failCount = 0
149- proxy .statusMutex .Unlock ()
150- } else {
151- slog .Debug ("upstream status: notready" , "error" , err )
152- proxy .setStatus (notready )
153- proxy .statusMutex .Lock ()
154- proxy .failCount ++
155- proxy .statusMutex .Unlock ()
168+ // Successfully connected to SSE endpoint
169+ slog .Info ("connected to SSE endpoint, upstream ready" )
170+ proxy .setStatus (ready )
171+ proxy .resetFailures ()
172+
173+ // Read from the SSE stream to detect disconnection
174+ scanner := bufio .NewScanner (resp .Body )
175+
176+ // use a fairly large buffer to avoid scanner errors when reading large SSE events
177+ buf := make ([]byte , 0 , 1024 * 1024 * 2 )
178+ scanner .Buffer (buf , 1024 * 1024 * 2 )
179+ events := 0
180+ if slog .Default ().Enabled (context .Background (), slog .LevelDebug ) {
181+ fmt .Print ("Events: " )
182+ }
183+ for scanner .Scan () {
184+ if slog .Default ().Enabled (context .Background (), slog .LevelDebug ) {
185+ // Just read the events to keep connection alive
186+ // We don't need to process the event data
187+ events ++
188+ fmt .Printf ("%d, " , events )
189+ }
156190 }
191+ fmt .Println ()
192+ if err := scanner .Err (); err != nil {
193+ slog .Error ("error reading from SSE stream" , "error" , err )
194+ }
195+
196+ // Connection closed or error occurred
197+ _ = resp .Body .Close ()
198+ slog .Info ("SSE connection closed, upstream not ready" )
199+ proxy .setStatus (notready )
200+ proxy .incFail (1 )
157201
202+ // Wait before reconnecting
203+ time .Sleep (waitDuration )
158204 }
159205 }()
160206
@@ -163,10 +209,8 @@ func newProxy(url *url.URL) *proxyServer {
163209
164210func (p * proxyServer ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
165211 if r .Method == "GET" && r .URL .Path == "/status" {
166- p .statusMutex .RLock ()
167- status := string (p .status )
168- failCount := p .failCount
169- p .statusMutex .RUnlock ()
212+ status := string (p .getStatus ())
213+ failCount := p .getFailures ()
170214 w .Header ().Set ("Content-Type" , "text/plain" )
171215 w .WriteHeader (200 )
172216 fmt .Fprintf (w , "status: %s\n " , status )
@@ -175,7 +219,14 @@ func (p *proxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
175219 }
176220
177221 if p .getStatus () == notready {
178- slog .Info ("upstream not ready, sending magic packet" , "mac" , * flagMac )
222+ path := r .URL .Path
223+ if strings .HasPrefix (path , "/api/events" ) {
224+ slog .Debug ("Skipping wake up" , "req" , path )
225+ w .WriteHeader (http .StatusNoContent )
226+ return
227+ }
228+
229+ slog .Info ("upstream not ready, sending magic packet" , "req" , path , "from" , r .RemoteAddr )
179230 if err := sendMagicPacket (* flagMac ); err != nil {
180231 slog .Warn ("failed to send magic WoL packet" , "error" , err )
181232 }
@@ -213,6 +264,24 @@ func (p *proxyServer) setStatus(status upstreamStatus) {
213264 p .status = status
214265}
215266
267+ func (p * proxyServer ) incFail (num int ) {
268+ p .statusMutex .Lock ()
269+ defer p .statusMutex .Unlock ()
270+ p .failCount += num
271+ }
272+
273+ func (p * proxyServer ) getFailures () int {
274+ p .statusMutex .RLock ()
275+ defer p .statusMutex .RUnlock ()
276+ return p .failCount
277+ }
278+
279+ func (p * proxyServer ) resetFailures () {
280+ p .statusMutex .Lock ()
281+ defer p .statusMutex .Unlock ()
282+ p .failCount = 0
283+ }
284+
216285func sendMagicPacket (macAddr string ) error {
217286 hwAddr , err := net .ParseMAC (macAddr )
218287 if err != nil {
0 commit comments