Skip to content

Commit 0d8cbc2

Browse files
committed
Add tests with + fix bugs the tests uncovered
1 parent ad94341 commit 0d8cbc2

File tree

2 files changed

+181
-25
lines changed

2 files changed

+181
-25
lines changed

Sources/NIOCore/NIODecodedAsyncSequence.swift

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -118,28 +118,9 @@ extension NIODecodedAsyncSequence: AsyncSequence {
118118
return decoded
119119
}
120120

121-
// If `ended == false` and if `readLastChunkFromBuffer == true` then we must have manged
122-
// to decode a message. Otherwise something is wrong in `decodeNext()`.
123-
assert(!readLastChunkFromBuffer || decoded != nil)
124-
125121
return decoded
126122
}
127123

128-
@inlinable
129-
func assertFinishedDecodingIfNoDecoded(_ decoded: Element?) {
130-
guard decoded == nil else {
131-
return
132-
}
133-
switch self.state {
134-
case .finishedDecoding:
135-
break
136-
case .readingFromBuffer, .readLastChunkFromBuffer:
137-
assertionFailure(
138-
"'decodeFromBuffer()' must have set the 'state' to '.finishedDecoding' if it returned 'nil'."
139-
)
140-
}
141-
}
142-
143124
/// Retrieve the next element from the ``NIODecodedAsyncSequence``.
144125
///
145126
/// The same as `next(isolation:)` but not isolated to an actor, which allows
@@ -170,9 +151,12 @@ extension NIODecodedAsyncSequence: AsyncSequence {
170151
guard let nextBuffer = try await self.baseIterator.next() else {
171152
// Ran out of data to read.
172153
self.state = .readLastChunkFromBuffer
173-
let decoded = try self.decodeFromBuffer()
174-
self.assertFinishedDecodingIfNoDecoded(decoded)
175-
return decoded
154+
if let decoded = try self.decodeFromBuffer() {
155+
return decoded
156+
} else {
157+
self.state = .finishedDecoding
158+
return nil
159+
}
176160
}
177161

178162
self.processor.append(nextBuffer)
@@ -215,9 +199,12 @@ extension NIODecodedAsyncSequence: AsyncSequence {
215199
guard let nextBuffer = try await self.baseIterator.next(isolation: actor) else {
216200
// Ran out of data to read.
217201
self.state = .readLastChunkFromBuffer
218-
let decoded = try self.decodeFromBuffer()
219-
self.assertFinishedDecodingIfNoDecoded(decoded)
220-
return decoded
202+
if let decoded = try self.decodeFromBuffer() {
203+
return decoded
204+
} else {
205+
self.state = .finishedDecoding
206+
return nil
207+
}
221208
}
222209

223210
self.processor.append(nextBuffer)
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
import Testing
17+
18+
@Suite
19+
struct NIODecodedAsyncSequenceTests {
20+
private final class ByteToInt32Decoder: NIOSingleStepByteToMessageDecoder {
21+
typealias InboundOut = Int32
22+
23+
func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
24+
buffer.readInteger()
25+
}
26+
27+
func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
28+
#expect(seenEOF)
29+
return try self.decode(buffer: &buffer)
30+
}
31+
}
32+
33+
private final class ThrowingDecoder: NIOSingleStepByteToMessageDecoder {
34+
typealias InboundOut = Int32
35+
36+
struct DecoderError: Error {}
37+
38+
func decode(buffer: inout ByteBuffer) throws -> InboundOut? {
39+
throw DecoderError()
40+
}
41+
42+
func decodeLast(buffer: inout ByteBuffer, seenEOF: Bool) throws -> InboundOut? {
43+
#expect(seenEOF)
44+
return try self.decode(buffer: &buffer)
45+
}
46+
}
47+
48+
static let testingArguments = [
49+
(elementCount: 100, chunkSize: 4),
50+
(elementCount: 89, chunkSize: 4),
51+
(elementCount: 77, chunkSize: 1),
52+
(elementCount: 65, chunkSize: 3),
53+
(elementCount: 61, chunkSize: 100),
54+
(elementCount: 55, chunkSize: 15),
55+
]
56+
57+
@Test(arguments: Self.testingArguments)
58+
func decodingWorks(elementCount: Int, chunkSize: Int) async throws {
59+
let baseSequence = AsyncStream<ByteBuffer>.makeStream()
60+
61+
let randomElements: [UInt8] = (0..<elementCount).map {
62+
_ in UInt8.random(in: .min ... .max)
63+
}
64+
65+
let buffers =
66+
randomElements
67+
.chunks(ofSize: chunkSize)
68+
.map(ByteBuffer.init(bytes:))
69+
70+
try await withThrowingTaskGroup(of: Void.self) { group in
71+
group.addTask {
72+
for buffer in buffers {
73+
/// Sleep for 10ms to simulate asynchronous work
74+
try await Task.sleep(nanoseconds: 10_000_000)
75+
baseSequence.continuation.yield(buffer)
76+
}
77+
baseSequence.continuation.finish()
78+
}
79+
80+
group.addTask {
81+
var randomElements = randomElements
82+
let decodedSequence = baseSequence.stream.decode(using: ByteToInt32Decoder())
83+
84+
for try await element in decodedSequence {
85+
// Create an Int32 from the first 4 UInt8s
86+
let int32 = randomElements[0..<4].enumerated().reduce(into: Int32(0)) { result, next in
87+
result |= Int32(next.element) << ((3 - next.offset) * 8)
88+
}
89+
randomElements = Array(randomElements[4...])
90+
#expect(element == int32)
91+
}
92+
}
93+
94+
try await group.waitForAll()
95+
}
96+
}
97+
98+
@Test(arguments: Self.testingArguments)
99+
func throwsWhenDecoderThrows(elementCount: Int, chunkSize: Int) async throws {
100+
let baseSequence = AsyncStream<ByteBuffer>.makeStream()
101+
102+
let randomElements: [UInt8] = (0..<elementCount).map {
103+
_ in UInt8.random(in: .min ... .max)
104+
}
105+
106+
let buffers =
107+
randomElements
108+
.chunks(ofSize: chunkSize)
109+
.map(ByteBuffer.init(bytes:))
110+
111+
await withThrowingTaskGroup(of: Void.self) { group in
112+
group.addTask {
113+
for buffer in buffers {
114+
/// Sleep for 10ms to simulate asynchronous work
115+
try await Task.sleep(nanoseconds: 10_000_000)
116+
baseSequence.continuation.yield(buffer)
117+
}
118+
baseSequence.continuation.finish()
119+
}
120+
121+
group.addTask {
122+
let decodedSequence = baseSequence.stream.decode(using: ThrowingDecoder())
123+
124+
for try await _ in decodedSequence {
125+
Issue.record("Should not have reached here")
126+
}
127+
}
128+
129+
await #expect(throws: ThrowingDecoder.DecoderError.self) {
130+
try await group.waitForAll()
131+
}
132+
}
133+
}
134+
135+
@Test(arguments: Self.testingArguments)
136+
func throwsWhenStreamThrows(elementCount: Int, chunkSize: Int) async throws {
137+
struct StreamError: Error {}
138+
139+
let baseSequence = AsyncThrowingStream<ByteBuffer, any Error>.makeStream()
140+
141+
await withThrowingTaskGroup(of: Void.self) { group in
142+
group.addTask {
143+
/// Sleep for 50ms to simulate asynchronous work
144+
try await Task.sleep(nanoseconds: 50_000_000)
145+
baseSequence.continuation.finish(throwing: StreamError())
146+
}
147+
148+
group.addTask {
149+
let decodedSequence = baseSequence.stream.decode(using: ByteToInt32Decoder())
150+
151+
for try await _ in decodedSequence {
152+
Issue.record("Should not have reached here")
153+
}
154+
}
155+
156+
await #expect(throws: StreamError.self) {
157+
try await group.waitForAll()
158+
}
159+
}
160+
}
161+
}
162+
163+
extension Array {
164+
fileprivate func chunks(ofSize size: Int) -> [[Element]] {
165+
stride(from: 0, to: count, by: size).map {
166+
Array(self[$0..<Swift.min($0 + size, count)])
167+
}
168+
}
169+
}

0 commit comments

Comments
 (0)