Skip to content

Commit b831175

Browse files
Streamable HTTP resumability + redelivery + SSE polling via server-side disconnect (#1077)
Co-authored-by: Stephen Halter <halter73@gmail.com>
1 parent 4cb2fd4 commit b831175

39 files changed

+2675
-246
lines changed

.github/workflows/ci-build-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ jobs:
6262
node-version: '20'
6363

6464
- name: 📦 Install dependencies for tests
65-
run: npm install @modelcontextprotocol/server-everything
65+
run: npm install @modelcontextprotocol/server-everything@2025.12.18
6666

6767
- name: 📦 Install dependencies for tests
6868
run: npm install @modelcontextprotocol/server-memory
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
// Copied from https://github.com/dotnet/runtime/blob/dcbf3413c5f7ae431a68fd0d3f09af095b525887/src/libraries/Common/src/System/Net/ArrayBuffer.cs
5+
6+
using System.Buffers;
7+
using System.Diagnostics;
8+
using System.Runtime.CompilerServices;
9+
using System.Runtime.InteropServices;
10+
11+
namespace System.Net.ServerSentEvents;
12+
13+
// Warning: Mutable struct!
14+
// The purpose of this struct is to simplify buffer management.
15+
// It manages a sliding buffer where bytes can be added at the end and removed at the beginning.
16+
// [ActiveSpan/Memory] contains the current buffer contents; these bytes will be preserved
17+
// (copied, if necessary) on any call to EnsureAvailableBytes.
18+
// [AvailableSpan/Memory] contains the available bytes past the end of the current content,
19+
// and can be written to in order to add data to the end of the buffer.
20+
// Commit(byteCount) will extend the ActiveSpan by [byteCount] bytes into the AvailableSpan.
21+
// Discard(byteCount) will discard [byteCount] bytes as the beginning of the ActiveSpan.
22+
23+
[StructLayout(LayoutKind.Auto)]
24+
internal struct ArrayBuffer : IDisposable
25+
{
26+
#if NET
27+
private static int ArrayMaxLength => Array.MaxLength;
28+
#else
29+
private const int ArrayMaxLength = 0X7FFFFFC7;
30+
#endif
31+
32+
private readonly bool _usePool;
33+
private byte[] _bytes;
34+
private int _activeStart;
35+
private int _availableStart;
36+
37+
// Invariants:
38+
// 0 <= _activeStart <= _availableStart <= bytes.Length
39+
40+
public ArrayBuffer(int initialSize, bool usePool = false)
41+
{
42+
Debug.Assert(initialSize > 0 || usePool);
43+
44+
_usePool = usePool;
45+
_bytes = initialSize == 0
46+
? Array.Empty<byte>()
47+
: usePool ? ArrayPool<byte>.Shared.Rent(initialSize) : new byte[initialSize];
48+
_activeStart = 0;
49+
_availableStart = 0;
50+
}
51+
52+
public ArrayBuffer(byte[] buffer)
53+
{
54+
Debug.Assert(buffer.Length > 0);
55+
56+
_usePool = false;
57+
_bytes = buffer;
58+
_activeStart = 0;
59+
_availableStart = 0;
60+
}
61+
62+
public void Dispose()
63+
{
64+
_activeStart = 0;
65+
_availableStart = 0;
66+
67+
byte[] array = _bytes;
68+
_bytes = null!;
69+
70+
if (array is not null)
71+
{
72+
ReturnBufferIfPooled(array);
73+
}
74+
}
75+
76+
// This is different from Dispose as the instance remains usable afterwards (_bytes will not be null).
77+
public void ClearAndReturnBuffer()
78+
{
79+
Debug.Assert(_usePool);
80+
Debug.Assert(_bytes is not null);
81+
82+
_activeStart = 0;
83+
_availableStart = 0;
84+
85+
byte[] bufferToReturn = _bytes!;
86+
_bytes = Array.Empty<byte>();
87+
ReturnBufferIfPooled(bufferToReturn);
88+
}
89+
90+
public int ActiveLength => _availableStart - _activeStart;
91+
public Span<byte> ActiveSpan => new Span<byte>(_bytes, _activeStart, _availableStart - _activeStart);
92+
public ReadOnlySpan<byte> ActiveReadOnlySpan => new ReadOnlySpan<byte>(_bytes, _activeStart, _availableStart - _activeStart);
93+
public Memory<byte> ActiveMemory => new Memory<byte>(_bytes, _activeStart, _availableStart - _activeStart);
94+
95+
public int AvailableLength => _bytes.Length - _availableStart;
96+
public Span<byte> AvailableSpan => _bytes.AsSpan(_availableStart);
97+
public Memory<byte> AvailableMemory => _bytes.AsMemory(_availableStart);
98+
public Memory<byte> AvailableMemorySliced(int length) => new Memory<byte>(_bytes, _availableStart, length);
99+
100+
public int Capacity => _bytes.Length;
101+
public int ActiveStartOffset => _activeStart;
102+
103+
public byte[] DangerousGetUnderlyingBuffer() => _bytes;
104+
105+
public void Discard(int byteCount)
106+
{
107+
Debug.Assert(byteCount <= ActiveLength, $"Expected {byteCount} <= {ActiveLength}");
108+
_activeStart += byteCount;
109+
110+
if (_activeStart == _availableStart)
111+
{
112+
_activeStart = 0;
113+
_availableStart = 0;
114+
}
115+
}
116+
117+
public void Commit(int byteCount)
118+
{
119+
Debug.Assert(byteCount <= AvailableLength);
120+
_availableStart += byteCount;
121+
}
122+
123+
// Ensure at least [byteCount] bytes to write to.
124+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
125+
public void EnsureAvailableSpace(int byteCount)
126+
{
127+
if (byteCount > AvailableLength)
128+
{
129+
EnsureAvailableSpaceCore(byteCount);
130+
}
131+
}
132+
133+
private void EnsureAvailableSpaceCore(int byteCount)
134+
{
135+
Debug.Assert(AvailableLength < byteCount);
136+
137+
if (_bytes.Length == 0)
138+
{
139+
Debug.Assert(_usePool && _activeStart == 0 && _availableStart == 0);
140+
_bytes = ArrayPool<byte>.Shared.Rent(byteCount);
141+
return;
142+
}
143+
144+
int totalFree = _activeStart + AvailableLength;
145+
if (byteCount <= totalFree)
146+
{
147+
// We can free up enough space by just shifting the bytes down, so do so.
148+
Buffer.BlockCopy(_bytes, _activeStart, _bytes, 0, ActiveLength);
149+
_availableStart = ActiveLength;
150+
_activeStart = 0;
151+
Debug.Assert(byteCount <= AvailableLength);
152+
return;
153+
}
154+
155+
int desiredSize = ActiveLength + byteCount;
156+
157+
if ((uint)desiredSize > ArrayMaxLength)
158+
{
159+
throw new OutOfMemoryException();
160+
}
161+
162+
// Double the existing buffer size (capped at Array.MaxLength).
163+
int newSize = Math.Max(desiredSize, (int)Math.Min(ArrayMaxLength, 2 * (uint)_bytes.Length));
164+
165+
byte[] newBytes = _usePool ?
166+
ArrayPool<byte>.Shared.Rent(newSize) :
167+
new byte[newSize];
168+
byte[] oldBytes = _bytes;
169+
170+
if (ActiveLength != 0)
171+
{
172+
Buffer.BlockCopy(oldBytes, _activeStart, newBytes, 0, ActiveLength);
173+
}
174+
175+
_availableStart = ActiveLength;
176+
_activeStart = 0;
177+
178+
_bytes = newBytes;
179+
ReturnBufferIfPooled(oldBytes);
180+
181+
Debug.Assert(byteCount <= AvailableLength);
182+
}
183+
184+
public void Grow()
185+
{
186+
EnsureAvailableSpaceCore(AvailableLength + 1);
187+
}
188+
189+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
190+
private void ReturnBufferIfPooled(byte[] buffer)
191+
{
192+
// The buffer may be Array.Empty<byte>()
193+
if (_usePool && buffer.Length > 0)
194+
{
195+
ArrayPool<byte>.Shared.Return(buffer);
196+
}
197+
}
198+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
// Copied from https://github.com/dotnet/runtime/blob/dcbf3413c5f7ae431a68fd0d3f09af095b525887/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/PooledByteBufferWriter.cs
5+
6+
using System.Buffers;
7+
using System.Diagnostics;
8+
9+
namespace System.Net.ServerSentEvents;
10+
11+
internal sealed class PooledByteBufferWriter : IBufferWriter<byte>, IDisposable
12+
{
13+
private const int MinimumBufferSize = 256;
14+
private ArrayBuffer _buffer = new(initialSize: 256, usePool: true);
15+
16+
public void Advance(int count) => _buffer.Commit(count);
17+
18+
public Memory<byte> GetMemory(int sizeHint = 0)
19+
{
20+
_buffer.EnsureAvailableSpace(Math.Max(sizeHint, MinimumBufferSize));
21+
return _buffer.AvailableMemory;
22+
}
23+
24+
public Span<byte> GetSpan(int sizeHint = 0)
25+
{
26+
_buffer.EnsureAvailableSpace(Math.Max(sizeHint, MinimumBufferSize));
27+
return _buffer.AvailableSpan;
28+
}
29+
30+
public ReadOnlyMemory<byte> WrittenMemory => _buffer.ActiveMemory;
31+
public int Capacity => _buffer.Capacity;
32+
public int WrittenCount => _buffer.ActiveLength;
33+
public void Reset() => _buffer.Discard(_buffer.ActiveLength);
34+
public void Dispose() => _buffer.Dispose();
35+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
// Based on https://github.com/dotnet/runtime/blob/dcbf3413c5f7ae431a68fd0d3f09af095b525887/src/libraries/System.Net.ServerSentEvents/src/System/Net/ServerSentEvents/SseFormatter.cs
5+
6+
using System.Buffers;
7+
using System.Diagnostics;
8+
using System.IO;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
12+
namespace System.Net.ServerSentEvents;
13+
14+
/// <summary>
15+
/// Provides methods for writing SSE events to a stream.
16+
/// </summary>
17+
internal sealed class SseEventWriter : IDisposable
18+
{
19+
private static readonly byte[] s_newLine = "\n"u8.ToArray();
20+
21+
private readonly Stream _destination;
22+
private readonly PooledByteBufferWriter _bufferWriter = new();
23+
private readonly PooledByteBufferWriter _userDataBufferWriter = new();
24+
25+
/// <summary>
26+
/// Initializes a new instance of the <see cref="SseEventWriter"/> class with the specified destination stream and item formatter.
27+
/// </summary>
28+
/// <param name="destination">The stream to write SSE events to.</param>
29+
/// <exception cref="ArgumentNullException"><paramref name="destination"/> is <see langword="null"/>.</exception>
30+
public SseEventWriter(Stream destination)
31+
{
32+
_destination = destination ?? throw new ArgumentNullException(nameof(destination));
33+
}
34+
35+
/// <summary>
36+
/// Writes an SSE item to the destination stream.
37+
/// </summary>
38+
/// <param name="item">The SSE item to write.</param>
39+
/// <param name="itemFormatter"></param>
40+
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
41+
/// <returns>A task representing the asynchronous write operation.</returns>
42+
public async ValueTask WriteAsync<T>(SseItem<T> item, Action<SseItem<T>, IBufferWriter<byte>> itemFormatter, CancellationToken cancellationToken = default)
43+
{
44+
itemFormatter(item, _userDataBufferWriter);
45+
46+
FormatSseEvent(
47+
_bufferWriter,
48+
eventType: item.EventType,
49+
data: _userDataBufferWriter.WrittenMemory.Span,
50+
eventId: item.EventId,
51+
reconnectionInterval: item.ReconnectionInterval);
52+
53+
await _destination.WriteAsync(_bufferWriter.WrittenMemory, cancellationToken).ConfigureAwait(false);
54+
await _destination.FlushAsync(cancellationToken).ConfigureAwait(false);
55+
56+
_userDataBufferWriter.Reset();
57+
_bufferWriter.Reset();
58+
}
59+
60+
private static void FormatSseEvent(
61+
IBufferWriter<byte> bufferWriter,
62+
string? eventType,
63+
ReadOnlySpan<byte> data,
64+
string? eventId,
65+
TimeSpan? reconnectionInterval)
66+
{
67+
if (eventType is not null)
68+
{
69+
Debug.Assert(!eventType.ContainsLineBreaks());
70+
71+
bufferWriter.WriteUtf8String("event: "u8);
72+
bufferWriter.WriteUtf8String(eventType);
73+
bufferWriter.WriteUtf8String(s_newLine);
74+
}
75+
76+
WriteLinesWithPrefix(bufferWriter, prefix: "data: "u8, data);
77+
bufferWriter.Write(s_newLine);
78+
79+
if (eventId is not null)
80+
{
81+
Debug.Assert(!eventId.ContainsLineBreaks());
82+
83+
bufferWriter.WriteUtf8String("id: "u8);
84+
bufferWriter.WriteUtf8String(eventId);
85+
bufferWriter.WriteUtf8String(s_newLine);
86+
}
87+
88+
if (reconnectionInterval is { } retry)
89+
{
90+
Debug.Assert(retry >= TimeSpan.Zero);
91+
92+
bufferWriter.WriteUtf8String("retry: "u8);
93+
bufferWriter.WriteUtf8Number((long)retry.TotalMilliseconds);
94+
bufferWriter.WriteUtf8String(s_newLine);
95+
}
96+
97+
bufferWriter.WriteUtf8String(s_newLine);
98+
}
99+
100+
private static void WriteLinesWithPrefix(IBufferWriter<byte> writer, ReadOnlySpan<byte> prefix, ReadOnlySpan<byte> data)
101+
{
102+
// Writes a potentially multi-line string, prefixing each line with the given prefix.
103+
// Both \n and \r\n sequences are normalized to \n.
104+
105+
while (true)
106+
{
107+
writer.WriteUtf8String(prefix);
108+
109+
int i = data.IndexOfAny((byte)'\r', (byte)'\n');
110+
if (i < 0)
111+
{
112+
writer.WriteUtf8String(data);
113+
return;
114+
}
115+
116+
int lineLength = i;
117+
if (data[i++] == '\r' && i < data.Length && data[i] == '\n')
118+
{
119+
i++;
120+
}
121+
122+
ReadOnlySpan<byte> nextLine = data.Slice(0, lineLength);
123+
data = data.Slice(i);
124+
125+
writer.WriteUtf8String(nextLine);
126+
writer.WriteUtf8String(s_newLine);
127+
}
128+
}
129+
130+
/// <inheritdoc/>
131+
public void Dispose()
132+
{
133+
_bufferWriter.Dispose();
134+
_userDataBufferWriter.Dispose();
135+
}
136+
}

0 commit comments

Comments
 (0)