Skip to content

Commit 97b2ca2

Browse files
authored
fix(takeUntil): complete iterable immediately (#315) (#316)
Don't wait for a value to be emitted before performing iterable completeness check
1 parent 19915b5 commit 97b2ca2

2 files changed

Lines changed: 20 additions & 7 deletions

File tree

spec/asynciterable-operators/takeuntil-spec.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { hasNext, noNext, delayValue } from '../asynciterablehelpers';
22
import { takeUntil } from 'ix/asynciterable/operators';
33
import { as } from 'ix/asynciterable';
4+
import { AsyncSink } from 'ix/asynciterable';
45

56
test('AsyncIterable#takeUntil hits', async () => {
6-
const xs = async function*() {
7+
const xs = async function* () {
78
yield await delayValue(1, 100);
89
yield await delayValue(2, 300);
910
yield await delayValue(3, 1200);
@@ -17,7 +18,7 @@ test('AsyncIterable#takeUntil hits', async () => {
1718
});
1819

1920
test('AsyncIterable#takeUntil misses', async () => {
20-
const xs = async function*() {
21+
const xs = async function* () {
2122
yield await delayValue(1, 100);
2223
yield await delayValue(2, 300);
2324
yield await delayValue(3, 600);
@@ -30,3 +31,11 @@ test('AsyncIterable#takeUntil misses', async () => {
3031
await hasNext(it, 3);
3132
await noNext(it);
3233
});
34+
35+
test('AsyncIterable#takeUntil completes immediately', async () => {
36+
const sink = new AsyncSink<void>();
37+
const ys = as(sink).pipe(takeUntil(() => Promise.resolve()));
38+
39+
const it = ys[Symbol.asyncIterator]();
40+
await noNext(it);
41+
});

src/asynciterable/operators/takeuntil.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
33
import { wrapWithAbort } from './withabort';
44
import { throwIfAborted } from '../../aborterror';
55

6+
const DONE_PROMISE_VALUE = undefined;
7+
68
export class TakeUntilAsyncIterable<TSource> extends AsyncIterableX<TSource> {
79
private _source: AsyncIterable<TSource>;
810
private _other: (signal?: AbortSignal) => Promise<any>;
@@ -15,13 +17,15 @@ export class TakeUntilAsyncIterable<TSource> extends AsyncIterableX<TSource> {
1517

1618
async *[Symbol.asyncIterator](signal?: AbortSignal) {
1719
throwIfAborted(signal);
18-
let otherDone = false;
19-
this._other(signal).then(() => (otherDone = true));
20-
for await (const item of wrapWithAbort(this._source, signal)) {
21-
if (otherDone) {
20+
const donePromise = this._other(signal).then(() => DONE_PROMISE_VALUE);
21+
const itemsAsyncIterator = wrapWithAbort(this._source, signal)[Symbol.asyncIterator]();
22+
for (;;) {
23+
const itemPromise = itemsAsyncIterator.next();
24+
const result = await Promise.race([donePromise, itemPromise]);
25+
if (result === DONE_PROMISE_VALUE || result.done) {
2226
break;
2327
}
24-
yield item;
28+
yield result.value;
2529
}
2630
}
2731
}

0 commit comments

Comments
 (0)