diff --git a/packages/chat/src/index.ts b/packages/chat/src/index.ts index bebb348b..ee5b974f 100644 --- a/packages/chat/src/index.ts +++ b/packages/chat/src/index.ts @@ -335,6 +335,7 @@ export type { TaskUpdateChunk, Thread, ThreadInfo, + ThreadStreamOptions, ThreadSummary, WebhookOptions, WellKnownEmoji, diff --git a/packages/chat/src/thread.test.ts b/packages/chat/src/thread.test.ts index 4031adad..daa1a34f 100644 --- a/packages/chat/src/thread.test.ts +++ b/packages/chat/src/thread.test.ts @@ -213,6 +213,30 @@ describe("ThreadImpl", () => { expect(mockAdapter.postMessage).not.toHaveBeenCalled(); }); + it("should pass stream options via thread.stream()", async () => { + const mockStream = vi.fn().mockResolvedValue({ + id: "msg-stream", + threadId: "t1", + raw: "Hello World", + }); + mockAdapter.stream = mockStream; + + const textStream = createTextStream(["Hello", " ", "World"]); + await thread.stream(textStream, { + taskDisplayMode: "plan", + stopBlocks: [{ type: "actions" }], + }); + + expect(mockStream).toHaveBeenCalledWith( + "slack:C123:1234.5678", + expect.any(Object), + expect.objectContaining({ + taskDisplayMode: "plan", + stopBlocks: [{ type: "actions" }], + }) + ); + }); + it("should fall back to post+edit when adapter has no native streaming", async () => { // Ensure no stream method mockAdapter.stream = undefined; diff --git a/packages/chat/src/thread.ts b/packages/chat/src/thread.ts index eb6fa285..7e622472 100644 --- a/packages/chat/src/thread.ts +++ b/packages/chat/src/thread.ts @@ -33,6 +33,7 @@ import type { StreamEvent, StreamOptions, Thread, + ThreadStreamOptions, } from "./types"; import { NotImplementedError, THREAD_STATE_TTL_MS } from "./types"; @@ -485,18 +486,26 @@ export class ThreadImpl> return this.adapter.scheduleMessage(this.id, postable, options); } + async stream( + stream: AsyncIterable, + options?: ThreadStreamOptions + ): Promise { + return this.handleStream(stream, options); + } + /** * Handle streaming from an AsyncIterable. * Normalizes the stream (supports both textStream and fullStream from AI SDK), * then uses adapter's native streaming if available, otherwise falls back to post+edit. */ private async handleStream( - rawStream: AsyncIterable + rawStream: AsyncIterable, + callerOptions?: ThreadStreamOptions ): Promise { // Normalize: handles plain strings, AI SDK fullStream events, and StreamChunk objects const textStream = fromFullStream(rawStream); // Build streaming options from current message context - const options: StreamOptions = {}; + const options: StreamOptions = { ...callerOptions }; if (this._currentMessage) { options.recipientUserId = this._currentMessage.author.userId; // Extract teamId from raw Slack payload diff --git a/packages/chat/src/types.ts b/packages/chat/src/types.ts index 57676776..eaf3337c 100644 --- a/packages/chat/src/types.ts +++ b/packages/chat/src/types.ts @@ -513,6 +513,15 @@ export interface StreamOptions { updateIntervalMs?: number; } +/** + * User-facing streaming options for `thread.stream()`. + * Internal fields like `recipientUserId` are auto-populated from message context. + */ +export type ThreadStreamOptions = Pick< + StreamOptions, + "stopBlocks" | "taskDisplayMode" | "updateIntervalMs" +>; + /** Internal interface for Chat instance passed to adapters */ export interface ChatInstance { /** Get the configured logger, optionally with a child prefix */ @@ -1093,6 +1102,28 @@ export interface Thread, TRawMessage = unknown> */ startTyping(status?: string): Promise; + /** + * Stream an async iterable to the thread with platform-specific options. + * + * Use this instead of `post()` when you need to pass streaming options + * like `taskDisplayMode` or `stopBlocks`. + * + * @example + * ```typescript + * const result = await agent.stream({ prompt: message.text }); + * await thread.stream(result.fullStream, { + * taskDisplayMode: "plan", + * }); + * ``` + * + * @param stream - Async iterable of text chunks, StreamChunks, or AI SDK stream events + * @param options - Platform-specific streaming options + */ + stream( + stream: AsyncIterable, + options?: ThreadStreamOptions + ): Promise>; + /** * Subscribe to future messages in this thread. *