Skip to content

Commit 37550d1

Browse files
Use InputVideo/AudioState from WHIPRTCConnectionNotify. (#384)
Also terminate the session if we haven't received any Notify in 3 min
1 parent 3b12f4e commit 37550d1

File tree

3 files changed

+45
-4
lines changed

3 files changed

+45
-4
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ require (
1616
github.com/livekit/go-rtmp v0.0.0-20230829211117-1c4f5a5c81ed
1717
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
1818
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564
19-
github.com/livekit/protocol v1.41.1-0.20250923010807-539194425c60
19+
github.com/livekit/protocol v1.42.1-0.20251006173230-a7bc52ad9724
2020
github.com/livekit/psrpc v0.7.0
2121
github.com/livekit/server-sdk-go/v2 v2.4.2
2222
github.com/pion/dtls/v3 v3.0.7

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 h1:9x+U2HGLrSw5AT
116116
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
117117
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564 h1:GX7KF/V9ExmcfT/2Bdia8aROjkxrgx7WpyH7w9MB4J4=
118118
github.com/livekit/mediatransportutil v0.0.0-20241220010243-a2bdee945564/go.mod h1:36s+wwmU3O40IAhE+MjBWP3W71QRiEE9SfooSBvtBqY=
119-
github.com/livekit/protocol v1.41.1-0.20250923010807-539194425c60 h1:rdWGS5IIQwb/dX94mrBF0LsCxHiwHZruJxqxoP34WNg=
120-
github.com/livekit/protocol v1.41.1-0.20250923010807-539194425c60/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
119+
github.com/livekit/protocol v1.42.1-0.20251006173230-a7bc52ad9724 h1:oz2RQahwZ8LPaGX+2RzAR08s+zqJJSep+OCvWuD90I4=
120+
github.com/livekit/protocol v1.42.1-0.20251006173230-a7bc52ad9724/go.mod h1:vhMS30QoEyH2p34vi6X1eWkC4EMV72ZGZwQb74ajY7A=
121121
github.com/livekit/psrpc v0.7.0 h1:rtfqfjYN06WJYloE/S0nmkJ/Y04x4pxLQLe8kQ4FVHU=
122122
github.com/livekit/psrpc v0.7.0/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
123123
github.com/livekit/server-sdk-go/v2 v2.4.2 h1:q6ioBWpwLaLNj41f96eLQHi11kRyiY9MfEb5D3zi5AI=

pkg/whip/whip_proxy_handler.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net/http"
2323
"net/url"
2424
"strings"
25+
"sync"
2526
"time"
2627

2728
"github.com/frostbyte73/core"
@@ -38,6 +39,10 @@ import (
3839
"github.com/livekit/ingress/pkg/utils"
3940
)
4041

42+
const (
43+
sessionNotifyTimeout = 3 * time.Minute
44+
)
45+
4146
type proxyWhipHandler struct {
4247
logger logger.Logger
4348
params *params.Params
@@ -47,6 +52,9 @@ type proxyWhipHandler struct {
4752

4853
done core.Fuse
4954
rpcServer rpc.IngressHandlerServer
55+
56+
mu sync.Mutex
57+
watchdog *time.Timer
5058
}
5159

5260
func NewProxyWHIPHandler(p *params.Params, bus psrpc.MessageBus, ua string) (WHIPHandler, error) {
@@ -183,6 +191,13 @@ func (h *proxyWhipHandler) close(isRTCClosed bool) {
183191
return
184192
}
185193

194+
h.mu.Lock()
195+
if h.watchdog != nil {
196+
h.watchdog.Stop()
197+
h.watchdog = nil
198+
}
199+
h.mu.Unlock()
200+
186201
utils.DeregisterIngressRpcHandlers(h.rpcServer, h.params.IngressInfo)
187202
if h.participantID != "" {
188203
if isRTCClosed {
@@ -225,6 +240,8 @@ func (h *proxyWhipHandler) close(isRTCClosed bool) {
225240
}
226241

227242
func (h *proxyWhipHandler) WaitForSessionEnd(ctx context.Context) error {
243+
h.resetWatchDog()
244+
228245
select {
229246
case <-h.done.Watch():
230247
case <-ctx.Done():
@@ -313,11 +330,35 @@ func (h *proxyWhipHandler) ICERestartWHIPResource(ctx context.Context, req *rpc.
313330
}
314331

315332
func (h *proxyWhipHandler) WHIPRTCConnectionNotify(ctx context.Context, req *rpc.WHIPRTCConnectionNotifyRequest) (*google_protobuf2.Empty, error) {
316-
h.logger.Infow("WHIPRTCConnectionNotify", "participantID", h.participantID, "req", req)
333+
tctx, done := context.WithTimeout(context.Background(), 10*time.Second)
334+
defer done()
335+
336+
h.resetWatchDog()
337+
338+
if req.Video != nil {
339+
h.params.SetInputVideoState(tctx, req.Video, true)
340+
}
341+
342+
if req.Audio != nil {
343+
h.params.SetInputAudioState(tctx, req.Audio, true)
344+
}
317345

318346
if req.Closed {
319347
h.close(true)
320348
}
321349

322350
return &google_protobuf2.Empty{}, nil
323351
}
352+
353+
func (h *proxyWhipHandler) resetWatchDog() {
354+
h.mu.Lock()
355+
if h.watchdog != nil {
356+
h.watchdog.Stop()
357+
}
358+
359+
h.watchdog = time.AfterFunc(sessionNotifyTimeout, func() {
360+
h.logger.Infow("no Notify call from the SFU, terminating ingress")
361+
h.done.Break()
362+
})
363+
h.mu.Unlock()
364+
}

0 commit comments

Comments
 (0)