Skip to content

Commit 7efb803

Browse files
committed
feat(delay): add higher-order lettable version of delay
1 parent df0d439 commit 7efb803

File tree

3 files changed

+152
-102
lines changed

3 files changed

+152
-102
lines changed

src/operator/delay.ts

Lines changed: 2 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
import { async } from '../scheduler/async';
2-
import { isDate } from '../util/isDate';
3-
import { Operator } from '../Operator';
42
import { IScheduler } from '../Scheduler';
5-
import { Subscriber } from '../Subscriber';
6-
import { Action } from '../scheduler/Action';
7-
import { Notification } from '../Notification';
83
import { Observable } from '../Observable';
9-
import { PartialObserver } from '../Observer';
10-
import { TeardownLogic } from '../Subscription';
4+
import { delay as higherOrder } from '../operators';
115

126
/**
137
* Delays the emission of items from the source Observable by a given timeout or
@@ -50,99 +44,5 @@ import { TeardownLogic } from '../Subscription';
5044
*/
5145
export function delay<T>(this: Observable<T>, delay: number|Date,
5246
scheduler: IScheduler = async): Observable<T> {
53-
const absoluteDelay = isDate(delay);
54-
const delayFor = absoluteDelay ? (+delay - scheduler.now()) : Math.abs(<number>delay);
55-
return this.lift(new DelayOperator(delayFor, scheduler));
56-
}
57-
58-
class DelayOperator<T> implements Operator<T, T> {
59-
constructor(private delay: number,
60-
private scheduler: IScheduler) {
61-
}
62-
63-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
64-
return source.subscribe(new DelaySubscriber(subscriber, this.delay, this.scheduler));
65-
}
66-
}
67-
68-
interface DelayState<T> {
69-
source: DelaySubscriber<T>;
70-
destination: PartialObserver<T>;
71-
scheduler: IScheduler;
72-
}
73-
74-
/**
75-
* We need this JSDoc comment for affecting ESDoc.
76-
* @ignore
77-
* @extends {Ignored}
78-
*/
79-
class DelaySubscriber<T> extends Subscriber<T> {
80-
private queue: Array<DelayMessage<T>> = [];
81-
private active: boolean = false;
82-
private errored: boolean = false;
83-
84-
private static dispatch<T>(this: Action<DelayState<T>>, state: DelayState<T>): void {
85-
const source = state.source;
86-
const queue = source.queue;
87-
const scheduler = state.scheduler;
88-
const destination = state.destination;
89-
90-
while (queue.length > 0 && (queue[0].time - scheduler.now()) <= 0) {
91-
queue.shift().notification.observe(destination);
92-
}
93-
94-
if (queue.length > 0) {
95-
const delay = Math.max(0, queue[0].time - scheduler.now());
96-
this.schedule(state, delay);
97-
} else {
98-
source.active = false;
99-
}
100-
}
101-
102-
constructor(destination: Subscriber<T>,
103-
private delay: number,
104-
private scheduler: IScheduler) {
105-
super(destination);
106-
}
107-
108-
private _schedule(scheduler: IScheduler): void {
109-
this.active = true;
110-
this.add(scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch, this.delay, {
111-
source: this, destination: this.destination, scheduler: scheduler
112-
}));
113-
}
114-
115-
private scheduleNotification(notification: Notification<T>): void {
116-
if (this.errored === true) {
117-
return;
118-
}
119-
120-
const scheduler = this.scheduler;
121-
const message = new DelayMessage(scheduler.now() + this.delay, notification);
122-
this.queue.push(message);
123-
124-
if (this.active === false) {
125-
this._schedule(scheduler);
126-
}
127-
}
128-
129-
protected _next(value: T) {
130-
this.scheduleNotification(Notification.createNext(value));
131-
}
132-
133-
protected _error(err: any) {
134-
this.errored = true;
135-
this.queue = [];
136-
this.destination.error(err);
137-
}
138-
139-
protected _complete() {
140-
this.scheduleNotification(Notification.createComplete());
141-
}
142-
}
143-
144-
class DelayMessage<T> {
145-
constructor(public readonly time: number,
146-
public readonly notification: Notification<T>) {
147-
}
47+
return higherOrder<T>(delay, scheduler)(this);
14848
}

src/operators/delay.ts

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import { async } from '../scheduler/async';
2+
import { isDate } from '../util/isDate';
3+
import { Operator } from '../Operator';
4+
import { IScheduler } from '../Scheduler';
5+
import { Subscriber } from '../Subscriber';
6+
import { Action } from '../scheduler/Action';
7+
import { Notification } from '../Notification';
8+
import { Observable } from '../Observable';
9+
import { PartialObserver } from '../Observer';
10+
import { TeardownLogic } from '../Subscription';
11+
import { MonoTypeOperatorFunction } from '../interfaces';
12+
13+
/**
14+
* Delays the emission of items from the source Observable by a given timeout or
15+
* until a given Date.
16+
*
17+
* <span class="informal">Time shifts each item by some specified amount of
18+
* milliseconds.</span>
19+
*
20+
* <img src="./img/delay.png" width="100%">
21+
*
22+
* If the delay argument is a Number, this operator time shifts the source
23+
* Observable by that amount of time expressed in milliseconds. The relative
24+
* time intervals between the values are preserved.
25+
*
26+
* If the delay argument is a Date, this operator time shifts the start of the
27+
* Observable execution until the given date occurs.
28+
*
29+
* @example <caption>Delay each click by one second</caption>
30+
* var clicks = Rx.Observable.fromEvent(document, 'click');
31+
* var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
32+
* delayedClicks.subscribe(x => console.log(x));
33+
*
34+
* @example <caption>Delay all clicks until a future date happens</caption>
35+
* var clicks = Rx.Observable.fromEvent(document, 'click');
36+
* var date = new Date('March 15, 2050 12:00:00'); // in the future
37+
* var delayedClicks = clicks.delay(date); // click emitted only after that date
38+
* delayedClicks.subscribe(x => console.log(x));
39+
*
40+
* @see {@link debounceTime}
41+
* @see {@link delayWhen}
42+
*
43+
* @param {number|Date} delay The delay duration in milliseconds (a `number`) or
44+
* a `Date` until which the emission of the source items is delayed.
45+
* @param {Scheduler} [scheduler=async] The IScheduler to use for
46+
* managing the timers that handle the time-shift for each item.
47+
* @return {Observable} An Observable that delays the emissions of the source
48+
* Observable by the specified timeout or Date.
49+
* @method delay
50+
* @owner Observable
51+
*/
52+
export function delay<T>(delay: number|Date,
53+
scheduler: IScheduler = async): MonoTypeOperatorFunction<T> {
54+
const absoluteDelay = isDate(delay);
55+
const delayFor = absoluteDelay ? (+delay - scheduler.now()) : Math.abs(<number>delay);
56+
return (source: Observable<T>) => source.lift(new DelayOperator(delayFor, scheduler));
57+
}
58+
59+
class DelayOperator<T> implements Operator<T, T> {
60+
constructor(private delay: number,
61+
private scheduler: IScheduler) {
62+
}
63+
64+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
65+
return source.subscribe(new DelaySubscriber(subscriber, this.delay, this.scheduler));
66+
}
67+
}
68+
69+
interface DelayState<T> {
70+
source: DelaySubscriber<T>;
71+
destination: PartialObserver<T>;
72+
scheduler: IScheduler;
73+
}
74+
75+
/**
76+
* We need this JSDoc comment for affecting ESDoc.
77+
* @ignore
78+
* @extends {Ignored}
79+
*/
80+
class DelaySubscriber<T> extends Subscriber<T> {
81+
private queue: Array<DelayMessage<T>> = [];
82+
private active: boolean = false;
83+
private errored: boolean = false;
84+
85+
private static dispatch<T>(this: Action<DelayState<T>>, state: DelayState<T>): void {
86+
const source = state.source;
87+
const queue = source.queue;
88+
const scheduler = state.scheduler;
89+
const destination = state.destination;
90+
91+
while (queue.length > 0 && (queue[0].time - scheduler.now()) <= 0) {
92+
queue.shift().notification.observe(destination);
93+
}
94+
95+
if (queue.length > 0) {
96+
const delay = Math.max(0, queue[0].time - scheduler.now());
97+
this.schedule(state, delay);
98+
} else {
99+
source.active = false;
100+
}
101+
}
102+
103+
constructor(destination: Subscriber<T>,
104+
private delay: number,
105+
private scheduler: IScheduler) {
106+
super(destination);
107+
}
108+
109+
private _schedule(scheduler: IScheduler): void {
110+
this.active = true;
111+
this.add(scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch, this.delay, {
112+
source: this, destination: this.destination, scheduler: scheduler
113+
}));
114+
}
115+
116+
private scheduleNotification(notification: Notification<T>): void {
117+
if (this.errored === true) {
118+
return;
119+
}
120+
121+
const scheduler = this.scheduler;
122+
const message = new DelayMessage(scheduler.now() + this.delay, notification);
123+
this.queue.push(message);
124+
125+
if (this.active === false) {
126+
this._schedule(scheduler);
127+
}
128+
}
129+
130+
protected _next(value: T) {
131+
this.scheduleNotification(Notification.createNext(value));
132+
}
133+
134+
protected _error(err: any) {
135+
this.errored = true;
136+
this.queue = [];
137+
this.destination.error(err);
138+
}
139+
140+
protected _complete() {
141+
this.scheduleNotification(Notification.createComplete());
142+
}
143+
}
144+
145+
class DelayMessage<T> {
146+
constructor(public readonly time: number,
147+
public readonly notification: Notification<T>) {
148+
}
149+
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export { count } from './count';
1414
export { debounce } from './debounce';
1515
export { debounceTime } from './debounceTime';
1616
export { defaultIfEmpty } from './defaultIfEmpty';
17+
export { delay } from './delay';
1718
export { dematerialize } from './dematerialize';
1819
export { filter } from './filter';
1920
export { ignoreElements } from './ignoreElements';

0 commit comments

Comments
 (0)