Skip to content
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 84 additions & 14 deletions kube-runtime/src/utils/predicate.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::{reflector::ObjectRef, watcher::Error};
use crate::watcher::Error;
use core::{
pin::Pin,
task::{ready, Context, Poll},
};
use futures::Stream;
use kube_client::Resource;
use kube_client::{api::ObjectMeta, Resource};
use pin_project::pin_project;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
marker::PhantomData,
};

fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
Expand All @@ -17,6 +18,24 @@ fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
hasher.finish()
}

/// Private cache key that includes UID in equality for predicate filtering
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PredicateCacheKey {
name: String,
namespace: Option<String>,
uid: Option<String>,
}

impl From<&ObjectMeta> for PredicateCacheKey {
fn from(meta: &ObjectMeta) -> Self {
Self {
name: meta.name.clone().unwrap_or_default(),
namespace: meta.namespace.clone(),
uid: meta.uid.clone(),
}
}
}

/// A predicate is a hasher of Kubernetes objects stream filtering
pub trait Predicate<K> {
/// A predicate only needs to implement optional hashing when keys exist
Expand Down Expand Up @@ -103,7 +122,9 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
#[pin]
stream: St,
predicate: P,
cache: HashMap<ObjectRef<K>, u64>,
cache: HashMap<PredicateCacheKey, u64>,
// K: Resource necessary to get .meta() of an object during polling
_phantom: PhantomData<K>,
Comment thread
clux marked this conversation as resolved.
}
impl<St, K, P> PredicateFilter<St, K, P>
where
Expand All @@ -116,6 +137,7 @@ where
stream,
predicate,
cache: HashMap::new(),
_phantom: PhantomData,
}
}
}
Expand All @@ -134,17 +156,9 @@ where
break match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(obj)) => {
if let Some(val) = me.predicate.hash_property(&obj) {
let key = ObjectRef::from_obj(&obj);
let changed = if let Some(old) = me.cache.get(&key) {
*old != val
} else {
true
};
if let Some(old) = me.cache.get_mut(&key) {
*old = val;
} else {
me.cache.insert(key, val);
}
let key = PredicateCacheKey::from(obj.meta());
let changed = me.cache.get(&key) != Some(&val);
me.cache.insert(key, val);
Comment thread
clux marked this conversation as resolved.
if changed {
Some(Ok(obj))
} else {
Expand Down Expand Up @@ -251,4 +265,60 @@ pub(crate) mod tests {
assert_eq!(second.meta().generation, Some(2));
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}

#[tokio::test]
async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
use k8s_openapi::api::core::v1::Pod;

let mkobj = |g: i32, uid: &str| {
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "blog",
"namespace": "default",
"generation": Some(g),
"uid": uid,
},
"spec": {
"containers": [{
"name": "blog",
"image": "clux/blog:0.1.0"
}],
}
}))
.unwrap();
p
};

// Simulate: create (gen=1, uid=1) -> update (gen=1, uid=1) -> delete -> create (gen=1, uid=2) -> delete -> create (gen=2, uid-3)
let data = stream::iter([
Ok(mkobj(1, "uid-1")), // First resource created, generation=1
Ok(mkobj(1, "uid-1")), // Same resource, same generation (should be filtered out)
Ok(mkobj(1, "uid-2")), // Resource recreated with same generation but different UID
Comment thread
clux marked this conversation as resolved.
Outdated
Ok(mkobj(2, "uid-3")), // Resource recreated again with new generation and different UID
]);
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));

// First object should pass through
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(first.meta().generation, Some(1));
assert_eq!(first.meta().uid.as_deref(), Some("uid-1"));

// Second object (same UID, same generation) should be filtered out - no event
// Third object should pass through because it's a different resource
// (different UID), even though it has the same generation
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(second.meta().generation, Some(1));
assert_eq!(second.meta().uid.as_deref(), Some("uid-2"));

// Fourth object should also pass through because it's a different resource
Comment thread
clux marked this conversation as resolved.
Outdated
// (different UID and generation)
let third = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(third.meta().generation, Some(2));
assert_eq!(third.meta().uid.as_deref(), Some("uid-3"));

// Stream should be exhausted
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}
}