Skip to content

Commit 579b22c

Browse files
committed
Fix progress notification for watch that doesn't get any events
When implementing the fix for progress notifications (#15237) we made a incorrect assumption that that unsynched watches will always get at least one event. Unsynched watches include not only slow watchers, but also newly created watches that requested current or older revision. In case that non of the events match watch filter, those newly created watches might become synched without any event going through. Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent d0a0281 commit 579b22c

2 files changed

Lines changed: 50 additions & 23 deletions

File tree

server/etcdserver/api/v3rpc/watch.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,6 @@ type serverWatchStream struct {
144144
// records fragmented watch IDs
145145
fragment map[mvcc.WatchID]bool
146146

147-
// indicates whether we have an outstanding global progress
148-
// notification to send
149-
deferredProgress bool
150-
151147
// closec indicates the stream is closed.
152148
closec chan struct{}
153149

@@ -177,8 +173,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
177173
prevKV: make(map[mvcc.WatchID]bool),
178174
fragment: make(map[mvcc.WatchID]bool),
179175

180-
deferredProgress: false,
181-
182176
closec: make(chan struct{}),
183177
}
184178

@@ -366,14 +360,7 @@ func (sws *serverWatchStream) recvLoop() error {
366360
case *pb.WatchRequest_ProgressRequest:
367361
if uv.ProgressRequest != nil {
368362
sws.mu.Lock()
369-
// Ignore if deferred progress notification is already in progress
370-
if !sws.deferredProgress {
371-
// Request progress for all watchers,
372-
// force generation of a response
373-
if !sws.watchStream.RequestProgressAll() {
374-
sws.deferredProgress = true
375-
}
376-
}
363+
sws.watchStream.RequestProgressAll()
377364
sws.mu.Unlock()
378365
}
379366
default:
@@ -481,11 +468,6 @@ func (sws *serverWatchStream) sendLoop() {
481468
// elide next progress update if sent a key update
482469
sws.progress[wresp.WatchID] = false
483470
}
484-
if sws.deferredProgress {
485-
if sws.watchStream.RequestProgressAll() {
486-
sws.deferredProgress = false
487-
}
488-
}
489471
sws.mu.Unlock()
490472

491473
case c, ok := <-sws.ctrlStream:

tests/integration/v3_watch_test.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1438,8 +1438,8 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
14381438
wch := client.Watch(ctx, "foo", clientv3.WithRev(1))
14391439

14401440
// Immediately request a progress notification. As the client
1441-
// is unsynchronised, the server will have to defer the
1442-
// notification internally.
1441+
// is unsynchronised, the server will not sent any notification,
1442+
//as client can infer progress from events.
14431443
err := client.RequestProgress(ctx)
14441444
require.NoError(t, err)
14451445

@@ -1459,8 +1459,9 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
14591459
}
14601460
event_count += len(wr.Events)
14611461
}
1462-
1463-
// ... followed by the requested progress notification
1462+
// client needs to request progress notification again
1463+
err = client.RequestProgress(ctx)
1464+
require.NoError(t, err)
14641465
wr2 := <-wch
14651466
if wr2.Err() != nil {
14661467
t.Fatal(fmt.Errorf("watch error: %w", wr2.Err()))
@@ -1472,3 +1473,47 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
14721473
t.Fatal("Wrong revision in progress notification!")
14731474
}
14741475
}
1476+
1477+
func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
1478+
if ThroughProxy {
1479+
t.Skip("grpc proxy currently does not support requesting progress notifications")
1480+
}
1481+
BeforeTest(t)
1482+
1483+
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
1484+
defer clus.Terminate(t)
1485+
1486+
client := clus.RandClient()
1487+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1488+
defer cancel()
1489+
1490+
resp, err := client.Put(ctx, "bar", "1")
1491+
require.NoError(t, err)
1492+
1493+
wch := client.Watch(ctx, "foo", clientv3.WithRev(resp.Header.Revision))
1494+
// Request the progress notification on newly created watch that was not yet synced.
1495+
err = client.RequestProgress(ctx)
1496+
ticker := time.NewTicker(100 * time.Millisecond)
1497+
defer ticker.Stop()
1498+
1499+
require.NoError(t, err)
1500+
gotProgressNotification := false
1501+
for {
1502+
select {
1503+
case <-ticker.C:
1504+
err := client.RequestProgress(ctx)
1505+
require.NoError(t, err)
1506+
case resp := <-wch:
1507+
if resp.Err() != nil {
1508+
t.Fatal(fmt.Errorf("watch error: %w", resp.Err()))
1509+
}
1510+
if resp.IsProgressNotify() {
1511+
gotProgressNotification = true
1512+
}
1513+
}
1514+
if gotProgressNotification {
1515+
break
1516+
}
1517+
}
1518+
require.True(t, gotProgressNotification, "Expected to get progress notification")
1519+
}

0 commit comments

Comments
 (0)