Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions spec/operators/groupBy-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;
const ReplaySubject = Rx.ReplaySubject;

/** @test {groupBy} */
describe('Observable.prototype.groupBy', () => {
Expand Down Expand Up @@ -98,6 +99,27 @@ describe('Observable.prototype.groupBy', () => {
expect(resultingGroups).to.deep.equal(expectedGroups);
});

it('should group values with a subject selector', (done: MochaDone) => {
const expectedGroups = [
{ key: 1, values: [3] },
{ key: 0, values: [2] }
];

Observable.of(1, 2, 3)
.groupBy((x: number) => x % 2, null, null, () => new ReplaySubject(1))
// Ensure each inner group reaches the destination after the first event
// has been next'd to the group
.delay(5)
.subscribe((g: any) => {
const expectedGroup = expectedGroups.shift();
expect(g.key).to.equal(expectedGroup.key);

g.subscribe((x: any) => {
expect(x).to.deep.equal(expectedGroup.values.shift());
});
}, null, done);
});

it('should handle an empty Observable', () => {
const e1 = cold('|');
const e1subs = '(^!)';
Expand Down
20 changes: 12 additions & 8 deletions src/operator/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import { FastMap } from '../util/FastMap';
export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K): Observable<GroupedObservable<K, T>>;
export function groupBy<T, K>(this: Observable<T>, keySelector: (value: T) => K, elementSelector: void, durationSelector: (grouped: GroupedObservable<K, T>) => Observable<any>): Observable<GroupedObservable<K, T>>;
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): Observable<GroupedObservable<K, R>>;
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K, elementSelector?: (value: T) => R, durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>, subjectSelector?: () => Subject<R>): Observable<GroupedObservable<K, R>>;
/* tslint:disable:max-line-length */
export function groupBy<T, K, R>(this: Observable<T>, keySelector: (value: T) => K,
elementSelector?: ((value: T) => R) | void,
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>): Observable<GroupedObservable<K, R>> {
return this.lift(new GroupByOperator(this, keySelector, elementSelector, durationSelector));
durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
subjectSelector?: () => Subject<R>): Observable<GroupedObservable<K, R>> {
return this.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
}

export interface RefCountSubscription {
Expand All @@ -46,15 +48,15 @@ export interface RefCountSubscription {
}

class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
constructor(public source: Observable<T>,
private keySelector: (value: T) => K,
constructor(private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
private subjectSelector?: () => Subject<R>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rename this to subjectFactory since it's not taking any input into this function

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

}

call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any {
return source._subscribe(new GroupBySubscriber(
subscriber, this.keySelector, this.elementSelector, this.durationSelector
subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector
));
}
}
Expand All @@ -72,7 +74,8 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
constructor(destination: Subscriber<GroupedObservable<K, R>>,
private keySelector: (value: T) => K,
private elementSelector?: ((value: T) => R) | void,
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>) {
private durationSelector?: (grouped: GroupedObservable<K, R>) => Observable<any>,
private subjectSelector?: () => Subject<R>) {
super(destination);
}

Expand Down Expand Up @@ -109,7 +112,8 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
}

if (!group) {
groups.set(key, group = new Subject<R>());
group = this.subjectSelector ? this.subjectSelector() : new Subject<R>();
groups.set(key, group);
const groupedObservable = new GroupedObservable(key, group, this);
this.destination.next(groupedObservable);
if (this.durationSelector) {
Expand Down