Skip to content

Commit 4e474c7

Browse files
authored
rpc: fix subscription corner case and speed up tests (#17874)
Notifier tracks whether subscription are 'active'. A subscription becomes active when the subscription ID has been sent to the client. If the client sends notifications in the request handler before the subscription becomes active they are dropped. The tests tried to work around this problem by always waiting 5s before sending the first notification. Fix it by buffering notifications until the subscription becomes active. This speeds up all subscription tests. Also fix TestSubscriptionMultipleNamespaces to wait for three messages per subscription instead of six. The test now finishes just after all notifications have been received and doesn't hit the 30s timeout anymore.
1 parent da290e9 commit 4e474c7

File tree

2 files changed

+68
-71
lines changed

2 files changed

+68
-71
lines changed

rpc/subscription.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,10 @@ type notifierKey struct{}
5252
// Server callbacks use the notifier to send notifications.
5353
type Notifier struct {
5454
codec ServerCodec
55-
subMu sync.RWMutex // guards active and inactive maps
55+
subMu sync.Mutex
5656
active map[ID]*Subscription
5757
inactive map[ID]*Subscription
58+
buffer map[ID][]interface{} // unsent notifications of inactive subscriptions
5859
}
5960

6061
// newNotifier creates a new notifier that can be used to send subscription
@@ -64,6 +65,7 @@ func newNotifier(codec ServerCodec) *Notifier {
6465
codec: codec,
6566
active: make(map[ID]*Subscription),
6667
inactive: make(map[ID]*Subscription),
68+
buffer: make(map[ID][]interface{}),
6769
}
6870
}
6971

@@ -88,20 +90,26 @@ func (n *Notifier) CreateSubscription() *Subscription {
8890
// Notify sends a notification to the client with the given data as payload.
8991
// If an error occurs the RPC connection is closed and the error is returned.
9092
func (n *Notifier) Notify(id ID, data interface{}) error {
91-
n.subMu.RLock()
92-
defer n.subMu.RUnlock()
93-
94-
sub, active := n.active[id]
95-
if active {
96-
notification := n.codec.CreateNotification(string(id), sub.namespace, data)
97-
if err := n.codec.Write(notification); err != nil {
98-
n.codec.Close()
99-
return err
100-
}
93+
n.subMu.Lock()
94+
defer n.subMu.Unlock()
95+
96+
if sub, active := n.active[id]; active {
97+
n.send(sub, data)
98+
} else {
99+
n.buffer[id] = append(n.buffer[id], data)
101100
}
102101
return nil
103102
}
104103

104+
func (n *Notifier) send(sub *Subscription, data interface{}) error {
105+
notification := n.codec.CreateNotification(string(sub.ID), sub.namespace, data)
106+
err := n.codec.Write(notification)
107+
if err != nil {
108+
n.codec.Close()
109+
}
110+
return err
111+
}
112+
105113
// Closed returns a channel that is closed when the RPC connection is closed.
106114
func (n *Notifier) Closed() <-chan interface{} {
107115
return n.codec.Closed()
@@ -127,9 +135,15 @@ func (n *Notifier) unsubscribe(id ID) error {
127135
func (n *Notifier) activate(id ID, namespace string) {
128136
n.subMu.Lock()
129137
defer n.subMu.Unlock()
138+
130139
if sub, found := n.inactive[id]; found {
131140
sub.namespace = namespace
132141
n.active[id] = sub
133142
delete(n.inactive, id)
143+
// Send buffered notifications.
144+
for _, data := range n.buffer[id] {
145+
n.send(sub, data)
146+
}
147+
delete(n.buffer, id)
134148
}
135149
}

rpc/subscription_test.go

Lines changed: 43 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@ import (
2727
)
2828

2929
type NotificationTestService struct {
30-
mu sync.Mutex
31-
unsubscribed bool
32-
30+
mu sync.Mutex
31+
unsubscribed chan string
3332
gotHangSubscriptionReq chan struct{}
3433
unblockHangSubscription chan struct{}
3534
}
@@ -38,16 +37,10 @@ func (s *NotificationTestService) Echo(i int) int {
3837
return i
3938
}
4039

41-
func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
42-
s.mu.Lock()
43-
defer s.mu.Unlock()
44-
return s.unsubscribed
45-
}
46-
4740
func (s *NotificationTestService) Unsubscribe(subid string) {
48-
s.mu.Lock()
49-
s.unsubscribed = true
50-
s.mu.Unlock()
41+
if s.unsubscribed != nil {
42+
s.unsubscribed <- subid
43+
}
5144
}
5245

5346
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) {
@@ -65,7 +58,6 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
6558
// test expects n events, if we begin sending event immediately some events
6659
// will probably be dropped since the subscription ID might not be send to
6760
// the client.
68-
time.Sleep(5 * time.Second)
6961
for i := 0; i < n; i++ {
7062
if err := notifier.Notify(subscription.ID, val+i); err != nil {
7163
return
@@ -74,13 +66,10 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i
7466

7567
select {
7668
case <-notifier.Closed():
77-
s.mu.Lock()
78-
s.unsubscribed = true
79-
s.mu.Unlock()
8069
case <-subscription.Err():
81-
s.mu.Lock()
82-
s.unsubscribed = true
83-
s.mu.Unlock()
70+
}
71+
if s.unsubscribed != nil {
72+
s.unsubscribed <- string(subscription.ID)
8473
}
8574
}()
8675

@@ -107,7 +96,7 @@ func (s *NotificationTestService) HangSubscription(ctx context.Context, val int)
10796

10897
func TestNotifications(t *testing.T) {
10998
server := NewServer()
110-
service := &NotificationTestService{}
99+
service := &NotificationTestService{unsubscribed: make(chan string)}
111100

112101
if err := server.RegisterName("eth", service); err != nil {
113102
t.Fatalf("unable to register test service %v", err)
@@ -157,10 +146,10 @@ func TestNotifications(t *testing.T) {
157146
}
158147

159148
clientConn.Close() // causes notification unsubscribe callback to be called
160-
time.Sleep(1 * time.Second)
161-
162-
if !service.wasUnsubCallbackCalled() {
163-
t.Error("unsubscribe callback not called after closing connection")
149+
select {
150+
case <-service.unsubscribed:
151+
case <-time.After(1 * time.Second):
152+
t.Fatal("Unsubscribe not called after one second")
164153
}
165154
}
166155

@@ -227,18 +216,19 @@ func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSucces
227216
// for multiple different namespaces.
228217
func TestSubscriptionMultipleNamespaces(t *testing.T) {
229218
var (
230-
namespaces = []string{"eth", "shh", "bzz"}
219+
namespaces = []string{"eth", "shh", "bzz"}
220+
service = NotificationTestService{}
221+
subCount = len(namespaces) * 2
222+
notificationCount = 3
223+
231224
server = NewServer()
232-
service = NotificationTestService{}
233225
clientConn, serverConn = net.Pipe()
234-
235-
out = json.NewEncoder(clientConn)
236-
in = json.NewDecoder(clientConn)
237-
successes = make(chan jsonSuccessResponse)
238-
failures = make(chan jsonErrResponse)
239-
notifications = make(chan jsonNotification)
240-
241-
errors = make(chan error, 10)
226+
out = json.NewEncoder(clientConn)
227+
in = json.NewDecoder(clientConn)
228+
successes = make(chan jsonSuccessResponse)
229+
failures = make(chan jsonErrResponse)
230+
notifications = make(chan jsonNotification)
231+
errors = make(chan error, 10)
242232
)
243233

244234
// setup and start server
@@ -255,13 +245,12 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
255245
go waitForMessages(t, in, successes, failures, notifications, errors)
256246

257247
// create subscriptions one by one
258-
n := 3
259248
for i, namespace := range namespaces {
260249
request := map[string]interface{}{
261250
"id": i,
262251
"method": fmt.Sprintf("%s_subscribe", namespace),
263252
"version": "2.0",
264-
"params": []interface{}{"someSubscription", n, i},
253+
"params": []interface{}{"someSubscription", notificationCount, i},
265254
}
266255

267256
if err := out.Encode(&request); err != nil {
@@ -276,7 +265,7 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
276265
"id": i,
277266
"method": fmt.Sprintf("%s_subscribe", namespace),
278267
"version": "2.0",
279-
"params": []interface{}{"someSubscription", n, i},
268+
"params": []interface{}{"someSubscription", notificationCount, i},
280269
})
281270
}
282271

@@ -285,46 +274,40 @@ func TestSubscriptionMultipleNamespaces(t *testing.T) {
285274
}
286275

287276
timeout := time.After(30 * time.Second)
288-
subids := make(map[string]string, 2*len(namespaces))
289-
count := make(map[string]int, 2*len(namespaces))
290-
291-
for {
292-
done := true
293-
for id := range count {
294-
if count, found := count[id]; !found || count < (2*n) {
277+
subids := make(map[string]string, subCount)
278+
count := make(map[string]int, subCount)
279+
allReceived := func() bool {
280+
done := len(count) == subCount
281+
for _, c := range count {
282+
if c < notificationCount {
295283
done = false
296284
}
297285
}
286+
return done
287+
}
298288

299-
if done && len(count) == len(namespaces) {
300-
break
301-
}
302-
289+
for !allReceived() {
303290
select {
304-
case err := <-errors:
305-
t.Fatal(err)
306291
case suc := <-successes: // subscription created
307292
subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
293+
case notification := <-notifications:
294+
count[notification.Params.Subscription]++
295+
case err := <-errors:
296+
t.Fatal(err)
308297
case failure := <-failures:
309298
t.Errorf("received error: %v", failure.Error)
310-
case notification := <-notifications:
311-
if cnt, found := count[notification.Params.Subscription]; found {
312-
count[notification.Params.Subscription] = cnt + 1
313-
} else {
314-
count[notification.Params.Subscription] = 1
315-
}
316299
case <-timeout:
317300
for _, namespace := range namespaces {
318301
subid, found := subids[namespace]
319302
if !found {
320-
t.Errorf("Subscription for '%s' not created", namespace)
303+
t.Errorf("subscription for %q not created", namespace)
321304
continue
322305
}
323-
if count, found := count[subid]; !found || count < n {
324-
t.Errorf("Didn't receive all notifications (%d<%d) in time for namespace '%s'", count, n, namespace)
306+
if count, found := count[subid]; !found || count < notificationCount {
307+
t.Errorf("didn't receive all notifications (%d<%d) in time for namespace %q", count, notificationCount, namespace)
325308
}
326309
}
327-
return
310+
t.Fatal("timed out")
328311
}
329312
}
330313
}

0 commit comments

Comments
 (0)