diff --git a/futures/src/subscription.rs b/futures/src/subscription.rs index 58317a7315..cb5d811eb5 100644 --- a/futures/src/subscription.rs +++ b/futures/src/subscription.rs @@ -341,9 +341,7 @@ impl Subscription { ) -> BoxStream { use futures::StreamExt; - let mapper = self.mapper; - - Box::pin(self.recipe.stream(input).map(mapper)) + Box::pin(self.recipe.stream(input).map(self.mapper)) } } @@ -361,6 +359,76 @@ impl Subscription { } } + /// Transforms the [`Subscription`] output with the given function, yielding only + /// values only when the function returns `Some(A)`. + /// + /// # Panics + /// The closure provided must be a non-capturing closure. The method + /// will panic in debug mode otherwise. + pub fn filter_map(mut self, f: F) -> Subscription + where + T: MaybeSend + 'static, + F: Fn(T) -> Option + MaybeSend + Clone + 'static, + A: MaybeSend + 'static, + { + debug_assert!( + std::mem::size_of::() == 0, + "the closure {} provided in `Subscription::filter_map` is capturing", + std::any::type_name::(), + ); + + struct FilterMap + where + F: Fn(A) -> Option + 'static, + { + recipe: Box>, + mapper: F, + } + + impl Recipe for FilterMap + where + A: 'static, + B: 'static + MaybeSend, + F: Fn(A) -> Option + MaybeSend, + { + type Output = B; + + fn hash(&self, state: &mut Hasher) { + TypeId::of::().hash(state); + self.recipe.hash(state); + } + + fn stream( + self: Box, + input: EventStream, + ) -> BoxStream { + use futures::StreamExt; + use futures::future; + + let mapper = self.mapper; + + Box::pin( + self.recipe + .stream(input) + .filter_map(move |a| future::ready(mapper(a))), + ) + } + } + + Subscription { + recipes: self + .recipes + .drain(..) + .map(|recipe| { + Box::new(FilterMap { + recipe, + mapper: f.clone(), + }) as Box> + }) + .collect(), + } + } + /// Returns the amount of recipe units in this [`Subscription`]. pub fn units(&self) -> usize { self.recipes.len()