Skip to content

Commit b93c4d4

Browse files
masteradaSean-Der
authored andcommitted
review fixes, rename receiver -> generator, sender -> responder, linter fixes
1 parent 499eb2c commit b93c4d4

15 files changed

+255
-180
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.15
44

55
require (
66
github.com/pion/logging v0.2.2
7-
github.com/pion/rtcp v1.2.4
7+
github.com/pion/rtcp v1.2.6
88
github.com/pion/rtp v1.6.1
99
github.com/stretchr/testify v1.6.1
1010
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
44
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
55
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
66
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
7-
github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM=
8-
github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
7+
github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo=
8+
github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
99
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
1010
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
1111
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

test/stream.go renamed to internal/test/stream.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// Package test provides helpers for testing interceptors
12
package test
23

34
import (
@@ -8,6 +9,7 @@ import (
89
"github.com/pion/rtp"
910
)
1011

12+
// Stream is a helper struct for testing interceptors.
1113
type Stream struct {
1214
interceptor interceptor.Interceptor
1315

@@ -26,16 +28,19 @@ type Stream struct {
2628
rtpInModified chan RTPWithError
2729
}
2830

31+
// RTPWithError is used to send an rtp packet or an error on a channel
2932
type RTPWithError struct {
3033
Packet *rtp.Packet
3134
Err error
3235
}
3336

37+
// RTCPWithError is used to send a batch of rtcp packets or an error on a channel
3438
type RTCPWithError struct {
3539
Packets []rtcp.Packet
3640
Err error
3741
}
3842

43+
// NewStream creates a new Stream
3944
func NewStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Stream {
4045
s := &Stream{
4146
interceptor: i,
@@ -107,40 +112,49 @@ func NewStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Stream
107112
return s
108113
}
109114

115+
// WriteRTCP writes a batch of rtcp packet to the stream, using the interceptor
110116
func (s *Stream) WriteRTCP(pkts []rtcp.Packet) error {
111117
_, err := s.rtcpWriter.Write(pkts, interceptor.Attributes{})
112118
return err
113119
}
114120

121+
// WriteRTP writes an rtp packet to the stream, using the interceptor
115122
func (s *Stream) WriteRTP(p *rtp.Packet) error {
116123
_, err := s.rtpWriter.Write(p, interceptor.Attributes{})
117124
return err
118125
}
119126

127+
// ReceiveRTCP schedules a new rtcp batch, so it can be read be the stream
120128
func (s *Stream) ReceiveRTCP(pkts []rtcp.Packet) {
121129
s.rtcpIn <- pkts
122130
}
123131

132+
// ReceiveRTP schedules a rtp packet, so it can be read be the stream
124133
func (s *Stream) ReceiveRTP(packet *rtp.Packet) {
125134
s.rtpIn <- packet
126135
}
127136

137+
// WrittenRTCP returns a channel containing the rtcp batches written, modified by the interceptor
128138
func (s *Stream) WrittenRTCP() chan []rtcp.Packet {
129139
return s.rtcpOutModified
130140
}
131141

142+
// WrittenRTP returns a channel containing rtp packets written, modified by the interceptor
132143
func (s *Stream) WrittenRTP() chan *rtp.Packet {
133144
return s.rtpOutModified
134145
}
135146

147+
// ReadRTCP returns a channel containing the rtcp batched read, modified by the interceptor
136148
func (s *Stream) ReadRTCP() chan RTCPWithError {
137149
return s.rtcpInModified
138150
}
139151

152+
// ReadRTP returns a channel containing the rtp packets read, modified by the interceptor
140153
func (s *Stream) ReadRTP() chan RTPWithError {
141154
return s.rtpInModified
142155
}
143156

157+
// Close closes the stream and the underlying interceptor
144158
func (s *Stream) Close() error {
145159
close(s.rtcpIn)
146160
close(s.rtpIn)
File renamed without changes.

pkg/nack/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Package nack provides interceptors to implement sending and receiving negative acknowledgements
2+
package nack
3+
4+
import "errors"
5+
6+
// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied.
7+
var ErrInvalidSize = errors.New("invalid buffer size")

nack/receiver_interceptor.go renamed to pkg/nack/generator_interceptor.go

Lines changed: 35 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import (
1111
"github.com/pion/rtp"
1212
)
1313

14-
// ReceiverInterceptor interceptor generates nack messages.
15-
type ReceiverInterceptor struct {
14+
// GeneratorInterceptor interceptor generates nack feedback messages.
15+
type GeneratorInterceptor struct {
1616
interceptor.NoOp
1717
size uint16
1818
skipLastN uint16
@@ -24,38 +24,42 @@ type ReceiverInterceptor struct {
2424
log logging.LeveledLogger
2525
}
2626

27-
// NewReceiverInterceptor returns a new ReceiverInterceptor interceptor
28-
func NewReceiverInterceptor(size uint16, skipLastN uint16, interval time.Duration, log logging.LeveledLogger) (*ReceiverInterceptor, error) {
29-
_, err := NewReceiveLog(size)
30-
if err != nil {
31-
return nil, err
32-
}
33-
34-
return &ReceiverInterceptor{
27+
// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor
28+
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
29+
r := &GeneratorInterceptor{
3530
NoOp: interceptor.NoOp{},
36-
size: size,
37-
skipLastN: skipLastN,
38-
interval: interval,
31+
size: 8192,
32+
skipLastN: 0,
33+
interval: time.Millisecond * 100,
3934
receiveLogs: &sync.Map{},
4035
close: make(chan struct{}),
41-
log: log,
42-
}, nil
36+
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
37+
}
38+
39+
for _, opt := range opts {
40+
opt(r)
41+
}
42+
43+
if _, err := newReceiveLog(r.size); err != nil {
44+
return nil, err
45+
}
46+
47+
return r, nil
4348
}
4449

4550
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
4651
// will be called once per packet batch.
47-
func (n *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
52+
func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
4853
n.m.Lock()
54+
defer n.m.Unlock()
4955
select {
5056
case <-n.close:
5157
// already closed
52-
n.m.Unlock()
5358
return writer
5459
default:
5560
}
5661

5762
n.wg.Add(1)
58-
n.m.Unlock()
5963

6064
go n.loop(writer)
6165

@@ -64,7 +68,7 @@ func (n *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) inte
6468

6569
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
6670
// will be called once per rtp packet.
67-
func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
71+
func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
6872
hasNack := false
6973
for _, fb := range info.RTCPFeedback {
7074
if fb.Type == "nack" && fb.Parameter == "" {
@@ -76,8 +80,8 @@ func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, rea
7680
return reader
7781
}
7882

79-
// error is already checked in NewReceiverInterceptor
80-
receiveLog, _ := NewReceiveLog(n.size)
83+
// error is already checked in NewGeneratorInterceptor
84+
receiveLog, _ := newReceiveLog(n.size)
8185
n.receiveLogs.Store(info.SSRC, receiveLog)
8286

8387
return interceptor.RTPReaderFunc(func() (*rtp.Packet, interceptor.Attributes, error) {
@@ -86,18 +90,19 @@ func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, rea
8690
return nil, nil, err
8791
}
8892

89-
receiveLog.Add(p.SequenceNumber)
93+
receiveLog.add(p.SequenceNumber)
9094

9195
return p, attr, nil
9296
})
9397
}
9498

9599
// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
96-
func (n *ReceiverInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
100+
func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
97101
n.receiveLogs.Delete(info.SSRC)
98102
}
99103

100-
func (n *ReceiverInterceptor) Close() error {
104+
// Close closes the interceptor
105+
func (n *GeneratorInterceptor) Close() error {
101106
defer n.wg.Wait()
102107
n.m.Lock()
103108
defer n.m.Unlock()
@@ -114,32 +119,31 @@ func (n *ReceiverInterceptor) Close() error {
114119
return nil
115120
}
116121

117-
func (n *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
122+
func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
118123
defer n.wg.Done()
119124

120-
senderSSRC := rand.Uint32()
125+
senderSSRC := rand.Uint32() // #nosec
121126

122127
ticker := time.NewTicker(n.interval)
123128
for {
124129
select {
125130
case <-ticker.C:
126131
n.receiveLogs.Range(func(key, value interface{}) bool {
127132
ssrc := key.(uint32)
128-
receiveLog := value.(*ReceiveLog)
133+
receiveLog := value.(*receiveLog)
129134

130-
missing := receiveLog.MissingSeqNumbers(n.skipLastN)
135+
missing := receiveLog.missingSeqNumbers(n.skipLastN)
131136
if len(missing) == 0 {
132137
return true
133138
}
134139

135140
nack := &rtcp.TransportLayerNack{
136141
SenderSSRC: senderSSRC,
137142
MediaSSRC: ssrc,
138-
Nacks: nackPairs(missing),
143+
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
139144
}
140145

141-
_, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{})
142-
if err != nil {
146+
if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
143147
n.log.Warnf("failed sending nack: %+v", err)
144148
}
145149

@@ -151,25 +155,3 @@ func (n *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
151155
}
152156
}
153157
}
154-
155-
func nackPairs(seqNums []uint16) []rtcp.NackPair {
156-
// TODO: I think this shoud be moved to rtcp package
157-
pairs := make([]rtcp.NackPair, 0)
158-
startSeq := seqNums[0]
159-
nackPair := &rtcp.NackPair{PacketID: startSeq}
160-
for i := 1; i < len(seqNums); i++ {
161-
m := seqNums[i]
162-
163-
if m-nackPair.PacketID > 16 {
164-
pairs = append(pairs, *nackPair)
165-
nackPair = &rtcp.NackPair{PacketID: m}
166-
continue
167-
}
168-
169-
nackPair.LostPackets |= 1 << (m - nackPair.PacketID - 1)
170-
}
171-
172-
pairs = append(pairs, *nackPair)
173-
174-
return pairs
175-
}

nack/receiver_interceptor_test.go renamed to pkg/nack/generator_interceptor_test.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,21 @@ import (
55
"time"
66

77
"github.com/pion/interceptor"
8-
"github.com/pion/interceptor/test"
8+
"github.com/pion/interceptor/internal/test"
99
"github.com/pion/logging"
1010
"github.com/pion/rtcp"
1111
"github.com/pion/rtp"
1212
"github.com/stretchr/testify/assert"
1313
)
1414

15-
func TestReceiverInterceptor(t *testing.T) {
15+
func TestGeneratorInterceptor(t *testing.T) {
1616
const interval = time.Millisecond * 10
17-
i, err := NewReceiverInterceptor(64, 2, interval, logging.NewDefaultLoggerFactory().NewLogger("test"))
17+
i, err := NewGeneratorInterceptor(
18+
GeneratorSize(64),
19+
GeneratorSkipLastN(2),
20+
GeneratorInterval(interval),
21+
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
22+
)
1823
if err != nil {
1924
t.Fatal(err)
2025
}
@@ -24,20 +29,15 @@ func TestReceiverInterceptor(t *testing.T) {
2429
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
2530
}, i)
2631
defer func() {
27-
err := stream.Close()
28-
if err != nil {
29-
t.Errorf("error closing stream: %v", err)
30-
}
32+
assert.NoError(t, stream.Close())
3133
}()
3234

3335
for _, seqNum := range []uint16{10, 11, 12, 14, 16, 18} {
3436
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})
3537

3638
select {
3739
case r := <-stream.ReadRTP():
38-
if r.Err != nil {
39-
t.Fatal(r.Err)
40-
}
40+
assert.NoError(t, r.Err)
4141
assert.Equal(t, seqNum, r.Packet.SequenceNumber)
4242
case <-time.After(10 * time.Millisecond):
4343
t.Fatal("receiver rtp packet not found")
@@ -54,17 +54,19 @@ func TestReceiverInterceptor(t *testing.T) {
5454

5555
select {
5656
case pkts := <-stream.WrittenRTCP():
57-
if len(pkts) != 1 {
58-
t.Fatalf("single packet rtcp batch expected, found: %v", pkts)
59-
}
57+
assert.Equal(t, len(pkts), 1, "single packet RTCP Compound Packet expected")
58+
6059
p, ok := pkts[0].(*rtcp.TransportLayerNack)
61-
if !ok {
62-
t.Fatalf("TransportLayerNack rtcp packet expected, found: %T", pkts[0])
63-
}
60+
assert.True(t, ok, "TransportLayerNack rtcp packet expected, found: %T", pkts[0])
6461

6562
assert.Equal(t, uint16(13), p.Nacks[0].PacketID)
66-
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is set to 2)
63+
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is setReceived to 2)
6764
case <-time.After(10 * time.Millisecond):
6865
t.Fatal("written rtcp packet not found")
6966
}
7067
}
68+
69+
func TestGeneratorInterceptor_InvalidSize(t *testing.T) {
70+
_, err := NewGeneratorInterceptor(GeneratorSize(5))
71+
assert.Error(t, err, ErrInvalidSize)
72+
}

0 commit comments

Comments
 (0)