Skip to content

Commit 5ff0982

Browse files
Implement list-then-watch pattern for resilient watch restarts (issue: stale resourceVersion after 410)
Co-authored-by: brendandburns <[email protected]>
1 parent 72e81bd commit 5ff0982

File tree

2 files changed

+119
-6
lines changed

2 files changed

+119
-6
lines changed

kubernetes/base/watch/watch.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,46 @@ def stream(self, func, *args, **kwargs):
187187
disable_retries = ('timeout_seconds' in kwargs)
188188
retry_after_410 = False
189189
deserialize = kwargs.pop('deserialize', True)
190+
191+
# If no resource_version is specified for a watch (not a log follow),
192+
# perform an initial list call to get the current resourceVersion from
193+
# the list metadata. This ensures that any subsequent watch restart
194+
# after a 410 uses a valid, recent resourceVersion rather than a
195+
# potentially stale one from an individual resource event.
196+
if watch_arg == 'watch' and self.resource_version is None:
197+
list_kwargs = {k: v for k, v in kwargs.items()
198+
if k not in (watch_arg, '_preload_content',
199+
'allow_watch_bookmarks',
200+
'timeout_seconds')}
201+
initial_list = func(*args, **list_kwargs)
202+
if (hasattr(initial_list, 'metadata')
203+
and hasattr(initial_list.metadata, 'resource_version')
204+
and isinstance(
205+
initial_list.metadata.resource_version, str)
206+
and initial_list.metadata.resource_version):
207+
self.resource_version = \
208+
initial_list.metadata.resource_version
209+
kwargs['resource_version'] = self.resource_version
210+
if (hasattr(initial_list, 'items')
211+
and isinstance(initial_list.items, list)):
212+
for item in initial_list.items:
213+
raw_obj = \
214+
self._api_client.sanitize_for_serialization(item)
215+
if deserialize:
216+
yield {
217+
'type': 'ADDED',
218+
'object': item,
219+
'raw_object': raw_obj,
220+
}
221+
else:
222+
yield {
223+
'type': 'ADDED',
224+
'object': raw_obj,
225+
'raw_object': raw_obj,
226+
}
227+
if self._stop:
228+
return
229+
190230
while True:
191231
resp = func(*args, **kwargs)
192232
try:

kubernetes/base/watch/watch_test.py

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import os
1616
import time
1717
import unittest
18+
from types import SimpleNamespace
1819
from unittest.mock import Mock, call
1920

2021
from kubernetes import client, config
@@ -66,7 +67,10 @@ def test_watch_with_decode(self):
6667
# make sure that all three records were consumed by the stream
6768
self.assertEqual(4, count)
6869

69-
fake_api.get_namespaces.assert_called_once_with(
70+
# The function is called twice: first for the initial list (no watch
71+
# kwargs), then for the actual watch (with resource_version from list).
72+
self.assertEqual(fake_api.get_namespaces.call_count, 2)
73+
fake_api.get_namespaces.assert_called_with(
7074
_preload_content=False, watch=True)
7175
fake_resp.stream.assert_called_once_with(
7276
amt=None, decode_content=False)
@@ -292,6 +296,67 @@ def get_values(*args, **kwargs):
292296
# more strict test with worse error message
293297
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)
294298

299+
def test_watch_with_initial_list_resource_version(self):
300+
"""Verify the list-then-watch pattern.
301+
302+
When stream() is called without a resource_version, it should:
303+
1. Perform an initial list call to get the current resourceVersion.
304+
2. Yield items from that list as ADDED events.
305+
3. Start the watch from the list's resourceVersion so that
306+
subsequent restarts after a 410 use a valid, recent version.
307+
"""
308+
fake_resp = Mock()
309+
fake_resp.close = Mock()
310+
fake_resp.release_conn = Mock()
311+
# The watch stream returns one new event after the initial list.
312+
fake_resp.stream = Mock(
313+
return_value=[
314+
'{"type": "MODIFIED", "object": {"metadata": '
315+
'{"name": "ns-new", "resourceVersion": "200"}, '
316+
'"spec": {}, "status": {}}}\n',
317+
])
318+
319+
# Build a real-ish list response with two existing namespaces.
320+
ns1 = client.V1Namespace(
321+
metadata=client.V1ObjectMeta(
322+
name="ns-one", resource_version="100"))
323+
ns2 = client.V1Namespace(
324+
metadata=client.V1ObjectMeta(
325+
name="ns-two", resource_version="150"))
326+
fake_list = client.V1NamespaceList(
327+
metadata=client.V1ListMeta(resource_version="180"),
328+
items=[ns1, ns2])
329+
330+
def _list_or_watch(*args, **kwargs):
331+
return fake_resp if kwargs.get('watch') else fake_list
332+
333+
fake_api = Mock()
334+
fake_api.list_namespaces = Mock(side_effect=_list_or_watch)
335+
fake_api.list_namespaces.__doc__ = ':return: V1NamespaceList'
336+
337+
w = Watch()
338+
events = []
339+
for e in w.stream(fake_api.list_namespaces, timeout_seconds=1):
340+
events.append(e)
341+
342+
# The two existing namespaces must appear first as ADDED events.
343+
self.assertEqual(len(events), 3)
344+
self.assertEqual(events[0]['type'], 'ADDED')
345+
self.assertEqual(events[0]['object'].metadata.name, 'ns-one')
346+
self.assertEqual(events[1]['type'], 'ADDED')
347+
self.assertEqual(events[1]['object'].metadata.name, 'ns-two')
348+
# The new event from the watch stream follows.
349+
self.assertEqual(events[2]['type'], 'MODIFIED')
350+
self.assertEqual(events[2]['object'].metadata.name, 'ns-new')
351+
352+
# The watch must have started from the list's resourceVersion.
353+
fake_api.list_namespaces.assert_has_calls([
354+
call(),
355+
call(_preload_content=False, watch=True,
356+
timeout_seconds=1, resource_version="180"),
357+
])
358+
self.assertEqual(w.resource_version, "200")
359+
295360
def test_watch_stream_twice(self):
296361
w = Watch(float)
297362
for step in ['first', 'second']:
@@ -312,7 +377,10 @@ def test_watch_stream_twice(self):
312377
w.stop()
313378

314379
self.assertEqual(count, 3)
315-
fake_api.get_namespaces.assert_called_once_with(
380+
# The function is called twice per stream() invocation: once for
381+
# the initial list call and once for the actual watch call.
382+
self.assertEqual(fake_api.get_namespaces.call_count, 2)
383+
fake_api.get_namespaces.assert_called_with(
316384
_preload_content=False, watch=True)
317385
fake_resp.stream.assert_called_once_with(
318386
amt=None, decode_content=False)
@@ -346,7 +414,9 @@ def test_watch_stream_loop(self):
346414
w.stop()
347415

348416
self.assertEqual(count, 2)
349-
self.assertEqual(fake_api.get_namespaces.call_count, 2)
417+
# Each stream() call makes 2 API calls: initial list + watch.
418+
# Two stream() calls = 4 total API calls.
419+
self.assertEqual(fake_api.get_namespaces.call_count, 4)
350420
self.assertEqual(fake_resp.stream.call_count, 2)
351421
self.assertEqual(fake_resp.close.call_count, 2)
352422
self.assertEqual(fake_resp.release_conn.call_count, 2)
@@ -423,8 +493,9 @@ def test_watch_with_exception(self):
423493
pass
424494
# expected
425495

426-
fake_api.get_thing.assert_called_once_with(
496+
fake_api.get_thing.assert_called_with(
427497
_preload_content=False, watch=True)
498+
self.assertEqual(fake_api.get_thing.call_count, 2)
428499
fake_resp.stream.assert_called_once_with(
429500
amt=None, decode_content=False)
430501
fake_resp.close.assert_called_once()
@@ -447,8 +518,9 @@ def test_watch_with_error_event(self):
447518
# No retry is attempted either, preventing an ApiException
448519
assert not list(w.stream(fake_api.get_thing))
449520

450-
fake_api.get_thing.assert_called_once_with(
521+
fake_api.get_thing.assert_called_with(
451522
_preload_content=False, watch=True)
523+
self.assertEqual(fake_api.get_thing.call_count, 2)
452524
fake_resp.stream.assert_called_once_with(
453525
amt=None, decode_content=False)
454526
fake_resp.close.assert_called_once()
@@ -500,8 +572,9 @@ def test_watch_with_error_event_and_timeout_param(self):
500572
except client.rest.ApiException:
501573
pass
502574

503-
fake_api.get_thing.assert_called_once_with(
575+
fake_api.get_thing.assert_called_with(
504576
_preload_content=False, watch=True, timeout_seconds=10)
577+
self.assertEqual(fake_api.get_thing.call_count, 2)
505578
fake_resp.stream.assert_called_once_with(
506579
amt=None, decode_content=False)
507580
fake_resp.close.assert_called_once()

0 commit comments

Comments
 (0)