Skip to content

Commit 1d1cecd

Browse files
committed
feat(repeatWhen): add higher-order lettable version of repeatWhen
1 parent 8473fe5 commit 1d1cecd

File tree

3 files changed

+125
-102
lines changed

3 files changed

+125
-102
lines changed

src/operator/repeatWhen.ts

Lines changed: 2 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,5 @@
1-
import { Operator } from '../Operator';
2-
import { Subscriber } from '../Subscriber';
31
import { Observable } from '../Observable';
4-
import { Subject } from '../Subject';
5-
import { Subscription, TeardownLogic } from '../Subscription';
6-
import { tryCatch } from '../util/tryCatch';
7-
import { errorObject } from '../util/errorObject';
8-
9-
import { OuterSubscriber } from '../OuterSubscriber';
10-
import { InnerSubscriber } from '../InnerSubscriber';
11-
import { subscribeToResult } from '../util/subscribeToResult';
2+
import { repeatWhen as higherOrder } from '../operators/repeatWhen';
123

134
/**
145
* Returns an Observable that mirrors the source Observable with the exception of a `complete`. If the source
@@ -25,96 +16,5 @@ import { subscribeToResult } from '../util/subscribeToResult';
2516
* @owner Observable
2617
*/
2718
export function repeatWhen<T>(this: Observable<T>, notifier: (notifications: Observable<any>) => Observable<any>): Observable<T> {
28-
return this.lift(new RepeatWhenOperator(notifier));
29-
}
30-
31-
class RepeatWhenOperator<T> implements Operator<T, T> {
32-
constructor(protected notifier: (notifications: Observable<any>) => Observable<any>) {
33-
}
34-
35-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
36-
return source.subscribe(new RepeatWhenSubscriber(subscriber, this.notifier, source));
37-
}
38-
}
39-
40-
/**
41-
* We need this JSDoc comment for affecting ESDoc.
42-
* @ignore
43-
* @extends {Ignored}
44-
*/
45-
class RepeatWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
46-
47-
private notifications: Subject<any>;
48-
private retries: Observable<any>;
49-
private retriesSubscription: Subscription;
50-
private sourceIsBeingSubscribedTo: boolean = true;
51-
52-
constructor(destination: Subscriber<R>,
53-
private notifier: (notifications: Observable<any>) => Observable<any>,
54-
private source: Observable<T>) {
55-
super(destination);
56-
}
57-
58-
notifyNext(outerValue: T, innerValue: R,
59-
outerIndex: number, innerIndex: number,
60-
innerSub: InnerSubscriber<T, R>): void {
61-
this.sourceIsBeingSubscribedTo = true;
62-
this.source.subscribe(this);
63-
}
64-
65-
notifyComplete(innerSub: InnerSubscriber<T, R>): void {
66-
if (this.sourceIsBeingSubscribedTo === false) {
67-
return super.complete();
68-
}
69-
}
70-
71-
complete() {
72-
this.sourceIsBeingSubscribedTo = false;
73-
74-
if (!this.isStopped) {
75-
if (!this.retries) {
76-
this.subscribeToRetries();
77-
} else if (this.retriesSubscription.closed) {
78-
return super.complete();
79-
}
80-
81-
this._unsubscribeAndRecycle();
82-
this.notifications.next();
83-
}
84-
}
85-
86-
protected _unsubscribe() {
87-
const { notifications, retriesSubscription } = this;
88-
if (notifications) {
89-
notifications.unsubscribe();
90-
this.notifications = null;
91-
}
92-
if (retriesSubscription) {
93-
retriesSubscription.unsubscribe();
94-
this.retriesSubscription = null;
95-
}
96-
this.retries = null;
97-
}
98-
99-
protected _unsubscribeAndRecycle(): Subscriber<T> {
100-
const { notifications, retries, retriesSubscription } = this;
101-
this.notifications = null;
102-
this.retries = null;
103-
this.retriesSubscription = null;
104-
super._unsubscribeAndRecycle();
105-
this.notifications = notifications;
106-
this.retries = retries;
107-
this.retriesSubscription = retriesSubscription;
108-
return this;
109-
}
110-
111-
private subscribeToRetries() {
112-
this.notifications = new Subject();
113-
const retries = tryCatch(this.notifier)(this.notifications);
114-
if (retries === errorObject) {
115-
return super.complete();
116-
}
117-
this.retries = retries;
118-
this.retriesSubscription = subscribeToResult(this, retries);
119-
}
19+
return higherOrder(notifier)(this);
12020
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export { publishBehavior } from './publishBehavior';
5454
export { race } from './race';
5555
export { reduce } from './reduce';
5656
export { repeat } from './repeat';
57+
export { repeatWhen } from './repeatWhen';
5758
export { refCount } from './refCount';
5859
export { scan } from './scan';
5960
export { subscribeOn } from './subscribeOn';

src/operators/repeatWhen.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { Operator } from '../Operator';
2+
import { Subscriber } from '../Subscriber';
3+
import { Observable } from '../Observable';
4+
import { Subject } from '../Subject';
5+
import { Subscription, TeardownLogic } from '../Subscription';
6+
import { tryCatch } from '../util/tryCatch';
7+
import { errorObject } from '../util/errorObject';
8+
9+
import { OuterSubscriber } from '../OuterSubscriber';
10+
import { InnerSubscriber } from '../InnerSubscriber';
11+
import { subscribeToResult } from '../util/subscribeToResult';
12+
13+
import { MonoTypeOperatorFunction } from '../interfaces';
14+
15+
/**
16+
* Returns an Observable that mirrors the source Observable with the exception of a `complete`. If the source
17+
* Observable calls `complete`, this method will emit to the Observable returned from `notifier`. If that Observable
18+
* calls `complete` or `error`, then this method will call `complete` or `error` on the child subscription. Otherwise
19+
* this method will resubscribe to the source Observable.
20+
*
21+
* <img src="./img/repeatWhen.png" width="100%">
22+
*
23+
* @param {function(notifications: Observable): Observable} notifier - Receives an Observable of notifications with
24+
* which a user can `complete` or `error`, aborting the repetition.
25+
* @return {Observable} The source Observable modified with repeat logic.
26+
* @method repeatWhen
27+
* @owner Observable
28+
*/
29+
export function repeatWhen<T>(notifier: (notifications: Observable<any>) => Observable<any>): MonoTypeOperatorFunction<T> {
30+
return (source: Observable<T>) => source.lift(new RepeatWhenOperator(notifier));
31+
}
32+
33+
class RepeatWhenOperator<T> implements Operator<T, T> {
34+
constructor(protected notifier: (notifications: Observable<any>) => Observable<any>) {
35+
}
36+
37+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
38+
return source.subscribe(new RepeatWhenSubscriber(subscriber, this.notifier, source));
39+
}
40+
}
41+
42+
/**
43+
* We need this JSDoc comment for affecting ESDoc.
44+
* @ignore
45+
* @extends {Ignored}
46+
*/
47+
class RepeatWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
48+
49+
private notifications: Subject<any>;
50+
private retries: Observable<any>;
51+
private retriesSubscription: Subscription;
52+
private sourceIsBeingSubscribedTo: boolean = true;
53+
54+
constructor(destination: Subscriber<R>,
55+
private notifier: (notifications: Observable<any>) => Observable<any>,
56+
private source: Observable<T>) {
57+
super(destination);
58+
}
59+
60+
notifyNext(outerValue: T, innerValue: R,
61+
outerIndex: number, innerIndex: number,
62+
innerSub: InnerSubscriber<T, R>): void {
63+
this.sourceIsBeingSubscribedTo = true;
64+
this.source.subscribe(this);
65+
}
66+
67+
notifyComplete(innerSub: InnerSubscriber<T, R>): void {
68+
if (this.sourceIsBeingSubscribedTo === false) {
69+
return super.complete();
70+
}
71+
}
72+
73+
complete() {
74+
this.sourceIsBeingSubscribedTo = false;
75+
76+
if (!this.isStopped) {
77+
if (!this.retries) {
78+
this.subscribeToRetries();
79+
} else if (this.retriesSubscription.closed) {
80+
return super.complete();
81+
}
82+
83+
this._unsubscribeAndRecycle();
84+
this.notifications.next();
85+
}
86+
}
87+
88+
protected _unsubscribe() {
89+
const { notifications, retriesSubscription } = this;
90+
if (notifications) {
91+
notifications.unsubscribe();
92+
this.notifications = null;
93+
}
94+
if (retriesSubscription) {
95+
retriesSubscription.unsubscribe();
96+
this.retriesSubscription = null;
97+
}
98+
this.retries = null;
99+
}
100+
101+
protected _unsubscribeAndRecycle(): Subscriber<T> {
102+
const { notifications, retries, retriesSubscription } = this;
103+
this.notifications = null;
104+
this.retries = null;
105+
this.retriesSubscription = null;
106+
super._unsubscribeAndRecycle();
107+
this.notifications = notifications;
108+
this.retries = retries;
109+
this.retriesSubscription = retriesSubscription;
110+
return this;
111+
}
112+
113+
private subscribeToRetries() {
114+
this.notifications = new Subject();
115+
const retries = tryCatch(this.notifier)(this.notifications);
116+
if (retries === errorObject) {
117+
return super.complete();
118+
}
119+
this.retries = retries;
120+
this.retriesSubscription = subscribeToResult(this, retries);
121+
}
122+
}

0 commit comments

Comments
 (0)