Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions perf/micro/immediate-scheduler/operators/share.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var oldShareWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).share();
var newShareWithImmediateScheduler = RxNew.Observable.range(0, 25).share();

return suite
.add('old share with immediate scheduler', function () {
oldShareWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new share with immediate scheduler', function () {
newShareWithImmediateScheduler.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
2 changes: 1 addition & 1 deletion perf/micro/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Observable.create(function(observer) {
})
.filter(function(filePath) {
var argv = process.argv;
if(argv && argv.length > 2) {
if(argv && argv.length > 2) {
return argv.slice(2).some(function(val) {
return path.parse(filePath).name === val;
});
Expand Down
64 changes: 64 additions & 0 deletions spec/operators/share-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* globals describe, expect, it, hot, cold, expectObservable */

var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.share()', function (){
it('should share a single subscription', function (){
var subscriptionCount = 0;
var obs = new Observable(function(observer){
subscriptionCount++;
});

var source = obs.share();

expect(subscriptionCount).toBe(0);

source.subscribe();
source.subscribe();

expect(subscriptionCount).toBe(1);
});

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also have a marble test where e1 is a cold Observable to make sense applying share on it? Also the case of multiple subscribers to a was-cold shared Observable makes a lot of sense to have a subscribe-and-unsubscribe marble diagram test that we mentioned in #428.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably

it('should not change the output of the observable when successful', function (){
var e1 = hot('---a--^--b--c--d--e--|');
var expected = '---b--c--d--e--|';

expectObservable(e1.share()).toBe(expected);
});

it('should not change the output of the observable when error', function (){
var e1 = hot('---a--^--b--c--d--e--#');
var expected = '---b--c--d--e--#';

expectObservable(e1.share()).toBe(expected);
});

it('should not change the output of the observable when successful with cold observable', function (){
var e1 = cold('---a--b--c--d--e--|');
var expected = '---a--b--c--d--e--|';

expectObservable(e1.share()).toBe(expected);
});

it('should not change the output of the observable when error with cold observable', function (){
var e1 = cold('---a--b--c--d--e--#');
var expected = '---a--b--c--d--e--#';

expectObservable(e1.share()).toBe(expected);
});

it('should not change the output of the observable when never', function (){
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.share()).toBe(expected);
});

it('should not change the output of the observable when empty', function (){
var e1 = Observable.empty();
var expected = '|';

expectObservable(e1.share()).toBe(expected);
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export interface CoreOperators<T> {
sample?: <T>(notifier: Observable<any>) => Observable<T>;
sampleTime?: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;
scan?: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
share?: () => Observable<T>;
single?: (predicate?: (value: T, index:number) => boolean, thisArg?: any) => Observable<T>;
skip?: (count: number) => Observable<T>;
skipUntil?: (notifier: Observable<any>) => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ observableProto.sampleTime = sampleTime;
import scan from './operators/scan';
observableProto.scan = scan;

import share from './operators/share';
observableProto.share = share;

import single from './operators/single';
observableProto.single = single;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ observableProto.sampleTime = sampleTime;
import scan from './operators/scan';
observableProto.scan = scan;

import share from './operators/share';
observableProto.share = share;

import single from './operators/single';
observableProto.single = single;

Expand Down
6 changes: 6 additions & 0 deletions src/operators/share.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import Observable from '../Observable';
import publish from './publish';

export default function share<T>() : Observable<T> {
return publish.call(this).refCount();
};