@@ -21,19 +21,19 @@ extension AsyncSequence where Element == ByteBuffer {
2121/// Use the ``AsyncSequence/decode(using:)`` function to create a ``NIODecodedAsyncSequence``.
2222@available ( macOS 10 . 15 , iOS 13 . 0 , watchOS 6 . 0 , tvOS 13 . 0 , * )
2323public struct NIODecodedAsyncSequence <
24- AsyncSeq : AsyncSequence ,
24+ Base : AsyncSequence ,
2525 Decoder: NIOSingleStepByteToMessageDecoder ,
2626 Element
27- > where AsyncSeq . Element == ByteBuffer , Decoder. InboundOut == Element {
27+ > where Base . Element == ByteBuffer , Decoder. InboundOut == Element {
2828 @usableFromInline
29- var asyncSequence : AsyncSeq
29+ var asyncSequence : Base
3030 @usableFromInline
3131 var decoder : Decoder
3232 @usableFromInline
3333 var maximumBufferSize : Int ?
3434
3535 @inlinable
36- init ( asyncSequence: AsyncSeq , decoder: Decoder , maximumBufferSize: Int ? = nil ) {
36+ init ( asyncSequence: Base , decoder: Decoder , maximumBufferSize: Int ? = nil ) {
3737 self . asyncSequence = asyncSequence
3838 self . decoder = decoder
3939 self . maximumBufferSize = maximumBufferSize
@@ -58,7 +58,7 @@ extension NIODecodedAsyncSequence: AsyncSequence {
5858 }
5959
6060 @usableFromInline
61- var baseIterator : AsyncSeq . AsyncIterator
61+ var baseIterator : Base . AsyncIterator
6262 @usableFromInline
6363 var processor : NIOSingleStepByteToMessageProcessor < Decoder >
6464 @usableFromInline
@@ -74,12 +74,15 @@ extension NIODecodedAsyncSequence: AsyncSequence {
7474 self . state = . readingFromBuffer
7575 }
7676
77- /// Retrieve the next element from the ``DecodeSequence``.
7877 @inlinable
79- public mutating func next( ) async throws -> Element ? {
80- if self . state == . finishedDecoding { return nil }
81-
82- let readLastChunkFromBuffer = self . state == . readLastChunkFromBuffer
78+ mutating func decodeFromBuffer( ) throws -> Element ? {
79+ let readLastChunkFromBuffer =
80+ switch self . state {
81+ case . readLastChunkFromBuffer:
82+ true
83+ case . finishedDecoding, . readingFromBuffer:
84+ false
85+ }
8386
8487 // Decode from the buffer if possible
8588 let ( decoded, ended) = try self . processor. decodeNext (
@@ -95,32 +98,52 @@ extension NIODecodedAsyncSequence: AsyncSequence {
9598 return decoded
9699 }
97100
98- // If we've managed to decode a message, return it
99- if let decoded {
100- return decoded
101- }
102-
103101 // If `ended == false` and if `readLastChunkFromBuffer == true` then we must have manged
104102 // to decode a message. Otherwise something is wrong in `decodeNext()`.
105- // We precondition here to avoid infinite recursion.
106- precondition ( !readLastChunkFromBuffer)
107-
108- // Read more data into the buffer so we can decode more messages
109- guard let nextBuffer = try await self . baseIterator. next ( ) else {
110- // Ran out of data to read.
111- self . state = . readLastChunkFromBuffer
112- return try await self . next ( )
103+ assert ( !readLastChunkFromBuffer || decoded != nil )
104+
105+ return decoded
106+ }
107+
108+ /// Retrieve the next element from the ``NIODecodedAsyncSequence``.
109+ @inlinable
110+ public mutating func next( ) async throws -> Element ? {
111+ switch self . state {
112+ case . finishedDecoding:
113+ return nil
114+ case . readingFromBuffer, . readLastChunkFromBuffer:
115+ break
113116 }
114117
115- self . processor. append ( nextBuffer)
118+ if let decoded = try self . decodeFromBuffer ( ) {
119+ return decoded
120+ }
116121
117- return try await self . next ( )
122+ while self . state == . readingFromBuffer {
123+ // Read more data into the buffer so we can decode more messages
124+ guard let nextBuffer = try await self . baseIterator. next ( ) else {
125+ // Ran out of data to read.
126+ self . state = . readLastChunkFromBuffer
127+ let decoded = try self . decodeFromBuffer ( )
128+ // `decodeFromBuffer()` must have set the state to `.finishedDecoding` if it returned `nil`.
129+ assert ( decoded != nil || self . state == . finishedDecoding)
130+ return decoded
131+ }
132+
133+ self . processor. append ( nextBuffer)
134+
135+ if let decoded = try self . decodeFromBuffer ( ) {
136+ return decoded
137+ }
138+ }
139+
140+ return nil
118141 }
119142 }
120143}
121144
122145@available ( macOS 10 . 15 , iOS 13 . 0 , watchOS 6 . 0 , tvOS 13 . 0 , * )
123- extension NIODecodedAsyncSequence : Sendable where AsyncSeq : Sendable , Decoder: Sendable { }
146+ extension NIODecodedAsyncSequence : Sendable where Base : Sendable , Decoder: Sendable { }
124147
125148@available ( * , unavailable)
126149extension NIODecodedAsyncSequence . AsyncIterator : Sendable { }
0 commit comments