Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions wasm-wasi-core/src/common/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { WasmRootFileSystemImpl } from './fileSystem';
import WasiKernel from './kernel';
import { MemoryFileSystem as MemoryFileSystemImpl } from './memoryFileSystemDriver';
import { WasiProcess as InternalWasiProcess } from './process';
import { ReadableStream, WritableStream } from './streams';
import { ReadableStream, WritableStream, WritableStreamEOT } from './streams';
import { WasmPseudoterminalImpl } from './terminal';

export interface Environment {
Expand Down Expand Up @@ -495,6 +495,12 @@ export interface Wasm {
*/
createWritable(encoding?: 'utf-8'): Writable;

/**
* Creates a new writable stream. If EOT is enabled the stream will
* close if the EOT character is written to the stream.
*/
createWritable(options: { eot?: boolean; encoding?: 'utf-8' }): Writable;

/**
* Creates a new WASM process.
*
Expand Down Expand Up @@ -571,8 +577,27 @@ namespace WasiCoreImpl {
createReadable() {
return new ReadableStream();
},
createWritable(encoding?: 'utf-8') {
return new WritableStream(encoding);
createWritable(optionsOrEncoding?: 'utf-8' | { eot?: boolean; encoding?: 'utf-8' }): Writable {
if (optionsOrEncoding === undefined) {
return new WritableStream();
}
let ctor: new (encoding?: 'utf-8') => Writable = WritableStream;
let encoding: 'utf-8' | undefined = undefined;
if (typeof optionsOrEncoding === 'string') {
if (optionsOrEncoding !== 'utf-8') {
throw new Error(`Unsupported encoding: ${optionsOrEncoding}`);
}
encoding = optionsOrEncoding;
} else {
if (optionsOrEncoding.encoding !== undefined && optionsOrEncoding.encoding !== 'utf-8') {
throw new Error(`Unsupported encoding: ${optionsOrEncoding.encoding}`);
}
encoding = optionsOrEncoding.encoding;
if (optionsOrEncoding.eot) {
ctor = WritableStreamEOT;
}
}
return new ctor(encoding);
},
async createProcess(name: string, module: WebAssembly.Module | Promise<WebAssembly.Module>, memoryOrOptions?: WebAssembly.MemoryDescriptor | WebAssembly.Memory | ProcessOptions, optionsOrMapWorkspaceFolders?: ProcessOptions | boolean): Promise<WasmProcess> {
let memory: WebAssembly.Memory | WebAssembly.MemoryDescriptor | undefined;
Expand Down
2 changes: 1 addition & 1 deletion wasm-wasi-core/src/common/ral.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface _TextEncoder {
}

interface _TextDecoder {
decode(input?: Uint8Array): string;
decode(input?: Uint8Array, options?: { stream?: boolean | undefined }): string;
}

interface RAL {
Expand Down
73 changes: 72 additions & 1 deletion wasm-wasi-core/src/common/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ export abstract class Stream {
}

public async write(chunk: Uint8Array): Promise<void> {
// We don't write chunks of size 0 since they would indicate the end of the stream
// on the receiving side. If we want to support closing a stream via a write we should
// support writing null.
if (chunk.byteLength === 0) {
return;
}
// We have enough space
if (this.fillLevel + chunk.byteLength <= Stream.BufferSize) {
this.chunks.push(chunk);
Expand Down Expand Up @@ -178,22 +184,85 @@ export abstract class Stream {

}

enum StreamState {
open = 'open',
closed = 'closed',
}

export class WritableStream extends Stream implements Writable {

private readonly encoding: 'utf-8';
private readonly encoder: RAL.TextEncoder;
protected streamState: StreamState;

constructor(encoding?: 'utf-8') {
super();
this.encoding = encoding ?? 'utf-8';
this.encoder = RAL().TextEncoder.create(this.encoding);
this.streamState = StreamState.open;
}

public async write(chunk: Uint8Array | string): Promise<void> {
public write(chunk: Uint8Array): Promise<void>;
public write(chunk: string, encoding?: 'utf-8'): Promise<void>;
public write(chunk: Uint8Array | string, _encoding?: 'utf-8'): Promise<void> {
if (this.streamState !== StreamState.open) {
return Promise.reject(new Error('Stream is closed'));
}
return super.write(typeof chunk === 'string' ? this.encoder.encode(chunk) : chunk);
}

public read(): Promise<Uint8Array>;
public read(mode: 'max', size: number): Promise<Uint8Array>;
public read(mode?: 'max', size?: number): Promise<Uint8Array> {
if (this.streamState === StreamState.closed && this.chunks.length === 0) {
return Promise.resolve(new Uint8Array(0));
}
return mode !== undefined ? super.read(mode, size!) : super.read();
}

public end(): void {
this.streamState = StreamState.closed;
}
}

export class WritableStreamEOT extends WritableStream {

constructor(encoding?: 'utf-8') {
super(encoding);
}

public async write(chunk: Uint8Array | string, encoding?: 'utf-8'): Promise<void> {
if (this.streamState !== StreamState.open) {
throw new Error('Stream is closed');
}
let eot:boolean = false;
try {
if (typeof chunk === 'string') {
if (chunk.length > 0 && chunk.charCodeAt(chunk.length - 1) === 0x04 /* EOT */) {
eot = true;
if (chunk.length >= 1) {
chunk = chunk.substring(0, chunk.length - 1);
await super.write(chunk, encoding);
}
} else {
await super.write(chunk, encoding);
}
} else {
if (chunk.length > 0 && chunk[chunk.length - 1] === 0x04 /* EOT */) {
eot = true;
if (chunk.length >= 1) {
chunk = chunk.subarray(0, chunk.length - 1);
await super.write(chunk);
}
} else {
await super.write(chunk);
}
}
} finally {
if (eot) {
this.streamState = StreamState.closed;
}
}
}
}

Expand Down Expand Up @@ -248,6 +317,8 @@ export class ReadableStream extends Stream implements Readable {
}
}

public async read(): Promise<Uint8Array>;
public async read(mode: 'max', size: number): Promise<Uint8Array>;
public async read(mode?: 'max', size?: number): Promise<Uint8Array> {
if (this.mode === ReadableStreamMode.flowing) {
throw new Error('Cannot read from stream in flowing mode');
Expand Down
51 changes: 50 additions & 1 deletion wasm-wasi-core/src/common/test/stream.main.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*--------------------------------------------------------------------------------------------*/
import assert from 'assert';
import RAL from '../ral';
import { ReadableStream, WritableStream } from '../streams';
import { ReadableStream, WritableStream, WritableStreamEOT } from '../streams';

suite('Stream Tests', () => {
test('Stream write 16k', async () => {
Expand Down Expand Up @@ -85,4 +85,53 @@ suite('Stream Tests', () => {
assert.strictEqual(data[i], success[i]);
}
});

test('Stream end', async () => {
const writeable = new WritableStream();
await writeable.write('Hello World');
const decoder = RAL().TextDecoder.create();
assert.strictEqual(decoder.decode(await writeable.read()), 'Hello World');
writeable.end();
const data = await writeable.read();
assert.strictEqual(data.byteLength, 0);
});

test('Stream end write throws', async () => {
const writeable = new WritableStream();
writeable.end();
await assert.rejects(async () => {
await writeable.write('Hello World');
});
});
});

suite('Stream EOT Tests', () => {
test('Stream write EOT after read', async () => {
const writeable = new WritableStreamEOT();
await writeable.write('Hello World');
const decoder = RAL().TextDecoder.create();
assert.strictEqual(decoder.decode(await writeable.read()), 'Hello World');
await writeable.write(new Uint8Array([0x04]));
const data = await writeable.read();
assert.strictEqual(data.byteLength, 0);
});

test('Stream write EOT before read', async () => {
const writeable = new WritableStreamEOT();
await writeable.write('Hello World');
await writeable.write(new Uint8Array([0x04]));
const decoder = RAL().TextDecoder.create();
assert.strictEqual(decoder.decode(await writeable.read()), 'Hello World');
const data = await writeable.read();
assert.strictEqual(data.byteLength, 0);
});

test('Stream write EOT embedded', async () => {
const writeable = new WritableStreamEOT();
await writeable.write('Hello World\u0004');
const decoder = RAL().TextDecoder.create();
assert.strictEqual(decoder.decode(await writeable.read()), 'Hello World');
const data = await writeable.read();
assert.strictEqual(data.byteLength, 0);
});
});
6 changes: 6 additions & 0 deletions wasm-wasi/src/api/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,12 @@ export interface Wasm {
*/
createWritable(encoding?: 'utf-8'): Writable;

/**
* Creates a new writable stream. If EOT is enabled the stream will
* close if the EOT character is written to the stream.
*/
createWritable(options: { eot?: boolean; encoding?: 'utf-8' }): Writable;

/**
* Creates a new WASM process.
*
Expand Down