Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 74 additions & 14 deletions kube-runtime/src/utils/predicate.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::{reflector::ObjectRef, watcher::Error};
use crate::watcher::Error;
use core::{
pin::Pin,
task::{ready, Context, Poll},
};
use futures::Stream;
use kube_client::Resource;
use kube_client::{api::ObjectMeta, Resource};
use pin_project::pin_project;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
marker::PhantomData,
};

fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
Expand All @@ -17,6 +18,24 @@ fn hash<T: Hash + ?Sized>(t: &T) -> u64 {
hasher.finish()
}

/// Private cache key that includes UID in equality for predicate filtering
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct PredicateCacheKey {
name: String,
namespace: Option<String>,
uid: Option<String>,
}

impl From<&ObjectMeta> for PredicateCacheKey {
fn from(meta: &ObjectMeta) -> Self {
Self {
name: meta.name.clone().unwrap_or_default(),
namespace: meta.namespace.clone(),
uid: meta.uid.clone(),
}
}
}

/// A predicate is a hasher of Kubernetes objects stream filtering
pub trait Predicate<K> {
/// A predicate only needs to implement optional hashing when keys exist
Expand Down Expand Up @@ -103,7 +122,8 @@ pub struct PredicateFilter<St, K: Resource, P: Predicate<K>> {
#[pin]
stream: St,
predicate: P,
cache: HashMap<ObjectRef<K>, u64>,
cache: HashMap<PredicateCacheKey, u64>,
_phantom: PhantomData<K>,
Comment thread
clux marked this conversation as resolved.
}
impl<St, K, P> PredicateFilter<St, K, P>
where
Expand All @@ -116,6 +136,7 @@ where
stream,
predicate,
cache: HashMap::new(),
_phantom: PhantomData,
}
}
}
Expand All @@ -134,17 +155,9 @@ where
break match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(obj)) => {
if let Some(val) = me.predicate.hash_property(&obj) {
let key = ObjectRef::from_obj(&obj);
let changed = if let Some(old) = me.cache.get(&key) {
*old != val
} else {
true
};
if let Some(old) = me.cache.get_mut(&key) {
*old = val;
} else {
me.cache.insert(key, val);
}
let key = PredicateCacheKey::from(obj.meta());
let changed = me.cache.get(&key) != Some(&val);
me.cache.insert(key, val);
Comment thread
clux marked this conversation as resolved.
if changed {
Some(Ok(obj))
} else {
Expand Down Expand Up @@ -251,4 +264,51 @@ pub(crate) mod tests {
assert_eq!(second.meta().generation, Some(2));
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}

#[tokio::test]
async fn predicate_filtering_should_handle_recreated_resources_with_same_generation() {
use k8s_openapi::api::core::v1::Pod;

let mkobj = |g: i32, uid: &str| {
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "blog",
"namespace": "default",
"generation": Some(g),
"uid": uid,
},
"spec": {
"containers": [{
"name": "blog",
"image": "clux/blog:0.1.0"
}],
}
}))
.unwrap();
p
};

// Simulate: create (gen=1, uid=1) -> delete -> create (gen=1, uid=2)
let data = stream::iter([
Ok(mkobj(1, "uid-1")), // First resource created, generation=1
Ok(mkobj(1, "uid-2")), // Resource recreated with same generation but different UID
Comment thread
clux marked this conversation as resolved.
Outdated
]);
let mut rx = pin!(PredicateFilter::new(data, predicates::generation));

// First object should pass through
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(first.meta().generation, Some(1));
assert_eq!(first.meta().uid.as_deref(), Some("uid-1"));

// Second object should also pass through because it's a different resource
// (different UID), even though it has the same generation
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(second.meta().generation, Some(1));
assert_eq!(second.meta().uid.as_deref(), Some("uid-2"));

// Stream should be exhausted
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}
}