Skip to content

Commit 84985b4

Browse files
authored
Ensure watch_object handles objects removed before init (#1577)
* Stream produced by `watch_object` will include an item when the object isn't in any initial list fixes #1576 Signed-off-by: Mark Ingram <[email protected]> * Stream produced by `watch_object` will include an item when the object isn't in any initial list - added a comment - resolved a nightly clippy - use a match guard instead of an nested if Signed-off-by: Mark Ingram <[email protected]> --------- Signed-off-by: Mark Ingram <[email protected]>
1 parent 63644d2 commit 84985b4

File tree

1 file changed

+31
-13
lines changed

1 file changed

+31
-13
lines changed

kube-runtime/src/watcher.rs

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use kube_client::{
1414
Api, Error as ClientErr,
1515
};
1616
use serde::de::DeserializeOwned;
17-
use std::{clone::Clone, collections::VecDeque, fmt::Debug, time::Duration};
17+
use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
1818
use thiserror::Error;
1919
use tracing::{debug, error, warn};
2020

@@ -844,18 +844,36 @@ pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'sta
844844
// filtering by object name in given scope, so there's at most one matching object
845845
// footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
846846
let fields = format!("metadata.name={name}");
847-
watcher(api, Config::default().fields(&fields)).filter_map(|event| async {
848-
match event {
849-
// Pass up `Some` for Found / Updated
850-
Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
851-
// Pass up `None` for Deleted
852-
Ok(Event::Delete(_)) => Some(Ok(None)),
853-
// Ignore marker events
854-
Ok(Event::Init | Event::InitDone) => None,
855-
// Bubble up errors
856-
Err(err) => Some(Err(err)),
857-
}
858-
})
847+
watcher(api, Config::default().fields(&fields))
848+
// The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone
849+
// sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and
850+
// `None` is emitted when the InitDone event is received.
851+
//
852+
// The first check ensures `None` is emitted if the object was already gone (or not found), subsequent
853+
// checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which
854+
// could happen given K8S events aren't guaranteed delivery.
855+
.scan(false, |obj_seen, event| {
856+
if matches!(event, Ok(Event::Init)) {
857+
*obj_seen = false;
858+
} else if matches!(event, Ok(Event::InitApply(_))) {
859+
*obj_seen = true;
860+
}
861+
future::ready(Some((*obj_seen, event)))
862+
})
863+
.filter_map(|(obj_seen, event)| async move {
864+
match event {
865+
// Pass up `Some` for Found / Updated
866+
Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
867+
// Pass up `None` for Deleted
868+
Ok(Event::Delete(_)) => Some(Ok(None)),
869+
// Pass up `None` if the object wasn't seen in the initial list
870+
Ok(Event::InitDone) if !obj_seen => Some(Ok(None)),
871+
// Ignore marker events
872+
Ok(Event::Init | Event::InitDone) => None,
873+
// Bubble up errors
874+
Err(err) => Some(Err(err)),
875+
}
876+
})
859877
}
860878

861879
/// Default watcher backoff inspired by Kubernetes' client-go.

0 commit comments

Comments
 (0)