77using System . Linq ;
88using System . Net ;
99using System . Net . Http ;
10+ using System . Runtime . CompilerServices ;
1011using System . Text ;
1112using System . Text . Json ;
1213using System . Threading ;
1314using System . Threading . Tasks ;
14- using System . Runtime . CompilerServices ;
1515using Microsoft . KernelMemory . Context ;
16+ using Microsoft . KernelMemory . HTTP ;
1617using Microsoft . KernelMemory . Internals ;
1718
1819namespace Microsoft . KernelMemory ;
@@ -338,28 +339,30 @@ public async Task<SearchResult> SearchAsync(
338339 }
339340
340341 /// <inheritdoc />
341- public async Task < MemoryAnswer > AskAsync (
342+ public async IAsyncEnumerable < MemoryAnswer > AskStreamingAsync (
342343 string question ,
343344 string ? index = null ,
344345 MemoryFilter ? filter = null ,
345346 ICollection < MemoryFilter > ? filters = null ,
346347 double minRelevance = 0 ,
348+ SearchOptions ? options = null ,
347349 IContext ? context = null ,
348- CancellationToken cancellationToken = default )
350+ [ EnumeratorCancellation ] CancellationToken cancellationToken = default )
349351 {
350352 if ( filter != null )
351353 {
352- if ( filters == null ) { filters = [ ] ; }
353-
354+ filters ??= [ ] ;
354355 filters . Add ( filter ) ;
355356 }
356357
358+ var useStreaming = options ? . Stream ?? false ;
357359 MemoryQuery request = new ( )
358360 {
359361 Index = index ,
360362 Question = question ,
361363 Filters = ( filters is { Count : > 0 } ) ? filters . ToList ( ) : [ ] ,
362364 MinRelevance = minRelevance ,
365+ Stream = useStreaming ,
363366 ContextArguments = ( context ? . Arguments ?? new Dictionary < string , object ? > ( ) ) . ToDictionary ( ) ,
364367 } ;
365368 using StringContent content = new ( JsonSerializer . Serialize ( request ) , Encoding . UTF8 , "application/json" ) ;
@@ -368,62 +371,20 @@ public async Task<MemoryAnswer> AskAsync(
368371 HttpResponseMessage response = await this . _client . PostAsync ( url , content , cancellationToken ) . ConfigureAwait ( false ) ;
369372 response . EnsureSuccessStatusCode ( ) ;
370373
371- var json = await response . Content . ReadAsStringAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
372- return JsonSerializer . Deserialize < MemoryAnswer > ( json , s_caseInsensitiveJsonOptions ) ?? new MemoryAnswer ( ) ;
373- }
374-
375- /// <inheritdoc />
376- public async IAsyncEnumerable < MemoryAnswer > AskAsyncChunk (
377- string question ,
378- string ? index = null ,
379- MemoryFilter ? filter = null ,
380- ICollection < MemoryFilter > ? filters = null ,
381- double minRelevance = 0 ,
382- IContext ? context = null ,
383- [ EnumeratorCancellation ] CancellationToken cancellationToken = default )
384- {
385- if ( filter != null )
374+ if ( useStreaming )
386375 {
387- if ( filters == null ) { filters = new List < MemoryFilter > ( ) ; }
388-
389- filters . Add ( filter ) ;
390- }
391-
392- MemoryQuery request = new ( )
393- {
394- Index = index ,
395- Question = question ,
396- Filters = ( filters is { Count : > 0 } ) ? filters . ToList ( ) : new ( ) ,
397- MinRelevance = minRelevance ,
398- ContextArguments = ( context ? . Arguments ?? new Dictionary < string , object ? > ( ) ) . ToDictionary ( ) ,
399- } ;
400- using StringContent content = new ( JsonSerializer . Serialize ( request ) , Encoding . UTF8 , "application/json" ) ;
401-
402- var url = Constants . HttpAskChunkEndpoint . CleanUrlPath ( ) ;
403- HttpResponseMessage response = await this . _client . PostAsync ( url , content , cancellationToken ) . ConfigureAwait ( false ) ;
404- response . EnsureSuccessStatusCode ( ) ;
405- var stream = await response . Content . ReadAsStreamAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
406- var eosToken = context . GetCustomEosTokenOrDefault ( "#DONE#" ) ;
407- using ( var reader = new StreamReader ( stream , Encoding . UTF8 ) )
408- {
409- string ? line ;
410- while ( ( line = await reader . ReadLineAsync ( cancellationToken ) . ConfigureAwait ( false ) ) != null )
376+ Stream stream = await response . Content . ReadAsStreamAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
377+ IAsyncEnumerable < MemoryAnswer > answers = SSE . ParseStreamAsync < MemoryAnswer > ( stream , cancellationToken ) ;
378+ await foreach ( MemoryAnswer answer in answers . ConfigureAwait ( false ) )
411379 {
412- if ( line . StartsWith ( "data:" , StringComparison . Ordinal ) )
413- {
414- var jsonData = line . Substring ( 6 ) ;
415- var chunk = JsonSerializer . Deserialize < MemoryAnswer > ( jsonData ) ;
416- if ( chunk != null )
417- {
418- yield return chunk ;
419- if ( chunk . Result == eosToken )
420- {
421- break ;
422- }
423- }
424- }
380+ yield return answer ;
425381 }
426382 }
383+ else
384+ {
385+ var json = await response . Content . ReadAsStringAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
386+ yield return JsonSerializer . Deserialize < MemoryAnswer > ( json , s_caseInsensitiveJsonOptions ) ?? new MemoryAnswer ( ) ;
387+ }
427388 }
428389
429390 #region private
0 commit comments