Skip to content

Commit e5a6070

Browse files
authored
Merge pull request #3512 from benlesh/skipUntil-fix-notifier-subscription
fix skipUntil
2 parents df6ad6d + 889f84a commit e5a6070

File tree

3 files changed

+76
-87
lines changed

3 files changed

+76
-87
lines changed

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

spec/operators/skipUntil-spec.ts

Lines changed: 65 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,55 @@
1-
import * as Rx from 'rxjs/Rx';
21
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
2+
import { Observable, of, Subject } from 'rxjs';
3+
import { skipUntil, mergeMap } from 'rxjs/operators';
34

45
declare function asDiagram(arg: string): Function;
56

6-
const Observable = Rx.Observable;
7-
87
/** @test {skipUntil} */
9-
describe('Observable.prototype.skipUntil', () => {
8+
describe('skipUntil', () => {
109
asDiagram('skipUntil')('should skip values until another observable notifies', () => {
1110
const e1 = hot('--a--b--c--d--e----|');
1211
const e1subs = '^ !';
1312
const skip = hot('---------x------| ');
14-
const skipSubs = '^ ! ';
13+
const skipSubs = '^ ! ';
1514
const expected = ('-----------d--e----|');
1615

17-
expectObservable(e1.skipUntil(skip)).toBe(expected);
16+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
1817
expectSubscriptions(e1.subscriptions).toBe(e1subs);
1918
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
2019
});
2120

22-
it('should emit element only after another observable emits', () => {
21+
it('should emit elements after notifer emits', () => {
2322
const e1 = hot('--a--b--c--d--e--|');
2423
const e1subs = '^ !';
25-
const skip = hot('-----------x----| ');
26-
const skipSubs = '^ ! ';
27-
const expected = ('--------------e--|');
24+
const skip = hot('---------x----| ');
25+
const skipSubs = '^ ! ';
26+
const expected = ('-----------d--e--|');
2827

29-
expectObservable(e1.skipUntil(skip)).toBe(expected);
28+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
3029
expectSubscriptions(e1.subscriptions).toBe(e1subs);
3130
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
3231
});
3332

34-
it('should skip value and raises error until another observable raises error', () => {
33+
it('should raise an error if notifier throws and source is hot', () => {
3534
const e1 = hot('--a--b--c--d--e--|');
3635
const e1subs = '^ ! ';
3736
const skip = hot('-------------# ');
3837
const skipSubs = '^ ! ';
3938
const expected = '-------------# ';
4039

41-
expectObservable(e1.skipUntil(skip)).toBe(expected);
40+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
4241
expectSubscriptions(e1.subscriptions).toBe(e1subs);
4342
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
4443
});
4544

46-
it('should skip all element when another observable does not emit and completes early', () => {
45+
it('should skip all elements when notifier does not emit and completes early', () => {
4746
const e1 = hot('--a--b--c--d--e--|');
4847
const e1subs = '^ !';
49-
const skip = hot('------------| ');
50-
const skipSubs = '^ ! ';
48+
const skip = hot('------------|');
49+
const skipSubs = '^ !';
5150
const expected = '-----------------|';
5251

53-
expectObservable(e1.skipUntil(skip)).toBe(expected);
52+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
5453
expectSubscriptions(e1.subscriptions).toBe(e1subs);
5554
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
5655
});
@@ -63,7 +62,7 @@ describe('Observable.prototype.skipUntil', () => {
6362
const skipSubs = '^ ! ';
6463
const expected = ('---------- ');
6564

66-
expectObservable(e1.skipUntil(skip), unsub).toBe(expected);
65+
expectObservable(e1.pipe(skipUntil(skip)), unsub).toBe(expected);
6766
expectSubscriptions(e1.subscriptions).toBe(e1subs);
6867
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
6968
});
@@ -76,154 +75,143 @@ describe('Observable.prototype.skipUntil', () => {
7675
const expected = ('---------- ');
7776
const unsub = ' ! ';
7877

79-
const result = e1
80-
.mergeMap((x: string) => Observable.of(x))
81-
.skipUntil(skip)
82-
.mergeMap((x: string) => Observable.of(x));
78+
const result = e1.pipe(
79+
mergeMap(x => of(x)),
80+
skipUntil(skip),
81+
mergeMap(x => of(x)),
82+
);
8383

8484
expectObservable(result, unsub).toBe(expected);
8585
expectSubscriptions(e1.subscriptions).toBe(e1subs);
8686
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
8787
});
8888

89-
it('should skip all element when another observable is empty', () => {
89+
it('should skip all elements when notifier is empty', () => {
9090
const e1 = hot('--a--b--c--d--e--|');
9191
const e1subs = '^ !';
9292
const skip = cold('|');
9393
const skipSubs = '(^!)';
9494
const expected = '-----------------|';
9595

96-
expectObservable(e1.skipUntil(skip)).toBe(expected);
96+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
9797
expectSubscriptions(e1.subscriptions).toBe(e1subs);
9898
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
9999
});
100100

101-
it('should keep subscription to source, to wait for its eventual complete', () => {
101+
it('should keep subscription to source, to wait for its eventual completion', () => {
102102
const e1 = hot('------------------------------|');
103103
const e1subs = '^ !';
104104
const skip = hot('-------| ');
105105
const skipSubs = '^ ! ';
106106
const expected = '------------------------------|';
107107

108-
expectObservable(e1.skipUntil(skip)).toBe(expected);
108+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
109109
expectSubscriptions(e1.subscriptions).toBe(e1subs);
110110
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
111111
});
112112

113-
it('should not complete if source observable does not complete', () => {
113+
it('should not complete if hot source observable does not complete', () => {
114114
const e1 = hot('-');
115115
const e1subs = '^';
116116
const skip = hot('-------------x--|');
117-
const skipSubs = '^ !';
117+
const skipSubs = '^ ! ';
118118
const expected = '-';
119119

120-
expectObservable(e1.skipUntil(skip)).toBe(expected);
120+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
121121
expectSubscriptions(e1.subscriptions).toBe(e1subs);
122122
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
123123
});
124124

125-
it('should not complete if source observable never completes', () => {
125+
it('should not complete if cold source observable never completes', () => {
126126
const e1 = cold( '-');
127127
const e1subs = '^';
128128
const skip = hot('-------------x--|');
129-
const skipSubs = '^ !';
129+
const skipSubs = '^ ! ';
130130
const expected = '-';
131131

132-
expectObservable(e1.skipUntil(skip)).toBe(expected);
132+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
133133
expectSubscriptions(e1.subscriptions).toBe(e1subs);
134134
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
135135
});
136136

137-
it('should raise error if source does not completes when another observable raises error', () => {
138-
const e1 = hot('-');
139-
const e1subs = '^ !';
140-
const skip = hot('-------------#');
141-
const skipSubs = '^ !';
142-
const expected = '-------------#';
143-
144-
expectObservable(e1.skipUntil(skip)).toBe(expected);
145-
expectSubscriptions(e1.subscriptions).toBe(e1subs);
146-
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
147-
});
148-
149-
it('should raise error if source never completes when another observable raises error', () => {
137+
it('should raise error if cold source is never and notifier errors', () => {
150138
const e1 = cold( '-');
151139
const e1subs = '^ !';
152140
const skip = hot('-------------#');
153141
const skipSubs = '^ !';
154142
const expected = '-------------#';
155143

156-
expectObservable(e1.skipUntil(skip)).toBe(expected);
144+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
157145
expectSubscriptions(e1.subscriptions).toBe(e1subs);
158146
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
159147
});
160148

161-
it('should skip all element and does not complete when another observable never completes', () => {
149+
it('should skip all elements and complete if notifier is cold never', () => {
162150
const e1 = hot( '--a--b--c--d--e--|');
163151
const e1subs = '^ !';
164152
const skip = cold('-');
165153
const skipSubs = '^ !';
166-
const expected = '-';
154+
const expected = '-----------------|';
167155

168-
expectObservable(e1.skipUntil(skip)).toBe(expected);
156+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
169157
expectSubscriptions(e1.subscriptions).toBe(e1subs);
170158
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
171159
});
172160

173-
it('should skip all element and does not complete when another observable does not completes', () => {
161+
it('should skip all elements and complete if notifier is a hot never', () => {
174162
const e1 = hot('--a--b--c--d--e--|');
175163
const e1subs = '^ !';
176164
const skip = hot('-');
177165
const skipSubs = '^ !';
178-
const expected = '-';
166+
const expected = '-----------------|';
179167

180-
expectObservable(e1.skipUntil(skip)).toBe(expected);
168+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
181169
expectSubscriptions(e1.subscriptions).toBe(e1subs);
182170
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
183171
});
184172

185-
it('should skip all element and does not complete when another observable completes after source', () => {
186-
const e1 = hot('--a--b--c--d--e--|');
173+
it('should skip all elements and complete, even if notifier would not complete until later', () => {
174+
const e1 = hot('^-a--b--c--d--e--|');
187175
const e1subs = '^ !';
188-
const skip = hot('------------------------|');
176+
const skip = hot('^-----------------------|');
189177
const skipSubs = '^ !';
190-
const expected = '------------------';
178+
const expected = '-----------------|';
191179

192-
expectObservable(e1.skipUntil(skip)).toBe(expected);
180+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
193181
expectSubscriptions(e1.subscriptions).toBe(e1subs);
194182
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
195183
});
196184

197-
it('should not completes if source does not completes when another observable does not emit', () => {
185+
it('should not complete if source does not complete if notifier completes without emission', () => {
198186
const e1 = hot('-');
199187
const e1subs = '^';
200188
const skip = hot('--------------|');
201189
const skipSubs = '^ !';
202190
const expected = '-';
203191

204-
expectObservable(e1.skipUntil(skip)).toBe(expected);
192+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
205193
expectSubscriptions(e1.subscriptions).toBe(e1subs);
206194
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
207195
});
208196

209-
it('should not completes if source and another observable both does not complete', () => {
197+
it('should not complete if source and notifier are both hot never', () => {
210198
const e1 = hot('-');
211199
const e1subs = '^';
212200
const skip = hot('-');
213201
const skipSubs = '^';
214202
const expected = '-';
215203

216-
expectObservable(e1.skipUntil(skip)).toBe(expected);
204+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
217205
expectSubscriptions(e1.subscriptions).toBe(e1subs);
218206
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
219207
});
220208

221-
it('should skip all element when another observable unsubscribed early before emit', () => {
209+
it('should skip skip all elements if notifier is unsubscribed explicitly before the notifier emits', () => {
222210
const e1 = hot( '--a--b--c--d--e--|');
223211
const e1subs = ['^ !',
224-
'^ !']; // for the explicit subscribe some lines below
225-
const skip = new Rx.Subject();
226-
const expected = '-';
212+
'^ !']; // for the explicit subscribe some lines below
213+
const skip = new Subject();
214+
const expected = '-----------------|';
227215

228216
e1.subscribe((x: string) => {
229217
if (x === 'd' && !skip.closed) {
@@ -233,7 +221,18 @@ describe('Observable.prototype.skipUntil', () => {
233221
skip.unsubscribe();
234222
});
235223

236-
expectObservable(e1.skipUntil(skip)).toBe(expected);
224+
expectObservable(e1.pipe(skipUntil(skip))).toBe(expected);
237225
expectSubscriptions(e1.subscriptions).toBe(e1subs);
238226
});
227+
228+
it('should unsubscribe the notifier after its first nexted value', () => {
229+
const source = hot('-^-o---o---o---o---o---o---|');
230+
const notifier = hot('-^--------n--n--n--n--n--n-|');
231+
const nSubs = '^ !';
232+
const expected = '-^---------o---o---o---o---|';
233+
const result = source.pipe(skipUntil(notifier));
234+
235+
expectObservable(result).toBe(expected);
236+
expectSubscriptions(notifier.subscriptions).toBe(nSubs);
237+
});
239238
});

src/internal/operators/skipUntil.ts

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { Observable } from '../Observable';
44
import { OuterSubscriber } from '../OuterSubscriber';
55
import { InnerSubscriber } from '../InnerSubscriber';
66
import { subscribeToResult } from '../util/subscribeToResult';
7-
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
7+
import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types';
8+
import { Subscription } from '../Subscription';
89

910
/**
1011
* Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
@@ -26,8 +27,8 @@ class SkipUntilOperator<T> implements Operator<T, T> {
2627
constructor(private notifier: Observable<any>) {
2728
}
2829

29-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
30-
return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier));
30+
call(destination: Subscriber<T>, source: any): TeardownLogic {
31+
return source.subscribe(new SkipUntilSubscriber(destination, this.notifier));
3132
}
3233
}
3334

@@ -39,12 +40,11 @@ class SkipUntilOperator<T> implements Operator<T, T> {
3940
class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
4041

4142
private hasValue: boolean = false;
42-
private isInnerStopped: boolean = false;
43+
private innerSubscription: Subscription;
4344

44-
constructor(destination: Subscriber<any>,
45-
notifier: Observable<any>) {
45+
constructor(destination: Subscriber<R>, notifier: ObservableInput<any>) {
4646
super(destination);
47-
this.add(subscribeToResult(this, notifier));
47+
this.add(this.innerSubscription = subscribeToResult(this, notifier));
4848
}
4949

5050
protected _next(value: T) {
@@ -53,24 +53,14 @@ class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> {
5353
}
5454
}
5555

56-
protected _complete() {
57-
if (this.isInnerStopped) {
58-
super._complete();
59-
} else {
60-
this.unsubscribe();
61-
}
62-
}
63-
6456
notifyNext(outerValue: T, innerValue: R,
6557
outerIndex: number, innerIndex: number,
6658
innerSub: InnerSubscriber<T, R>): void {
6759
this.hasValue = true;
60+
this.innerSubscription.unsubscribe();
6861
}
6962

70-
notifyComplete(): void {
71-
this.isInnerStopped = true;
72-
if (this.isStopped) {
73-
super._complete();
74-
}
63+
notifyComplete() {
64+
/* do nothing */
7565
}
7666
}

0 commit comments

Comments
 (0)