diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs index 5c4ba44ee383d4..55ba3ef0a2496f 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs @@ -1,14 +1,12 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System.Buffers; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Runtime.CompilerServices; using System.Text.Json.Serialization; -using System.Text.Json.Serialization.Converters; using System.Text.Json.Serialization.Metadata; using System.Threading; using System.Threading.Tasks; @@ -375,7 +373,7 @@ public static partial class JsonSerializer } JsonTypeInfo jsonTypeInfo = options.GetOrAddJsonTypeInfoForRootType(typeof(TValue)); - return CreateAsyncEnumerableDeserializer(utf8Json, jsonTypeInfo, cancellationToken); + return CreateAsyncEnumerableDeserializer(utf8Json, CreateQueueTypeInfo(jsonTypeInfo), cancellationToken); } /// @@ -406,28 +404,30 @@ public static partial class JsonSerializer ThrowHelper.ThrowArgumentNullException(nameof(jsonTypeInfo)); } - return CreateAsyncEnumerableDeserializer(utf8Json, jsonTypeInfo, cancellationToken); + return CreateAsyncEnumerableDeserializer(utf8Json, CreateQueueTypeInfo(jsonTypeInfo), cancellationToken); + } + + private static JsonTypeInfo> CreateQueueTypeInfo(JsonTypeInfo jsonTypeInfo) + { + return JsonMetadataServices.CreateQueueInfo, TValue>( + options: jsonTypeInfo.Options, + collectionInfo: new() + { + ObjectCreator = static () => new Queue(), + ElementInfo = jsonTypeInfo, + NumberHandling = jsonTypeInfo.Options.NumberHandling + }); } private static async IAsyncEnumerable CreateAsyncEnumerableDeserializer( Stream utf8Json, - JsonTypeInfo jsonTypeInfo, + JsonTypeInfo> queueTypeInfo, [EnumeratorCancellation] CancellationToken cancellationToken) { - JsonSerializerOptions options = jsonTypeInfo.Options; - JsonTypeInfo> queueTypeInfo = - JsonMetadataServices.CreateQueueInfo, TValue>( - options: options, - collectionInfo: new() - { - ObjectCreator = () => new Queue(), - ElementInfo = jsonTypeInfo, - NumberHandling = options.NumberHandling - }); - + queueTypeInfo.EnsureConfigured(); + JsonSerializerOptions options = queueTypeInfo.Options; var bufferState = new ReadBufferState(options.DefaultBufferSize); ReadStack readStack = default; - queueTypeInfo.EnsureConfigured(); readStack.Initialize(queueTypeInfo, supportContinuation: true); var jsonReaderState = new JsonReaderState(options.GetReaderOptions()); @@ -435,7 +435,7 @@ private static async IAsyncEnumerable CreateAsyncEnumerableDeserializer< { do { - bufferState = await ReadFromStreamAsync(utf8Json, bufferState, cancellationToken).ConfigureAwait(false); + bufferState = await bufferState.ReadFromStreamAsync(utf8Json, cancellationToken, fillBuffer: false).ConfigureAwait(false); ContinueDeserialize>( ref bufferState, ref jsonReaderState, @@ -476,7 +476,7 @@ private static async IAsyncEnumerable CreateAsyncEnumerableDeserializer< { while (true) { - bufferState = await ReadFromStreamAsync(utf8Json, bufferState, cancellationToken).ConfigureAwait(false); + bufferState = await bufferState.ReadFromStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false); TValue value = ContinueDeserialize(ref bufferState, ref jsonReaderState, ref readStack, converter, options); if (bufferState.IsFinalBlock) @@ -507,7 +507,7 @@ private static async IAsyncEnumerable CreateAsyncEnumerableDeserializer< { while (true) { - bufferState = ReadFromStream(utf8Json, bufferState); + bufferState.ReadFromStream(utf8Json); TValue value = ContinueDeserialize(ref bufferState, ref jsonReaderState, ref readStack, converter, options); if (bufferState.IsFinalBlock) @@ -522,78 +522,6 @@ private static async IAsyncEnumerable CreateAsyncEnumerableDeserializer< } } - /// - /// Read from the stream until either our buffer is filled or we hit EOF. - /// Calling ReadCore is relatively expensive, so we minimize the number of times - /// we need to call it. - /// - internal static async ValueTask ReadFromStreamAsync( - Stream utf8Json, - ReadBufferState bufferState, - CancellationToken cancellationToken) - { - while (true) - { - int bytesRead = await utf8Json.ReadAsync( -#if BUILDING_INBOX_LIBRARY - bufferState.Buffer.AsMemory(bufferState.BytesInBuffer), -#else - bufferState.Buffer, bufferState.BytesInBuffer, bufferState.Buffer.Length - bufferState.BytesInBuffer, -#endif - cancellationToken).ConfigureAwait(false); - - if (bytesRead == 0) - { - bufferState.IsFinalBlock = true; - break; - } - - bufferState.BytesInBuffer += bytesRead; - - if (bufferState.BytesInBuffer == bufferState.Buffer.Length) - { - break; - } - } - - return bufferState; - } - - /// - /// Read from the stream until either our buffer is filled or we hit EOF. - /// Calling ReadCore is relatively expensive, so we minimize the number of times - /// we need to call it. - /// - internal static ReadBufferState ReadFromStream( - Stream utf8Json, - ReadBufferState bufferState) - { - while (true) - { - int bytesRead = utf8Json.Read( -#if BUILDING_INBOX_LIBRARY - bufferState.Buffer.AsSpan(bufferState.BytesInBuffer)); -#else - bufferState.Buffer, bufferState.BytesInBuffer, bufferState.Buffer.Length - bufferState.BytesInBuffer); -#endif - - if (bytesRead == 0) - { - bufferState.IsFinalBlock = true; - break; - } - - bufferState.BytesInBuffer += bytesRead; - - if (bufferState.BytesInBuffer == bufferState.Buffer.Length) - { - break; - } - } - - return bufferState; - } - internal static TValue ContinueDeserialize( ref ReadBufferState bufferState, ref JsonReaderState jsonReaderState, @@ -601,67 +529,17 @@ internal static TValue ContinueDeserialize( JsonConverter converter, JsonSerializerOptions options) { - if (bufferState.BytesInBuffer > bufferState.ClearMax) - { - bufferState.ClearMax = bufferState.BytesInBuffer; - } - - int start = 0; - if (bufferState.IsFirstIteration) - { - bufferState.IsFirstIteration = false; - - // Handle the UTF-8 BOM if present - Debug.Assert(bufferState.Buffer.Length >= JsonConstants.Utf8Bom.Length); - if (bufferState.Buffer.AsSpan().StartsWith(JsonConstants.Utf8Bom)) - { - start += JsonConstants.Utf8Bom.Length; - bufferState.BytesInBuffer -= JsonConstants.Utf8Bom.Length; - } - } - // Process the data available TValue value = ReadCore( ref jsonReaderState, bufferState.IsFinalBlock, - new ReadOnlySpan(bufferState.Buffer, start, bufferState.BytesInBuffer), + bufferState.Bytes, options, ref readStack, converter); - Debug.Assert(readStack.BytesConsumed <= bufferState.BytesInBuffer); - int bytesConsumed = checked((int)readStack.BytesConsumed); - - bufferState.BytesInBuffer -= bytesConsumed; - - // The reader should have thrown if we have remaining bytes. - Debug.Assert(!bufferState.IsFinalBlock || bufferState.BytesInBuffer == 0); - - if (!bufferState.IsFinalBlock) - { - // Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization. - if ((uint)bufferState.BytesInBuffer > ((uint)bufferState.Buffer.Length / 2)) - { - // We have less than half the buffer available, double the buffer size. - byte[] oldBuffer = bufferState.Buffer; - int oldClearMax = bufferState.ClearMax; - byte[] newBuffer = ArrayPool.Shared.Rent((bufferState.Buffer.Length < (int.MaxValue / 2)) ? bufferState.Buffer.Length * 2 : int.MaxValue); - - // Copy the unprocessed data to the new buffer while shifting the processed bytes. - Buffer.BlockCopy(oldBuffer, bytesConsumed + start, newBuffer, 0, bufferState.BytesInBuffer); - bufferState.Buffer = newBuffer; - bufferState.ClearMax = bufferState.BytesInBuffer; - - // Clear and return the old buffer - new Span(oldBuffer, 0, oldClearMax).Clear(); - ArrayPool.Shared.Return(oldBuffer); - } - else if (bufferState.BytesInBuffer != 0) - { - // Shift the processed bytes to the beginning of buffer to make more room. - Buffer.BlockCopy(bufferState.Buffer, bytesConsumed + start, bufferState.Buffer, 0, bufferState.BytesInBuffer); - } - } + Debug.Assert(readStack.BytesConsumed <= bufferState.Bytes.Length); + bufferState.AdvanceBuffer((int)readStack.BytesConsumed); return value; } diff --git a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadBufferState.cs b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadBufferState.cs index d909716eb35260..77a55f4198c56e 100644 --- a/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadBufferState.cs +++ b/src/libraries/System.Text.Json/src/System/Text/Json/Serialization/ReadBufferState.cs @@ -2,32 +2,168 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Buffers; +using System.Diagnostics; +using System.IO; +using System.Threading; +using System.Threading.Tasks; namespace System.Text.Json.Serialization { internal struct ReadBufferState : IDisposable { - public byte[] Buffer; - public int BytesInBuffer; - public int ClearMax; - public bool IsFirstIteration; - public bool IsFinalBlock; + private byte[] _buffer; + private byte _offset; // Read bytes offset typically used when skipping the UTF-8 BOM. + private int _count; // Number of read bytes yet to be consumed by the serializer. + private int _maxCount; // Number of bytes we need to clear before returning the buffer. + private bool _isFirstBlock; + private bool _isFinalBlock; - public ReadBufferState(int defaultBufferSize) + public ReadBufferState(int initialBufferSize) { - Buffer = ArrayPool.Shared.Rent(Math.Max(defaultBufferSize, JsonConstants.Utf8Bom.Length)); - BytesInBuffer = ClearMax = 0; - IsFirstIteration = true; - IsFinalBlock = false; + _buffer = ArrayPool.Shared.Rent(Math.Max(initialBufferSize, JsonConstants.Utf8Bom.Length)); + _maxCount = _count = _offset = 0; + _isFirstBlock = true; + _isFinalBlock = false; + } + + public bool IsFinalBlock => _isFinalBlock; + + public ReadOnlySpan Bytes => _buffer.AsSpan(_offset, _count); + + /// + /// Read from the stream until either our buffer is filled or we hit EOF. + /// Calling ReadCore is relatively expensive, so we minimize the number of times + /// we need to call it. + /// + public readonly async ValueTask ReadFromStreamAsync( + Stream utf8Json, + CancellationToken cancellationToken, + bool fillBuffer = true) + { + // Since mutable structs don't work well with async state machines, + // make all updates on a copy which is returned once complete. + ReadBufferState bufferState = this; + + do + { + int bytesRead = await utf8Json.ReadAsync( +#if BUILDING_INBOX_LIBRARY + bufferState._buffer.AsMemory(bufferState._count), +#else + bufferState._buffer, bufferState._count, bufferState._buffer.Length - bufferState._count, +#endif + cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + bufferState._isFinalBlock = true; + break; + } + + bufferState._count += bytesRead; + } + while (fillBuffer && bufferState._count < bufferState._buffer.Length); + + bufferState.ProcessReadBytes(); + return bufferState; + } + + /// + /// Read from the stream until either our buffer is filled or we hit EOF. + /// Calling ReadCore is relatively expensive, so we minimize the number of times + /// we need to call it. + /// + public void ReadFromStream(Stream utf8Json) + { + do + { + int bytesRead = utf8Json.Read( +#if BUILDING_INBOX_LIBRARY + _buffer.AsSpan(_count)); +#else + _buffer, _count, _buffer.Length - _count); +#endif + + if (bytesRead == 0) + { + _isFinalBlock = true; + break; + } + + _count += bytesRead; + } + while (_count < _buffer.Length); + + ProcessReadBytes(); + } + + /// + /// Advances the buffer in anticipation of a subsequent read operation. + /// + public void AdvanceBuffer(int bytesConsumed) + { + Debug.Assert(bytesConsumed <= _count); + Debug.Assert(!_isFinalBlock || _count == bytesConsumed, "The reader should have thrown if we have remaining bytes."); + + _count -= bytesConsumed; + + if (!_isFinalBlock) + { + // Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization. + if ((uint)_count > ((uint)_buffer.Length / 2)) + { + // We have less than half the buffer available, double the buffer size. + byte[] oldBuffer = _buffer; + int oldMaxCount = _maxCount; + byte[] newBuffer = ArrayPool.Shared.Rent((_buffer.Length < (int.MaxValue / 2)) ? _buffer.Length * 2 : int.MaxValue); + + // Copy the unprocessed data to the new buffer while shifting the processed bytes. + Buffer.BlockCopy(oldBuffer, _offset + bytesConsumed, newBuffer, 0, _count); + _buffer = newBuffer; + _offset = 0; + _maxCount = _count; + + // Clear and return the old buffer + new Span(oldBuffer, 0, oldMaxCount).Clear(); + ArrayPool.Shared.Return(oldBuffer); + } + else if (_count != 0) + { + // Shift the processed bytes to the beginning of buffer to make more room. + Buffer.BlockCopy(_buffer, _offset + bytesConsumed, _buffer, 0, _count); + _offset = 0; + } + } + } + + private void ProcessReadBytes() + { + if (_count > _maxCount) + { + _maxCount = _count; + } + + if (_isFirstBlock) + { + _isFirstBlock = false; + + // Handle the UTF-8 BOM if present + Debug.Assert(_buffer.Length >= JsonConstants.Utf8Bom.Length); + if (_buffer.AsSpan(0, _count).StartsWith(JsonConstants.Utf8Bom)) + { + _offset = (byte)JsonConstants.Utf8Bom.Length; + _count -= JsonConstants.Utf8Bom.Length; + } + } } public void Dispose() { // Clear only what we used and return the buffer to the pool - new Span(Buffer, 0, ClearMax).Clear(); + new Span(_buffer, 0, _maxCount).Clear(); - byte[] toReturn = Buffer; - Buffer = null!; + byte[] toReturn = _buffer; + _buffer = null!; ArrayPool.Shared.Return(toReturn); }