@@ -14672,7 +14672,7 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, boolean emi
1467214672 public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
1467314673 Objects.requireNonNull(unit, "unit is null");
1467414674 Objects.requireNonNull(scheduler, "scheduler is null");
14675- return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false));
14675+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false, null ));
1467614676 }
1467714677
1467814678 /**
@@ -14713,7 +14713,51 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Sc
1471314713 public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
1471414714 Objects.requireNonNull(unit, "unit is null");
1471514715 Objects.requireNonNull(scheduler, "scheduler is null");
14716- return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast));
14716+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
14717+ }
14718+
14719+ /**
14720+ * Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
14721+ * within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}
14722+ * and optionally emit the very last upstream item when the upstream completes.
14723+ * <p>
14724+ * <img width="640" height="277" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.emitlast.png" alt="">
14725+ * <dl>
14726+ * <dt><b>Backpressure:</b></dt>
14727+ * <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
14728+ * <dt><b>Scheduler:</b></dt>
14729+ * <dd>You specify which {@code Scheduler} this operator will use.</dd>
14730+ * </dl>
14731+ *
14732+ * <p>History: 2.0.5 - experimental
14733+ * @param period
14734+ * the sampling rate
14735+ * @param unit
14736+ * the {@link TimeUnit} in which {@code period} is defined
14737+ * @param scheduler
14738+ * the {@code Scheduler} to use when sampling
14739+ * @param emitLast
14740+ * if {@code true} and the upstream completes while there is still an unsampled item available,
14741+ * that item is emitted to downstream before completion
14742+ * if {@code false}, an unsampled last item is ignored.
14743+ * @param onDropped
14744+ * called with the current entry when it has been replaced by a new one
14745+ * @return the new {@code Flowable} instance
14746+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
14747+ * @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14748+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
14749+ * @see #throttleLast(long, TimeUnit, Scheduler)
14750+ * @since 2.1
14751+ */
14752+ @CheckReturnValue
14753+ @NonNull
14754+ @BackpressureSupport(BackpressureKind.ERROR)
14755+ @SchedulerSupport(SchedulerSupport.CUSTOM)
14756+ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<T> onDropped) {
14757+ Objects.requireNonNull(unit, "unit is null");
14758+ Objects.requireNonNull(scheduler, "scheduler is null");
14759+ Objects.requireNonNull(onDropped, "onDropped is null");
14760+ return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped));
1471714761 }
1471814762
1471914763 /**
@@ -17211,6 +17255,45 @@ public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit u
1721117255 return sample(intervalDuration, unit, scheduler);
1721217256 }
1721317257
17258+ /**
17259+ * Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential
17260+ * time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
17261+ * <p>
17262+ * This differs from {@link #throttleFirst(long, TimeUnit, Scheduler)} in that this ticks along at a scheduled interval whereas
17263+ * {@code throttleFirst} does not tick, it just tracks the passage of time.
17264+ * <p>
17265+ * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.s.v3.png" alt="">
17266+ * <dl>
17267+ * <dt><b>Backpressure:</b></dt>
17268+ * <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
17269+ * <dt><b>Scheduler:</b></dt>
17270+ * <dd>You specify which {@code Scheduler} this operator will use.</dd>
17271+ * </dl>
17272+ *
17273+ * @param intervalDuration
17274+ * duration of windows within which the last item emitted by the current {@code Flowable} will be
17275+ * emitted
17276+ * @param unit
17277+ * the unit of time of {@code intervalDuration}
17278+ * @param scheduler
17279+ * the {@code Scheduler} to use internally to manage the timers that handle timeout for each
17280+ * event
17281+ * @param onDropped
17282+ * called with the current entry when it has been replaced by a new one
17283+ * @return the new {@code Flowable} instance
17284+ * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
17285+ * @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
17286+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
17287+ * @see #sample(long, TimeUnit, Scheduler)
17288+ */
17289+ @CheckReturnValue
17290+ @BackpressureSupport(BackpressureKind.ERROR)
17291+ @SchedulerSupport(SchedulerSupport.CUSTOM)
17292+ @NonNull
17293+ public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
17294+ return sample(intervalDuration, unit, scheduler, false, onDropped);
17295+ }
17296+
1721417297 /**
1721517298 * Throttles items from the upstream {@code Flowable} by first emitting the next
1721617299 * item from upstream, then periodically emitting the latest item (if any) when
0 commit comments