From b5b192a2904ea1351243fdde4316a6837f721251 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Wed, 25 Nov 2015 16:10:12 -0200 Subject: [PATCH 1/2] test(expand): add more marble tests for expand() Add subscription assertions on expand tests. Add concurrency-related tests. --- spec/operators/expand-spec.js | 225 ++++++++++++++++++++++++++++++++-- 1 file changed, 216 insertions(+), 9 deletions(-) diff --git a/spec/operators/expand-spec.js b/spec/operators/expand-spec.js index 84b93f4b3f..ec7eed18dc 100644 --- a/spec/operators/expand-spec.js +++ b/spec/operators/expand-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect, expectObservable, hot, cold */ +/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var Promise = require('promise'); @@ -12,7 +12,10 @@ describe('Observable.prototype.expand()', function () { d: 4 + 4, // c + c, e: 8 + 8, // d + d }; - var e1 = hot('(a|)', values); + var e1 = hot('(a|)', values); + var e1subs = '^ ! '; + var e2shape = '---(z|) '; + var expected = 'a--b--c--d--(e|)'; /* expectation explanation: (conjunction junction?) ... @@ -27,14 +30,214 @@ describe('Observable.prototype.expand()', function () { ---(e|) (...which flattens into:) a--b--c--d--(e|) */ - var expected = 'a--b--c--d--(e|)'; - expectObservable(e1.expand(function (x) { + var result = e1.expand(function (x) { if (x === 16) { return Observable.empty(); } - return cold('---(z|)', { z: x + x }); - })).toBe(expected, values); + return cold(e2shape, { z: x + x }); + }); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should map and recursively flatten, and handle event raised error', function () { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + }; + var e1 = hot('(a|)', values); + var e1subs = '^ ! '; + var e2shape = '---(z|) '; + var expected = 'a--b--c--(d#)'; + + var result = e1.expand(function (x) { + if (x === 8) { + return cold('#'); + } + return cold(e2shape, { z: x + x }); + }); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should map and recursively flatten, and propagate error thrown from projection', function () { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + }; + var e1 = hot('(a|)', values); + var e1subs = '^ ! '; + var e2shape = '---(z|) '; + var expected = 'a--b--c--(d#)'; + + var result = e1.expand(function (x) { + if (x === 8) { + throw 'error'; + } + return cold(e2shape, { z: x + x }); + }); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow unsubscribing early', function () { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + }; + var e1 = hot('(a|)', values); + var unsub = ' ! '; + var e1subs = '^ ! '; + var e2shape = '---(z|) '; + var expected = 'a--b--c- '; + + var result = e1.expand(function (x) { + if (x === 16) { + return Observable.empty(); + } + return cold(e2shape, { z: x + x }); + }); + + expectObservable(result, unsub).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow concurrent expansions', function () { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + }; + var e1 = hot('a-a| ', values); + var e1subs = '^ ! '; + var e2shape = '---(z|) '; + var expected = 'a-ab-bc-cd-de-(e|)'; + + var result = e1.expand(function (x) { + if (x === 16) { + return Observable.empty(); + } + return cold(e2shape, { z: x + x }); + }); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow configuring the concurrency limit parameter to 1', function () { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + u: 10, + v: 20, // u + u + x: 40, // v + v + y: 80, // x + x + z: 160, // y + y + }; + var e1 = hot('a-u|', values); + var e2shape = '---(z|)'; + // ---(z|) + // ---(z|) + // ---(z|) + // ---(z|) + // ---(z|) + // ---(z|) + // ---(z|) + // Notice how for each column, there is at most 1 `-` character. + var e1subs = '^ ! '; + var expected = 'a--u--b--v--c--x--d--y--(ez|)'; + var concurrencyLimit = 1; + + var result = e1.expand(function (x) { + if (x === 16 || x === 160) { + return Observable.empty(); + } + return cold(e2shape, { z: x + x }); + }, concurrencyLimit); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow configuring the concurrency limit parameter to 2', function () { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + u: 10, + v: 20, // u + u + x: 40, // v + v + }; + var e1 = hot('a---au|', values); + var e2shape = '------(z|)'; + // ------(z|) + // ------(z|) + // ------(z|) + // ------(z|) + // ------(z|) + // ------(z|) + // Notice how for each column, there is at most 2 `-` characters. + var e1subs = '^ ! '; + var expected = 'a---a-u---b-b---v-(cc)(x|)'; + var concurrencyLimit = 2; + + var result = e1.expand(function (x) { + if (x === 4 || x === 40) { + return Observable.empty(); + } + return cold(e2shape, { z: x + x }); + }, concurrencyLimit); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should ignore concurrency limit if it is not passed', function () { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + u: 10, + v: 20, // u + u + x: 40, // v + v + y: 80, // x + x + z: 160, // y + y + }; + var e1 = hot('a-u| ', values); + var e1subs = '^ ! '; + var e2shape = '---(z|) '; + var expected = 'a-ub-vc-xd-ye-(z|)'; + var concurrencyLimit = 100; + + var result = e1.expand(function (x) { + if (x === 16 || x === 160) { + return Observable.empty(); + } + return cold(e2shape, { z: x + x }); + }, concurrencyLimit); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); }); it('should map and recursively flatten with scalars', function () { @@ -45,15 +248,19 @@ describe('Observable.prototype.expand()', function () { d: 4 + 4, // c + c, e: 8 + 8, // d + d }; - var e1 = hot('(a|)', values); + var e1 = hot('(a|)', values); + var e1subs = '(^!)'; var expected = '(abcde|)'; - expectObservable(e1.expand(function (x) { + var result = e1.expand(function (x) { if (x === 16) { return Observable.empty(); } return Observable.of(x + x); // scalar - })).toBe(expected, values); + }); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); }); it('should recursively flatten promises', function (done) { From 61c2ba5289a0b8a5938b3ea1e53935b130453980 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Wed, 25 Nov 2015 16:11:38 -0200 Subject: [PATCH 2/2] fix(expand): fix expand's concurrency behavior Fix expand() in order to emit each next value only once. It was previously emitting those values multiple times when a concurrency limit was set. --- src/operators/expand-support.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/operators/expand-support.ts b/src/operators/expand-support.ts index ad00cc9548..7b16b1969b 100644 --- a/src/operators/expand-support.ts +++ b/src/operators/expand-support.ts @@ -34,8 +34,8 @@ export class ExpandSubscriber extends OuterSubscriber { _next(value: any): void { const index = this.index++; - this.destination.next(value); if (this.active < this.concurrent) { + this.destination.next(value); let result = tryCatch(this.project)(value, index); if (result === errorObject) { this.destination.error(result.e);