Skip to content

Commit 686082d

Browse files
committed
fix(observeOn): remove observed subscription to clean up notification instance
- relates to #2244
1 parent 6922b16 commit 686082d

File tree

1 file changed

+23
-14
lines changed

1 file changed

+23
-14
lines changed

src/operator/observeOn.ts

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { Observable } from '../Observable';
22
import { Scheduler } from '../Scheduler';
33
import { Operator } from '../Operator';
4-
import { PartialObserver } from '../Observer';
54
import { Subscriber } from '../Subscriber';
65
import { Notification } from '../Notification';
7-
import { TeardownLogic } from '../Subscription';
6+
import { Subscription, TeardownLogic } from '../Subscription';
87

98
/**
109
* @see {@link Notification}
@@ -34,21 +33,25 @@ export class ObserveOnOperator<T> implements Operator<T, T> {
3433
* @extends {Ignored}
3534
*/
3635
export class ObserveOnSubscriber<T> extends Subscriber<T> {
37-
static dispatch(arg: ObserveOnMessage) {
38-
const { notification, destination } = arg;
39-
notification.observe(destination);
40-
}
41-
42-
constructor(destination: Subscriber<T>,
36+
constructor(protected destination: Subscriber<T>,
4337
private scheduler: Scheduler,
4438
private delay: number = 0) {
4539
super(destination);
4640
}
4741

42+
private static dispatch(context: ObserveOnContext): void {
43+
const { notification, subscriber, subscription } = context;
44+
subscriber.observe(notification, subscription);
45+
}
46+
4847
private scheduleMessage(notification: Notification<any>): void {
49-
this.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch,
50-
this.delay,
51-
new ObserveOnMessage(notification, this.destination)));
48+
const message = new ObserveOnContext(notification, this);
49+
const subscription = this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message);
50+
51+
//do not add into subscription if scheduled synchronously
52+
if (!subscription.closed) {
53+
this.add(message.subscription = subscription);
54+
}
5255
}
5356

5457
protected _next(value: T): void {
@@ -62,10 +65,16 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {
6265
protected _complete(): void {
6366
this.scheduleMessage(Notification.createComplete());
6467
}
68+
69+
public observe(notification: Notification<any>, subscription: Subscription): void {
70+
notification.observe(this.destination);
71+
this.remove(subscription);
72+
}
6573
}
6674

67-
export class ObserveOnMessage {
68-
constructor(public notification: Notification<any>,
69-
public destination: PartialObserver<any>) {
75+
class ObserveOnContext {
76+
public subscription: Subscription;
77+
constructor(public readonly notification: Notification<any>,
78+
public readonly subscriber: ObserveOnSubscriber<any>) {
7079
}
7180
}

0 commit comments

Comments
 (0)