Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ export type {
TaskUpdateChunk,
Thread,
ThreadInfo,
ThreadStreamOptions,
ThreadSummary,
WebhookOptions,
WellKnownEmoji,
Expand Down
24 changes: 24 additions & 0 deletions packages/chat/src/thread.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,30 @@ describe("ThreadImpl", () => {
expect(mockAdapter.postMessage).not.toHaveBeenCalled();
});

it("should pass stream options via thread.stream()", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello World",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello", " ", "World"]);
await thread.stream(textStream, {
taskDisplayMode: "plan",
stopBlocks: [{ type: "actions" }],
});

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
taskDisplayMode: "plan",
stopBlocks: [{ type: "actions" }],
})
);
});

it("should fall back to post+edit when adapter has no native streaming", async () => {
// Ensure no stream method
mockAdapter.stream = undefined;
Expand Down
13 changes: 11 additions & 2 deletions packages/chat/src/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import type {
StreamEvent,
StreamOptions,
Thread,
ThreadStreamOptions,
} from "./types";
import { NotImplementedError, THREAD_STATE_TTL_MS } from "./types";

Expand Down Expand Up @@ -485,18 +486,26 @@ export class ThreadImpl<TState = Record<string, unknown>>
return this.adapter.scheduleMessage(this.id, postable, options);
}

async stream(
stream: AsyncIterable<string | StreamChunk | StreamEvent>,
options?: ThreadStreamOptions
): Promise<SentMessage> {
return this.handleStream(stream, options);
}

/**
* Handle streaming from an AsyncIterable.
* Normalizes the stream (supports both textStream and fullStream from AI SDK),
* then uses adapter's native streaming if available, otherwise falls back to post+edit.
*/
private async handleStream(
rawStream: AsyncIterable<string | StreamChunk | StreamEvent>
rawStream: AsyncIterable<string | StreamChunk | StreamEvent>,
callerOptions?: ThreadStreamOptions
): Promise<SentMessage> {
// Normalize: handles plain strings, AI SDK fullStream events, and StreamChunk objects
const textStream = fromFullStream(rawStream);
// Build streaming options from current message context
const options: StreamOptions = {};
const options: StreamOptions = { ...callerOptions };
if (this._currentMessage) {
options.recipientUserId = this._currentMessage.author.userId;
// Extract teamId from raw Slack payload
Expand Down
31 changes: 31 additions & 0 deletions packages/chat/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,15 @@ export interface StreamOptions {
updateIntervalMs?: number;
}

/**
* User-facing streaming options for `thread.stream()`.
* Internal fields like `recipientUserId` are auto-populated from message context.
*/
export type ThreadStreamOptions = Pick<
StreamOptions,
"stopBlocks" | "taskDisplayMode" | "updateIntervalMs"
>;

/** Internal interface for Chat instance passed to adapters */
export interface ChatInstance {
/** Get the configured logger, optionally with a child prefix */
Expand Down Expand Up @@ -1093,6 +1102,28 @@ export interface Thread<TState = Record<string, unknown>, TRawMessage = unknown>
*/
startTyping(status?: string): Promise<void>;

/**
* Stream an async iterable to the thread with platform-specific options.
*
* Use this instead of `post()` when you need to pass streaming options
* like `taskDisplayMode` or `stopBlocks`.
*
* @example
* ```typescript
* const result = await agent.stream({ prompt: message.text });
* await thread.stream(result.fullStream, {
* taskDisplayMode: "plan",
* });
* ```
*
* @param stream - Async iterable of text chunks, StreamChunks, or AI SDK stream events
* @param options - Platform-specific streaming options
*/
stream(
stream: AsyncIterable<string | StreamChunk | StreamEvent>,
options?: ThreadStreamOptions
): Promise<SentMessage<TRawMessage>>;

/**
* Subscribe to future messages in this thread.
*
Expand Down
Loading