Skip to content

Commit 24218c5

Browse files
watsaighecrj
authored andcommitted
Add filter_map method to Subscription
1 parent 7c3bdcc commit 24218c5

File tree

1 file changed

+72
-4
lines changed

1 file changed

+72
-4
lines changed

futures/src/subscription.rs

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::core::event;
77
use crate::core::theme;
88
use crate::core::window;
99
use crate::futures::Stream;
10-
use crate::{BoxStream, MaybeSend};
10+
use crate::{BoxStream, MaybeSend, MaybeSync};
1111

1212
use std::any::TypeId;
1313
use std::hash::Hash;
@@ -341,9 +341,7 @@ impl<T> Subscription<T> {
341341
) -> BoxStream<Self::Output> {
342342
use futures::StreamExt;
343343

344-
let mapper = self.mapper;
345-
346-
Box::pin(self.recipe.stream(input).map(mapper))
344+
Box::pin(self.recipe.stream(input).map(self.mapper))
347345
}
348346
}
349347

@@ -361,6 +359,76 @@ impl<T> Subscription<T> {
361359
}
362360
}
363361

362+
/// Transforms the [`Subscription`] output with the given function, yielding only
363+
/// values only when the function returns `Some(A)`.
364+
///
365+
/// # Panics
366+
/// The closure provided must be a non-capturing closure. The method
367+
/// will panic in debug mode otherwise.
368+
pub fn filter_map<F, A>(mut self, f: F) -> Subscription<A>
369+
where
370+
T: MaybeSend + 'static,
371+
F: Fn(T) -> Option<A> + MaybeSend + MaybeSync + Clone + 'static,
372+
A: MaybeSend + 'static,
373+
{
374+
debug_assert!(
375+
std::mem::size_of::<F>() == 0,
376+
"the closure {} provided in `Subscription::filter_map` is capturing",
377+
std::any::type_name::<F>(),
378+
);
379+
380+
struct FilterMap<A, B, F>
381+
where
382+
F: Fn(A) -> Option<B> + 'static,
383+
{
384+
recipe: Box<dyn Recipe<Output = A>>,
385+
mapper: F,
386+
}
387+
388+
impl<A, B, F> Recipe for FilterMap<A, B, F>
389+
where
390+
A: 'static,
391+
B: 'static + MaybeSend,
392+
F: Fn(A) -> Option<B> + MaybeSend,
393+
{
394+
type Output = B;
395+
396+
fn hash(&self, state: &mut Hasher) {
397+
TypeId::of::<F>().hash(state);
398+
self.recipe.hash(state);
399+
}
400+
401+
fn stream(
402+
self: Box<Self>,
403+
input: EventStream,
404+
) -> BoxStream<Self::Output> {
405+
use futures::StreamExt;
406+
use futures::future;
407+
408+
let mapper = self.mapper;
409+
410+
Box::pin(
411+
self.recipe
412+
.stream(input)
413+
.filter_map(move |a| future::ready(mapper(a))),
414+
)
415+
}
416+
}
417+
418+
Subscription {
419+
recipes: self
420+
.recipes
421+
.drain(..)
422+
.map(|recipe| {
423+
Box::new(FilterMap {
424+
recipe,
425+
mapper: f.clone(),
426+
}) as Box<dyn Recipe<Output = A>>
427+
})
428+
.collect(),
429+
}
430+
}
431+
364432
/// Returns the amount of recipe units in this [`Subscription`].
365433
pub fn units(&self) -> usize {
366434
self.recipes.len()

0 commit comments

Comments
 (0)