Skip to content

Commit c02bc8f

Browse files
committed
fix(Subscription): fold ChildSubscription logic into Subscriber to prevent operators from leaking ChildSubscriptions.
The addition of ChildSubscription to fix #2244 accidentally introduced a different memory leak. Most operators that add and remove inner Subscriptions store the inner Subscriber instance, not the value returned by Subscription#add. When they try to remove the inner Subscription manually, nothing is removed, because the ChildSubscription wrapper instance is the one added to the subscriptions list. Fixes #2355
1 parent 31dfc73 commit c02bc8f

File tree

10 files changed

+150
-108
lines changed

10 files changed

+150
-108
lines changed

spec/operators/observeOn-spec.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,21 @@ describe('Observable.prototype.observeOn', () => {
104104
.observeOn(Rx.Scheduler.asap)
105105
.subscribe(
106106
x => {
107-
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
107+
const observeOnSubscriber = subscription._subscriptions[0];
108108
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification
109-
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
109+
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
110110
.to.equal('N');
111-
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value)
111+
expect(observeOnSubscriber._subscriptions[1].state.notification.value)
112112
.to.equal(x);
113113
results.push(x);
114114
},
115115
err => done(err),
116116
() => {
117117
// now that the last nexted value is done, there should only be a complete notification scheduled
118-
const observeOnSubscriber = subscription._subscriptions[0]._innerSub;
118+
const observeOnSubscriber = subscription._subscriptions[0];
119119
expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification
120120
// only this completion notification should remain.
121-
expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind)
121+
expect(observeOnSubscriber._subscriptions[1].state.notification.kind)
122122
.to.equal('C');
123123
// After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this.
124124
expect(results).to.deep.equal([1, 2, 3]);

spec/operators/switch-spec.ts

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,44 @@ describe('Observable.prototype.switch', () => {
223223

224224
expect(completed).to.be.true;
225225
});
226-
});
226+
227+
it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => {
228+
let iStream: Rx.Subject<number>;
229+
const oStreamControl = new Rx.Subject<number>();
230+
const oStream = oStreamControl.map(() => {
231+
return (iStream = new Rx.Subject());
232+
});
233+
const switcher = oStream.switch();
234+
const result = [];
235+
let sub = switcher.subscribe((x: number) => result.push(x));
236+
237+
[0, 1, 2, 3, 4].forEach((n) => {
238+
oStreamControl.next(n); // creates inner
239+
iStream.complete();
240+
});
241+
// Expect one child of switch(): The oStream
242+
expect(
243+
(<any>sub)._subscriptions[0]._subscriptions.length
244+
).to.equal(1);
245+
sub.unsubscribe();
246+
});
247+
248+
it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => {
249+
const oStreamControl = new Rx.Subject<number>();
250+
const oStream = oStreamControl.map(() => {
251+
return (new Rx.Subject());
252+
});
253+
const switcher = oStream.switch();
254+
const result = [];
255+
let sub = switcher.subscribe((x: number) => result.push(x));
256+
257+
[0, 1, 2, 3, 4].forEach((n) => {
258+
oStreamControl.next(n); // creates inner
259+
});
260+
// Expect two children of switch(): The oStream and the first inner
261+
expect(
262+
(<any>sub)._subscriptions[0]._subscriptions.length
263+
).to.equal(2);
264+
sub.unsubscribe();
265+
});
266+
});

src/Subscriber.ts

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,18 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
144144
this.destination.complete();
145145
this.unsubscribe();
146146
}
147+
148+
protected _unsubscribeAndRecycle(): Subscriber<T> {
149+
const { _parent, _parents } = this;
150+
this._parent = null;
151+
this._parents = null;
152+
this.unsubscribe();
153+
this.closed = false;
154+
this.isStopped = false;
155+
this._parent = _parent;
156+
this._parents = _parents;
157+
return this;
158+
}
147159
}
148160

149161
/**
@@ -155,7 +167,7 @@ class SafeSubscriber<T> extends Subscriber<T> {
155167

156168
private _context: any;
157169

158-
constructor(private _parent: Subscriber<T>,
170+
constructor(private _parentSubscriber: Subscriber<T>,
159171
observerOrNext?: PartialObserver<T> | ((value: T) => void),
160172
error?: (e?: any) => void,
161173
complete?: () => void) {
@@ -185,46 +197,46 @@ class SafeSubscriber<T> extends Subscriber<T> {
185197

186198
next(value?: T): void {
187199
if (!this.isStopped && this._next) {
188-
const { _parent } = this;
189-
if (!_parent.syncErrorThrowable) {
200+
const { _parentSubscriber } = this;
201+
if (!_parentSubscriber.syncErrorThrowable) {
190202
this.__tryOrUnsub(this._next, value);
191-
} else if (this.__tryOrSetError(_parent, this._next, value)) {
203+
} else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) {
192204
this.unsubscribe();
193205
}
194206
}
195207
}
196208

197209
error(err?: any): void {
198210
if (!this.isStopped) {
199-
const { _parent } = this;
211+
const { _parentSubscriber } = this;
200212
if (this._error) {
201-
if (!_parent.syncErrorThrowable) {
213+
if (!_parentSubscriber.syncErrorThrowable) {
202214
this.__tryOrUnsub(this._error, err);
203215
this.unsubscribe();
204216
} else {
205-
this.__tryOrSetError(_parent, this._error, err);
217+
this.__tryOrSetError(_parentSubscriber, this._error, err);
206218
this.unsubscribe();
207219
}
208-
} else if (!_parent.syncErrorThrowable) {
220+
} else if (!_parentSubscriber.syncErrorThrowable) {
209221
this.unsubscribe();
210222
throw err;
211223
} else {
212-
_parent.syncErrorValue = err;
213-
_parent.syncErrorThrown = true;
224+
_parentSubscriber.syncErrorValue = err;
225+
_parentSubscriber.syncErrorThrown = true;
214226
this.unsubscribe();
215227
}
216228
}
217229
}
218230

219231
complete(): void {
220232
if (!this.isStopped) {
221-
const { _parent } = this;
233+
const { _parentSubscriber } = this;
222234
if (this._complete) {
223-
if (!_parent.syncErrorThrowable) {
235+
if (!_parentSubscriber.syncErrorThrowable) {
224236
this.__tryOrUnsub(this._complete);
225237
this.unsubscribe();
226238
} else {
227-
this.__tryOrSetError(_parent, this._complete);
239+
this.__tryOrSetError(_parentSubscriber, this._complete);
228240
this.unsubscribe();
229241
}
230242
} else {
@@ -254,9 +266,9 @@ class SafeSubscriber<T> extends Subscriber<T> {
254266
}
255267

256268
protected _unsubscribe(): void {
257-
const { _parent } = this;
269+
const { _parentSubscriber } = this;
258270
this._context = null;
259-
this._parent = null;
260-
_parent.unsubscribe();
271+
this._parentSubscriber = null;
272+
_parentSubscriber.unsubscribe();
261273
}
262274
}

src/Subscription.ts

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { isArray } from './util/isArray';
21
import { isObject } from './util/isObject';
32
import { isFunction } from './util/isFunction';
43
import { tryCatch } from './util/tryCatch';
@@ -40,7 +39,9 @@ export class Subscription implements ISubscription {
4039
*/
4140
public closed: boolean = false;
4241

43-
private _subscriptions: ISubscription[];
42+
protected _parent: Subscription = null;
43+
protected _parents: Subscription[] = null;
44+
private _subscriptions: ISubscription[] = null;
4445

4546
/**
4647
* @param {function(): void} [unsubscribe] A function describing how to
@@ -66,11 +67,25 @@ export class Subscription implements ISubscription {
6667
return;
6768
}
6869

69-
this.closed = true;
70-
71-
const { _unsubscribe, _subscriptions } = (<any> this);
70+
let { _parent, _parents, _unsubscribe, _subscriptions } = (<any> this);
7271

73-
(<any> this)._subscriptions = null;
72+
this.closed = true;
73+
this._parent = null;
74+
this._parents = null;
75+
// null out _subscriptions first so any child subscriptions that attempt
76+
// to remove themselves from this subscription will noop
77+
this._subscriptions = null;
78+
79+
let index = -1, len = _parents && _parents.length || 0;
80+
81+
// if this._parent is null, then so is this._parents, and we
82+
// don't have to remove ourselves from any parent subscriptions.
83+
while (_parent) {
84+
_parent.remove(this);
85+
// if this._parents is null or index >= len,
86+
// then _parent is set to false, and the loop exits
87+
_parent = ++index < len && _parents[index];
88+
}
7489

7590
if (isFunction(_unsubscribe)) {
7691
let trial = tryCatch(_unsubscribe).call(this);
@@ -83,10 +98,10 @@ export class Subscription implements ISubscription {
8398
}
8499
}
85100

86-
if (isArray(_subscriptions)) {
101+
if (_subscriptions) {
87102

88-
let index = -1;
89-
const len = _subscriptions.length;
103+
index = -1;
104+
len = _subscriptions.length;
90105

91106
while (++index < len) {
92107
const sub = _subscriptions[index];
@@ -138,27 +153,33 @@ export class Subscription implements ISubscription {
138153
return this;
139154
}
140155

141-
let sub = (<Subscription> teardown);
156+
let subscription = (<Subscription> teardown);
142157

143158
switch (typeof teardown) {
144159
case 'function':
145-
sub = new Subscription(<(() => void) > teardown);
160+
subscription = new Subscription(<(() => void) > teardown);
146161
case 'object':
147-
if (sub.closed || typeof sub.unsubscribe !== 'function') {
148-
return sub;
162+
if (subscription.closed || typeof subscription.unsubscribe !== 'function') {
163+
return subscription;
149164
} else if (this.closed) {
150-
sub.unsubscribe();
151-
return sub;
165+
subscription.unsubscribe();
166+
return subscription;
167+
} else if (typeof subscription._addParent !== 'function' /* quack quack */) {
168+
const tmp = subscription;
169+
subscription = new Subscription();
170+
subscription._subscriptions = [tmp];
152171
}
153172
break;
154173
default:
155174
throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.');
156175
}
157176

158-
const childSub = new ChildSubscription(sub, this);
159-
this._subscriptions = this._subscriptions || [];
160-
this._subscriptions.push(childSub);
161-
return childSub;
177+
const subscriptions = this._subscriptions || (this._subscriptions = []);
178+
179+
subscriptions.push(subscription);
180+
subscription._addParent(this);
181+
182+
return subscription;
162183
}
163184

164185
/**
@@ -168,37 +189,25 @@ export class Subscription implements ISubscription {
168189
* @return {void}
169190
*/
170191
remove(subscription: Subscription): void {
171-
172-
// HACK: This might be redundant because of the logic in `add()`
173-
if (subscription == null || (
174-
subscription === this) || (
175-
subscription === Subscription.EMPTY)) {
176-
return;
177-
}
178-
179-
const subscriptions = (<any> this)._subscriptions;
180-
192+
const subscriptions = this._subscriptions;
181193
if (subscriptions) {
182194
const subscriptionIndex = subscriptions.indexOf(subscription);
183195
if (subscriptionIndex !== -1) {
184196
subscriptions.splice(subscriptionIndex, 1);
185197
}
186198
}
187199
}
188-
}
189-
190-
export class ChildSubscription extends Subscription {
191-
constructor(private _innerSub: ISubscription, private _parent: Subscription) {
192-
super();
193-
}
194200

195-
_unsubscribe() {
196-
const { _innerSub, _parent } = this;
197-
_parent.remove(this);
198-
_innerSub.unsubscribe();
201+
private _addParent(parent: Subscription) {
202+
let { _parent, _parents } = this;
203+
if (!_parent) {
204+
this._parent = parent;
205+
} else if (_parents || (_parents = this._parents = [])) {
206+
_parents.push(this);
207+
}
199208
}
200209
}
201210

202211
function flattenUnsubscriptionErrors(errors: any[]) {
203212
return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []);
204-
}
213+
}

src/operator/catch.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,7 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, R> {
107107
super.error(err2);
108108
return;
109109
}
110-
this.unsubscribe();
111-
this.closed = false;
112-
this.isStopped = false;
110+
this._unsubscribeAndRecycle();
113111
this.add(subscribeToResult(this, result));
114112
}
115113
}

src/operator/observeOn.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Operator } from '../Operator';
44
import { PartialObserver } from '../Observer';
55
import { Subscriber } from '../Subscriber';
66
import { Notification } from '../Notification';
7-
import { TeardownLogic, Subscription } from '../Subscription';
7+
import { TeardownLogic } from '../Subscription';
88
import { Action } from '../scheduler/Action';
99

1010
/**
@@ -36,11 +36,9 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
3636
*/
3737
export class ObserveOnSubscriber<T> extends Subscriber<T> {
3838
static dispatch(this: Action<ObserveOnMessage>, arg: ObserveOnMessage) {
39-
const { notification, destination, subscription } = arg;
39+
const { notification, destination } = arg;
4040
notification.observe(destination);
41-
if (subscription) {
42-
subscription.unsubscribe();
43-
}
41+
this.unsubscribe();
4442
}
4543

4644
constructor(destination: Subscriber<T>,
@@ -50,10 +48,11 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
5048
}
5149

5250
private scheduleMessage(notification: Notification<any>): void {
53-
const message = new ObserveOnMessage(notification, this.destination);
54-
message.subscription = this.add(
55-
this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message)
56-
);
51+
this.add(this.scheduler.schedule(
52+
ObserveOnSubscriber.dispatch,
53+
this.delay,
54+
new ObserveOnMessage(notification, this.destination)
55+
));
5756
}
5857

5958
protected _next(value: T): void {
@@ -70,8 +69,6 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
7069
}
7170

7271
export class ObserveOnMessage {
73-
public subscription: Subscription;
74-
7572
constructor(public notification: Notification<any>,
7673
public destination: PartialObserver<any>) {
7774
}

src/operator/repeat.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,7 @@ class RepeatSubscriber<T> extends Subscriber<T> {
5656
} else if (count > -1) {
5757
this.count = count - 1;
5858
}
59-
this.unsubscribe();
60-
this.isStopped = false;
61-
this.closed = false;
62-
source.subscribe(this);
59+
source.subscribe(this._unsubscribeAndRecycle());
6360
}
6461
}
6562
}

0 commit comments

Comments
 (0)