Skip to content

Commit 8d50647

Browse files
masteradaSean-Der
authored andcommitted
Add Nack Interceptors
Add ResponderInterceptor which responds to NACK Requests Add GeneratorInterceptor which generates NACK Requests
1 parent e0e437d commit 8d50647

19 files changed

+906
-27
lines changed

chain.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
// +build !js
2-
31
package interceptor
42

53
// Chain is an interceptor that runs all child interceptors in order.

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ module github.com/pion/interceptor
33
go 1.15
44

55
require (
6-
github.com/pion/rtcp v1.2.4
6+
github.com/pion/logging v0.2.2
7+
github.com/pion/rtcp v1.2.6
78
github.com/pion/rtp v1.6.1
9+
github.com/stretchr/testify v1.6.1
810
)

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
4+
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
35
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
46
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
5-
github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM=
6-
github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
7+
github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo=
8+
github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
79
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
810
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
911
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

interceptor.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
// +build !js
2-
31
// Package interceptor contains the Interceptor interface, with some useful interceptors that should be safe to use
42
// in most cases.
53
package interceptor

nack.go

Lines changed: 0 additions & 14 deletions
This file was deleted.

noop.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
// +build !js
2-
31
package interceptor
42

53
// NoOp is an Interceptor that does not modify any packets. It can embedded in other interceptors, so it's

pkg/nack/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Package nack provides interceptors to implement sending and receiving negative acknowledgements
2+
package nack
3+
4+
import "errors"
5+
6+
// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied.
7+
var ErrInvalidSize = errors.New("invalid buffer size")

pkg/nack/generator_interceptor.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package nack
2+
3+
import (
4+
"math/rand"
5+
"sync"
6+
"time"
7+
8+
"github.com/pion/interceptor"
9+
"github.com/pion/logging"
10+
"github.com/pion/rtcp"
11+
"github.com/pion/rtp"
12+
)
13+
14+
// GeneratorInterceptor interceptor generates nack feedback messages.
15+
type GeneratorInterceptor struct {
16+
interceptor.NoOp
17+
size uint16
18+
skipLastN uint16
19+
interval time.Duration
20+
receiveLogs *sync.Map
21+
m sync.Mutex
22+
wg sync.WaitGroup
23+
close chan struct{}
24+
log logging.LeveledLogger
25+
}
26+
27+
// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor
28+
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
29+
r := &GeneratorInterceptor{
30+
NoOp: interceptor.NoOp{},
31+
size: 8192,
32+
skipLastN: 0,
33+
interval: time.Millisecond * 100,
34+
receiveLogs: &sync.Map{},
35+
close: make(chan struct{}),
36+
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
37+
}
38+
39+
for _, opt := range opts {
40+
opt(r)
41+
}
42+
43+
if _, err := newReceiveLog(r.size); err != nil {
44+
return nil, err
45+
}
46+
47+
return r, nil
48+
}
49+
50+
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
51+
// will be called once per packet batch.
52+
func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
53+
n.m.Lock()
54+
defer n.m.Unlock()
55+
select {
56+
case <-n.close:
57+
// already closed
58+
return writer
59+
default:
60+
}
61+
62+
n.wg.Add(1)
63+
64+
go n.loop(writer)
65+
66+
return writer
67+
}
68+
69+
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
70+
// will be called once per rtp packet.
71+
func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
72+
hasNack := false
73+
for _, fb := range info.RTCPFeedback {
74+
if fb.Type == "nack" && fb.Parameter == "" {
75+
hasNack = true
76+
}
77+
}
78+
79+
if !hasNack {
80+
return reader
81+
}
82+
83+
// error is already checked in NewGeneratorInterceptor
84+
receiveLog, _ := newReceiveLog(n.size)
85+
n.receiveLogs.Store(info.SSRC, receiveLog)
86+
87+
return interceptor.RTPReaderFunc(func() (*rtp.Packet, interceptor.Attributes, error) {
88+
p, attr, err := reader.Read()
89+
if err != nil {
90+
return nil, nil, err
91+
}
92+
93+
receiveLog.add(p.SequenceNumber)
94+
95+
return p, attr, nil
96+
})
97+
}
98+
99+
// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
100+
func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
101+
n.receiveLogs.Delete(info.SSRC)
102+
}
103+
104+
// Close closes the interceptor
105+
func (n *GeneratorInterceptor) Close() error {
106+
defer n.wg.Wait()
107+
n.m.Lock()
108+
defer n.m.Unlock()
109+
110+
select {
111+
case <-n.close:
112+
// already closed
113+
return nil
114+
default:
115+
}
116+
117+
close(n.close)
118+
119+
return nil
120+
}
121+
122+
func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
123+
defer n.wg.Done()
124+
125+
senderSSRC := rand.Uint32() // #nosec
126+
127+
ticker := time.NewTicker(n.interval)
128+
for {
129+
select {
130+
case <-ticker.C:
131+
n.receiveLogs.Range(func(key, value interface{}) bool {
132+
ssrc := key.(uint32)
133+
receiveLog := value.(*receiveLog)
134+
135+
missing := receiveLog.missingSeqNumbers(n.skipLastN)
136+
if len(missing) == 0 {
137+
return true
138+
}
139+
140+
nack := &rtcp.TransportLayerNack{
141+
SenderSSRC: senderSSRC,
142+
MediaSSRC: ssrc,
143+
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
144+
}
145+
146+
if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
147+
n.log.Warnf("failed sending nack: %+v", err)
148+
}
149+
150+
return true
151+
})
152+
153+
case <-n.close:
154+
return
155+
}
156+
}
157+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package nack
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/pion/interceptor"
8+
"github.com/pion/interceptor/internal/test"
9+
"github.com/pion/logging"
10+
"github.com/pion/rtcp"
11+
"github.com/pion/rtp"
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
func TestGeneratorInterceptor(t *testing.T) {
16+
const interval = time.Millisecond * 10
17+
i, err := NewGeneratorInterceptor(
18+
GeneratorSize(64),
19+
GeneratorSkipLastN(2),
20+
GeneratorInterval(interval),
21+
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
22+
)
23+
assert.NoError(t, err)
24+
25+
stream := test.NewMockStream(&interceptor.StreamInfo{
26+
SSRC: 1,
27+
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
28+
}, i)
29+
defer func() {
30+
assert.NoError(t, stream.Close())
31+
}()
32+
33+
for _, seqNum := range []uint16{10, 11, 12, 14, 16, 18} {
34+
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})
35+
36+
select {
37+
case r := <-stream.ReadRTP():
38+
assert.NoError(t, r.Err)
39+
assert.Equal(t, seqNum, r.Packet.SequenceNumber)
40+
case <-time.After(10 * time.Millisecond):
41+
t.Fatal("receiver rtp packet not found")
42+
}
43+
}
44+
45+
time.Sleep(interval * 2) // wait for at least 2 nack packets
46+
47+
select {
48+
case <-stream.WrittenRTCP():
49+
// ignore the first nack, it might only contain the sequence id 13 as missing
50+
default:
51+
}
52+
53+
select {
54+
case pkts := <-stream.WrittenRTCP():
55+
assert.Equal(t, len(pkts), 1, "single packet RTCP Compound Packet expected")
56+
57+
p, ok := pkts[0].(*rtcp.TransportLayerNack)
58+
assert.True(t, ok, "TransportLayerNack rtcp packet expected, found: %T", pkts[0])
59+
60+
assert.Equal(t, uint16(13), p.Nacks[0].PacketID)
61+
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is setReceived to 2)
62+
case <-time.After(10 * time.Millisecond):
63+
t.Fatal("written rtcp packet not found")
64+
}
65+
}
66+
67+
func TestGeneratorInterceptor_InvalidSize(t *testing.T) {
68+
_, err := NewGeneratorInterceptor(GeneratorSize(5))
69+
assert.Error(t, err, ErrInvalidSize)
70+
}

pkg/nack/generator_option.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package nack
2+
3+
import (
4+
"time"
5+
6+
"github.com/pion/logging"
7+
)
8+
9+
// GeneratorOption can be used to configure GeneratorInterceptor
10+
type GeneratorOption func(r *GeneratorInterceptor)
11+
12+
// GeneratorSize sets the size of the interceptor.
13+
// Size must be one of: 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768
14+
func GeneratorSize(size uint16) GeneratorOption {
15+
return func(r *GeneratorInterceptor) {
16+
r.size = size
17+
}
18+
}
19+
20+
// GeneratorSkipLastN sets the number of packets (n-1 packets before the last received packets) to ignore when generating
21+
// nack requests.
22+
func GeneratorSkipLastN(skipLastN uint16) GeneratorOption {
23+
return func(r *GeneratorInterceptor) {
24+
r.skipLastN = skipLastN
25+
}
26+
}
27+
28+
// GeneratorLog sets a logger for the interceptor
29+
func GeneratorLog(log logging.LeveledLogger) GeneratorOption {
30+
return func(r *GeneratorInterceptor) {
31+
r.log = log
32+
}
33+
}
34+
35+
// GeneratorInterval sets the nack send interval for the interceptor
36+
func GeneratorInterval(interval time.Duration) GeneratorOption {
37+
return func(r *GeneratorInterceptor) {
38+
r.interval = interval
39+
}
40+
}

0 commit comments

Comments
 (0)