1919package grpcsync
2020
2121import (
22- "fmt"
2322 "sync"
2423 "testing"
2524 "time"
26-
27- "github.com/google/go-cmp/cmp"
2825)
2926
3027type testSubscriber struct {
31- mu sync.Mutex
32- msgs []int
33- onMsgCh chan struct {}
28+ onMsgCh chan int
3429}
3530
3631func newTestSubscriber (chSize int ) * testSubscriber {
37- return & testSubscriber {onMsgCh : make (chan struct {} , chSize )}
32+ return & testSubscriber {onMsgCh : make (chan int , chSize )}
3833}
3934
4035func (ts * testSubscriber ) OnMessage (msg interface {}) {
41- ts .mu .Lock ()
42- defer ts .mu .Unlock ()
43- ts .msgs = append (ts .msgs , msg .(int ))
4436 select {
45- case ts .onMsgCh <- struct {}{} :
37+ case ts .onMsgCh <- msg .( int ) :
4638 default :
4739 }
4840}
4941
50- func (ts * testSubscriber ) receivedMsgs () []int {
51- ts .mu .Lock ()
52- defer ts .mu .Unlock ()
53-
54- msgs := make ([]int , len (ts .msgs ))
55- copy (msgs , ts .msgs )
56-
57- return msgs
58- }
59-
6042func (s ) TestPubSub_PublishNoMsg (t * testing.T ) {
6143 pubsub := NewPubSub ()
6244 defer pubsub .Stop ()
@@ -66,7 +48,7 @@ func (s) TestPubSub_PublishNoMsg(t *testing.T) {
6648
6749 select {
6850 case <- ts .onMsgCh :
69- t .Fatalf ("Subscriber callback invoked when no message was published" )
51+ t .Fatal ("Subscriber callback invoked when no message was published" )
7052 case <- time .After (defaultTestShortTimeout ):
7153 }
7254}
@@ -78,95 +60,92 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) {
7860
7961 ts1 := newTestSubscriber (numPublished )
8062 pubsub .Subscribe (ts1 )
81- wantMsgs1 := []int {}
8263
8364 var wg sync.WaitGroup
8465 wg .Add (2 )
8566 // Publish ten messages on the pubsub and ensure that they are received in order by the subscriber.
8667 go func () {
8768 for i := 0 ; i < numPublished ; i ++ {
8869 pubsub .Publish (i )
89- wantMsgs1 = append (wantMsgs1 , i )
9070 }
9171 wg .Done ()
9272 }()
9373
94- isTimeout := false
9574 go func () {
75+ defer wg .Done ()
9676 for i := 0 ; i < numPublished ; i ++ {
9777 select {
98- case <- ts1 .onMsgCh :
78+ case m := <- ts1 .onMsgCh :
79+ if m != i {
80+ t .Errorf ("Received unexpected message: %q; want: %q" , m , i )
81+ return
82+ }
9983 case <- time .After (defaultTestTimeout ):
100- isTimeout = true
84+ t .Error ("Timeout when expecting the onMessage() callback to be invoked" )
85+ return
10186 }
10287 }
103- wg .Done ()
10488 }()
105-
10689 wg .Wait ()
107- if isTimeout {
108- t .Fatalf ("Timeout when expecting the onMessage() callback to be invoked" )
109- }
110- if gotMsgs1 := ts1 .receivedMsgs (); ! cmp .Equal (gotMsgs1 , wantMsgs1 ) {
111- t .Fatalf ("Received messages is %v, want %v" , gotMsgs1 , wantMsgs1 )
90+ if t .Failed () {
91+ t .FailNow ()
11292 }
11393
11494 // Register another subscriber and ensure that it receives the last published message.
11595 ts2 := newTestSubscriber (numPublished )
11696 pubsub .Subscribe (ts2 )
117- wantMsgs2 := wantMsgs1 [len (wantMsgs1 )- 1 :]
11897
11998 select {
120- case <- ts2 .onMsgCh :
99+ case m := <- ts2 .onMsgCh :
100+ if m != numPublished - 1 {
101+ t .Fatalf ("Received unexpected message: %q; want: %q" , m , numPublished - 1 )
102+ }
121103 case <- time .After (defaultTestShortTimeout ):
122- t .Fatalf ("Timeout when expecting the onMessage() callback to be invoked" )
123- }
124- if gotMsgs2 := ts2 .receivedMsgs (); ! cmp .Equal (gotMsgs2 , wantMsgs2 ) {
125- t .Fatalf ("Received messages is %v, want %v" , gotMsgs2 , wantMsgs2 )
104+ t .Fatal ("Timeout when expecting the onMessage() callback to be invoked" )
126105 }
127106
128107 wg .Add (3 )
129108 // Publish ten messages on the pubsub and ensure that they are received in order by the subscribers.
130109 go func () {
131110 for i := 0 ; i < numPublished ; i ++ {
132111 pubsub .Publish (i )
133- wantMsgs1 = append (wantMsgs1 , i )
134- wantMsgs2 = append (wantMsgs2 , i )
135112 }
136113 wg .Done ()
137114 }()
138- errCh := make (chan error , 1 )
139115 go func () {
116+ defer wg .Done ()
140117 for i := 0 ; i < numPublished ; i ++ {
141118 select {
142- case <- ts1 .onMsgCh :
119+ case m := <- ts1 .onMsgCh :
120+ if m != i {
121+ t .Errorf ("Received unexpected message: %q; want: %q" , m , i )
122+ return
123+ }
143124 case <- time .After (defaultTestTimeout ):
144- errCh <- fmt .Errorf ("" )
125+ t .Error ("Timeout when expecting the onMessage() callback to be invoked" )
126+ return
145127 }
146128 }
147- wg . Done ()
129+
148130 }()
149131 go func () {
132+ defer wg .Done ()
150133 for i := 0 ; i < numPublished ; i ++ {
151134 select {
152- case <- ts2 .onMsgCh :
135+ case m := <- ts2 .onMsgCh :
136+ if m != i {
137+ t .Errorf ("Received unexpected message: %q; want: %q" , m , i )
138+ return
139+ }
153140 case <- time .After (defaultTestTimeout ):
154- errCh <- fmt .Errorf ("" )
141+ t .Error ("Timeout when expecting the onMessage() callback to be invoked" )
142+ return
155143 }
156144 }
157- wg .Done ()
158145 }()
159146 wg .Wait ()
160- select {
161- case <- errCh :
162- t .Fatalf ("Timeout when expecting the onMessage() callback to be invoked" )
163- default :
164- }
165- if gotMsgs1 := ts1 .receivedMsgs (); ! cmp .Equal (gotMsgs1 , wantMsgs1 ) {
166- t .Fatalf ("Received messages is %v, want %v" , gotMsgs1 , wantMsgs1 )
167- }
168- if gotMsgs2 := ts2 .receivedMsgs (); ! cmp .Equal (gotMsgs2 , wantMsgs2 ) {
169- t .Fatalf ("Received messages is %v, want %v" , gotMsgs2 , wantMsgs2 )
147+ if t .Failed () {
148+ t .FailNow ()
170149 }
171150
172151 pubsub .Stop ()
@@ -178,9 +157,9 @@ func (s) TestPubSub_PublishMsgs_RegisterSubs_And_Stop(t *testing.T) {
178157 // pubsub has already closed.
179158 select {
180159 case <- ts1 .onMsgCh :
181- t .Fatalf ("The callback was invoked after pubsub being stopped" )
160+ t .Fatal ("The callback was invoked after pubsub being stopped" )
182161 case <- ts2 .onMsgCh :
183- t .Fatalf ("The callback was invoked after pubsub being stopped" )
162+ t .Fatal ("The callback was invoked after pubsub being stopped" )
184163 case <- time .After (defaultTestShortTimeout ):
185164 }
186165}
@@ -197,15 +176,15 @@ func (s) TestPubSub_PublishMsgs_BeforeRegisterSub(t *testing.T) {
197176 ts := newTestSubscriber (numPublished )
198177 pubsub .Subscribe (ts )
199178
200- wantMsgs := []int {numPublished - 1 }
201179 // Ensure that the subscriber callback is invoked with a previously
202180 // published message.
203181 select {
204- case <- ts .onMsgCh :
205- if gotMsgs := ts . receivedMsgs (); ! cmp . Equal ( gotMsgs , wantMsgs ) {
206- t .Fatalf ("Received messages is %v, want %v " , gotMsgs , wantMsgs )
182+ case d := <- ts .onMsgCh :
183+ if d != numPublished - 1 {
184+ t .Fatalf ("Unexpected message received: %q; %q " , d , numPublished - 1 )
207185 }
186+
208187 case <- time .After (defaultTestShortTimeout ):
209- t .Fatalf ("Timeout when expecting the onMessage() callback to be invoked" )
188+ t .Fatal ("Timeout when expecting the onMessage() callback to be invoked" )
210189 }
211190}
0 commit comments