Skip to content

Commit 7d0fc01

Browse files
committed
fix(extra): move flattenConcurrently from core to extra
Make flattenConcurrently() an extra operator because it is not that often used as flatten() is. Usage is: `import flattenConcurrently from 'xstream/extra/flattenConcurrently'` and then `streamOfStreams.compose(flattenConcurrently)`. BREAKING CHANGE: flattenConcurrently must be separately imported as an extra operator and used with .compose()
1 parent 61838bb commit 7d0fc01

5 files changed

Lines changed: 109 additions & 203 deletions

File tree

src/core.ts

Lines changed: 6 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -549,70 +549,7 @@ export class FilterOperator<T> implements Operator<T, T> {
549549
}
550550
}
551551

552-
class FCIL<T> implements InternalListener<T> {
553-
constructor(private out: Stream<T>,
554-
private op: FlattenConcOperator<T>) {
555-
}
556-
557-
_n(t: T) {
558-
this.out._n(t);
559-
}
560-
561-
_e(err: any) {
562-
this.out._e(err);
563-
}
564-
565-
_c() {
566-
this.op.less();
567-
}
568-
}
569-
570-
export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {
571-
public type = 'flattenConcurrently';
572-
private active: number = 1; // number of outers and inners that have not yet ended
573-
private out: Stream<T> = null;
574-
575-
constructor(public ins: Stream<Stream<T>>) {
576-
}
577-
578-
_start(out: Stream<T>): void {
579-
this.out = out;
580-
this.ins._add(this);
581-
}
582-
583-
_stop(): void {
584-
this.ins._remove(this);
585-
this.active = 1;
586-
this.out = null;
587-
}
588-
589-
less(): void {
590-
if (--this.active === 0) {
591-
const u = this.out;
592-
if (!u) return;
593-
u._c();
594-
}
595-
}
596-
597-
_n(s: Stream<T>) {
598-
const u = this.out;
599-
if (!u) return;
600-
this.active++;
601-
s._add(new FCIL(u, this));
602-
}
603-
604-
_e(err: any) {
605-
const u = this.out;
606-
if (!u) return;
607-
u._e(err);
608-
}
609-
610-
_c() {
611-
this.less();
612-
}
613-
}
614-
615-
class FIL<T> implements InternalListener<T> {
552+
class FlattenListener<T> implements InternalListener<T> {
616553
constructor(private out: Stream<T>,
617554
private op: FlattenOperator<T>) {
618555
}
@@ -665,7 +602,7 @@ export class FlattenOperator<T> implements Operator<Stream<T>, T> {
665602
if (!u) return;
666603
const {inner, il} = this;
667604
if (inner && il) inner._remove(il);
668-
(this.inner = s)._add(this.il = new FIL(u, this));
605+
(this.inner = s)._add(this.il = new FlattenListener(u, this));
669606
}
670607

671608
_e(err: any) {
@@ -770,77 +707,7 @@ export class LastOperator<T> implements Operator<T, T> {
770707
}
771708
}
772709

773-
class MFCIL<R> implements InternalListener<R> {
774-
constructor(private out: Stream<R>,
775-
private op: MapFlattenConcOperator<any, R>) {
776-
}
777-
778-
_n(r: R) {
779-
this.out._n(r);
780-
}
781-
782-
_e(err: any) {
783-
this.out._e(err);
784-
}
785-
786-
_c() {
787-
this.op.less();
788-
}
789-
}
790-
791-
export class MapFlattenConcOperator<T, R> implements Operator<T, R> {
792-
public type: string;
793-
public ins: Stream<T>;
794-
private active: number = 1; // number of outers and inners that have not yet ended
795-
private out: Stream<R> = null;
796-
797-
constructor(public mapOp: MapOperator<T, Stream<R>>) {
798-
this.type = `${mapOp.type}+flattenConcurrently`;
799-
this.ins = mapOp.ins;
800-
}
801-
802-
_start(out: Stream<R>): void {
803-
this.out = out;
804-
this.mapOp.ins._add(this);
805-
}
806-
807-
_stop(): void {
808-
this.mapOp.ins._remove(this);
809-
this.active = 1;
810-
this.out = null;
811-
}
812-
813-
less(): void {
814-
if (--this.active === 0) {
815-
const u = this.out;
816-
if (!u) return;
817-
u._c();
818-
}
819-
}
820-
821-
_n(v: T) {
822-
const u = this.out;
823-
if (!u) return;
824-
this.active++;
825-
try {
826-
this.mapOp.project(v)._add(new MFCIL(u, this));
827-
} catch (e) {
828-
u._e(e);
829-
}
830-
}
831-
832-
_e(err: any) {
833-
const u = this.out;
834-
if (!u) return;
835-
u._e(err);
836-
}
837-
838-
_c() {
839-
this.less();
840-
}
841-
}
842-
843-
class MFIL<R> implements InternalListener<R> {
710+
class MapFlattenInner<R> implements InternalListener<R> {
844711
constructor(private out: Stream<R>,
845712
private op: MapFlattenOperator<any, R>) {
846713
}
@@ -899,7 +766,9 @@ export class MapFlattenOperator<T, R> implements Operator<T, R> {
899766
const {inner, il} = this;
900767
if (inner && il) inner._remove(il);
901768
try {
902-
(this.inner = this.mapOp.project(v))._add(this.il = new MFIL(u, this));
769+
(this.inner = this.mapOp.project(v))._add(
770+
this.il = new MapFlattenInner(u, this)
771+
);
903772
} catch (e) {
904773
u._e(e);
905774
}
@@ -1716,40 +1585,6 @@ export class Stream<T> implements InternalListener<T> {
17161585
);
17171586
}
17181587

1719-
/**
1720-
* Flattens a "stream of streams", handling multiple concurrent nested streams
1721-
* simultaneously.
1722-
*
1723-
* If the input stream is a stream that emits streams, then this operator will
1724-
* return an output stream which is a flat stream: emits regular events. The
1725-
* flattening happens concurrently. It works like this: when the input stream
1726-
* emits a nested stream, *flattenConcurrently* will start imitating that
1727-
* nested one. When the next nested stream is emitted on the input stream,
1728-
* *flattenConcurrently* will also imitate that new one, but will continue to
1729-
* imitate the previous nested streams as well.
1730-
*
1731-
* Marble diagram:
1732-
*
1733-
* ```text
1734-
* --+--------+---------------
1735-
* \ \
1736-
* \ ----1----2---3--
1737-
* --a--b----c----d--------
1738-
* flattenConcurrently
1739-
* -----a--b----c-1--d-2---3--
1740-
* ```
1741-
*
1742-
* @return {Stream}
1743-
*/
1744-
flattenConcurrently<R>(): T {
1745-
const p = this._prod;
1746-
return <T> <any> new Stream<R>(
1747-
p instanceof MapOperator && !(p instanceof FilterMapOperator) ?
1748-
new MapFlattenConcOperator(<MapOperator<any, Stream<R>>> <any> p) :
1749-
new FlattenConcOperator(<Stream<Stream<R>>> <any> this)
1750-
);
1751-
}
1752-
17531588
/**
17541589
* Blends two streams together, emitting events from both.
17551590
*

src/extra/flattenConcurrently.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import {Operator, Stream, InternalListener} from '../core';
2+
3+
class FCIL<T> implements InternalListener<T> {
4+
constructor(private out: Stream<T>,
5+
private op: FlattenConcOperator<T>) {
6+
}
7+
8+
_n(t: T) {
9+
this.out._n(t);
10+
}
11+
12+
_e(err: any) {
13+
this.out._e(err);
14+
}
15+
16+
_c() {
17+
this.op.less();
18+
}
19+
}
20+
21+
export class FlattenConcOperator<T> implements Operator<Stream<T>, T> {
22+
public type = 'flattenConcurrently';
23+
private active: number = 1; // number of outers and inners that have not yet ended
24+
private out: Stream<T> = null;
25+
26+
constructor(public ins: Stream<Stream<T>>) {
27+
}
28+
29+
_start(out: Stream<T>): void {
30+
this.out = out;
31+
this.ins._add(this);
32+
}
33+
34+
_stop(): void {
35+
this.ins._remove(this);
36+
this.active = 1;
37+
this.out = null;
38+
}
39+
40+
less(): void {
41+
if (--this.active === 0) {
42+
const u = this.out;
43+
if (!u) return;
44+
u._c();
45+
}
46+
}
47+
48+
_n(s: Stream<T>) {
49+
const u = this.out;
50+
if (!u) return;
51+
this.active++;
52+
s._add(new FCIL(u, this));
53+
}
54+
55+
_e(err: any) {
56+
const u = this.out;
57+
if (!u) return;
58+
u._e(err);
59+
}
60+
61+
_c() {
62+
this.less();
63+
}
64+
}
65+
66+
/**
67+
* Flattens a "stream of streams", handling multiple concurrent nested streams
68+
* simultaneously.
69+
*
70+
* If the input stream is a stream that emits streams, then this operator will
71+
* return an output stream which is a flat stream: emits regular events. The
72+
* flattening happens concurrently. It works like this: when the input stream
73+
* emits a nested stream, *flattenConcurrently* will start imitating that
74+
* nested one. When the next nested stream is emitted on the input stream,
75+
* *flattenConcurrently* will also imitate that new one, but will continue to
76+
* imitate the previous nested streams as well.
77+
*
78+
* Marble diagram:
79+
*
80+
* ```text
81+
* --+--------+---------------
82+
* \ \
83+
* \ ----1----2---3--
84+
* --a--b----c----d--------
85+
* flattenConcurrently
86+
* -----a--b----c-1--d-2---3--
87+
* ```
88+
*
89+
* @return {Stream}
90+
*/
91+
export default function flattenConcurrently<T>(ins: Stream<Stream<T>>): Stream<T> {
92+
return new Stream<T>(new FlattenConcOperator(ins));
93+
}

0 commit comments

Comments
 (0)