Skip to content

Commit 44fbc14

Browse files
trxcllntjayphelps
authored andcommitted
fix(multicast): fix a bug that caused multicast to omit messages after termination (#2021)
ConnectableObservable as the multicast selector function source argument would omit events after source termination when used with historic Subjects.
1 parent f93cb1d commit 44fbc14

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

spec/operators/multicast-spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions, time
44

55
const Observable = Rx.Observable;
66
const Subject = Rx.Subject;
7+
const ReplaySubject = Rx.ReplaySubject;
78

89
/** @test {multicast} */
910
describe('Observable.prototype.multicast', () => {
@@ -89,6 +90,26 @@ describe('Observable.prototype.multicast', () => {
8990
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
9091
});
9192

93+
it('should accept a multicast selector and respect the subject\'s messaging semantics', () => {
94+
const source = cold('-1-2-3----4-|');
95+
const sourceSubs = ['^ !',
96+
' ^ !',
97+
' ^ !'];
98+
const multicasted = source.multicast(() => new ReplaySubject(1),
99+
x => x.concat(x.takeLast(1)));
100+
const expected1 = '-1-2-3----4-(4|)';
101+
const expected2 = ' -1-2-3----4-(4|)';
102+
const expected3 = ' -1-2-3----4-(4|)';
103+
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
104+
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
105+
const subscriber3 = hot(' c| ').mergeMapTo(multicasted);
106+
107+
expectObservable(subscriber1).toBe(expected1);
108+
expectObservable(subscriber2).toBe(expected2);
109+
expectObservable(subscriber3).toBe(expected3);
110+
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
111+
});
112+
92113
it('should do nothing if connect is not called, despite subscriptions', () => {
93114
const source = cold('--1-2---3-4--5-|');
94115
const sourceSubs = [];

src/operator/multicast.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ export class MulticastOperator<T> implements Operator<T, T> {
5656
constructor(private subjectFactory: () => Subject<T>,
5757
private selector: (source: Observable<T>) => Observable<T>) {
5858
}
59-
call(subscriber: Subscriber<T>, self: any): any {
59+
call(subscriber: Subscriber<T>, source: any): any {
6060
const { selector } = this;
61-
const connectable = new ConnectableObservable(self.source, this.subjectFactory);
62-
const subscription = selector(connectable).subscribe(subscriber);
63-
subscription.add(connectable.connect());
61+
const subject = this.subjectFactory();
62+
const subscription = selector(subject).subscribe(subscriber);
63+
subscription.add(source._subscribe(subject));
6464
return subscription;
6565
}
6666
}

0 commit comments

Comments
 (0)