Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 58 additions & 23 deletions Sources/NIOHTTP1Server/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore
import NIOHTTP1
import NIOPosix
Expand Down Expand Up @@ -138,7 +139,7 @@ private final class HTTPHandler: ChannelInboundHandler {
),
promise: nil
)
context.write(self.wrapOutboundOut(.body(.byteBuffer(self.buffer))), promise: nil)
context.write(Self.wrapOutboundOut(.body(.byteBuffer(self.buffer))), promise: nil)
self.completeResponse(context, trailers: nil, promise: nil)
}
}
Expand Down Expand Up @@ -178,7 +179,7 @@ private final class HTTPHandler: ChannelInboundHandler {
),
promise: nil
)
context.write(self.wrapOutboundOut(.body(.byteBuffer(self.buffer))), promise: nil)
context.write(Self.wrapOutboundOut(.body(.byteBuffer(self.buffer))), promise: nil)
self.completeResponse(context, trailers: nil, promise: nil)
} else {
self.completeResponse(context, trailers: nil, promise: nil)
Expand Down Expand Up @@ -206,7 +207,11 @@ private final class HTTPHandler: ChannelInboundHandler {
()
case .end:
self.state.requestComplete()
let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)
let loopBoundSelf = NIOLoopBound(self, eventLoop: context.eventLoop)
context.eventLoop.scheduleTask(in: delay) { () -> Void in
let `self` = loopBoundSelf.value
let context = loopBoundContext.value
var buf = context.channel.allocator.buffer(capacity: string.utf8.count)
buf.writeString(string)
context.writeAndFlush(Self.wrapOutboundOut(.body(.byteBuffer(buf))), promise: nil)
Expand All @@ -227,14 +232,19 @@ private final class HTTPHandler: ChannelInboundHandler {
self.keepAlive = request.isKeepAlive
self.continuousCount = 0
self.state.requestReceived()
func doNext() {
let eventLoop = context.eventLoop
let loopBoundContext = NIOLoopBound(context, eventLoop: eventLoop)
let loopBoundSelf = NIOLoopBound(self, eventLoop: eventLoop)
@Sendable func doNext() {
let `self` = loopBoundSelf.value
let context = loopBoundContext.value
self.buffer.clear()
self.continuousCount += 1
self.buffer.writeString("line \(self.continuousCount)\n")
context.writeAndFlush(Self.wrapOutboundOut(.body(.byteBuffer(self.buffer)))).map {
context.eventLoop.scheduleTask(in: .milliseconds(400), doNext)
eventLoop.scheduleTask(in: .milliseconds(400), doNext)
}.whenFailure { (_: Error) in
self.completeResponse(context, trailers: nil, promise: nil)
loopBoundSelf.value.completeResponse(loopBoundContext.value, trailers: nil, promise: nil)
}
}
context.writeAndFlush(
Expand All @@ -260,13 +270,20 @@ private final class HTTPHandler: ChannelInboundHandler {
self.keepAlive = request.isKeepAlive
self.continuousCount = 0
self.state.requestReceived()
func doNext() {
let eventLoop = context.eventLoop
let loopBoundContext = NIOLoopBound(context, eventLoop: eventLoop)
let loopBoundSelf = NIOLoopBound(self, eventLoop: eventLoop)
@Sendable func doNext() {
let `self` = loopBoundSelf.value
let context = loopBoundContext.value
self.buffer.clear()
self.buffer.writeString(strings[self.continuousCount])
self.continuousCount += 1
context.writeAndFlush(Self.wrapOutboundOut(.body(.byteBuffer(self.buffer)))).whenSuccess {
let `self` = loopBoundSelf.value
let context = loopBoundContext.value
if self.continuousCount < strings.count {
context.eventLoop.scheduleTask(in: delay, doNext)
eventLoop.scheduleTask(in: delay, doNext)
} else {
self.completeResponse(context, trailers: nil, promise: nil)
}
Expand Down Expand Up @@ -356,9 +373,14 @@ private final class HTTPHandler: ChannelInboundHandler {
path: String
) {
self.buffer.clear()

func sendErrorResponse(request: HTTPRequestHead, _ error: Error) {
var body = context.channel.allocator.buffer(capacity: 128)
let eventLoop = context.eventLoop
let allocator = context.channel.allocator
let loopBoundContext = NIOLoopBound(context, eventLoop: eventLoop)
let loopBoundSelf = NIOLoopBound(self, eventLoop: eventLoop)

@Sendable func sendErrorResponse(request: HTTPRequestHead, _ error: Error) {
let context = loopBoundContext.value
var body = allocator.buffer(capacity: 128)
let response = { () -> HTTPResponseHead in
switch error {
case let e as IOError where e.errnoCode == ENOENT:
Expand All @@ -382,7 +404,7 @@ private final class HTTPHandler: ChannelInboundHandler {
context.channel.close(promise: nil)
}

func responseHead(request: HTTPRequestHead, fileRegion region: FileRegion) -> HTTPResponseHead {
@Sendable func responseHead(request: HTTPRequestHead, fileRegion region: FileRegion) -> HTTPResponseHead {
var response = httpResponseHead(request: request, status: .ok)
response.headers.add(name: "Content-Length", value: "\(region.endIndex)")
response.headers.add(name: "Content-Type", value: "text/plain; charset=utf-8")
Expand All @@ -404,32 +426,39 @@ private final class HTTPHandler: ChannelInboundHandler {
fileHandleAndRegion.whenFailure {
sendErrorResponse(request: request, $0)
}
fileHandleAndRegion.whenSuccess { (file, region) in
fileHandleAndRegion.whenSuccess { [fileIO = self.fileIO] (file, region) in
let context = loopBoundContext.value
let loopBoundFile = NIOLoopBound(file, eventLoop: eventLoop)
switch ioMethod {
case .nonblockingFileIO:
var responseStarted = false
let responseStarted = NIOLoopBoundBox(false, eventLoop: eventLoop)
let response = responseHead(request: request, fileRegion: region)
if region.readableBytes == 0 {
responseStarted = true
responseStarted.value = true
context.write(Self.wrapOutboundOut(.head(response)), promise: nil)
}
return self.fileIO.readChunked(
return fileIO.readChunked(
fileRegion: region,
chunkSize: 32 * 1024,
allocator: context.channel.allocator,
eventLoop: context.eventLoop
) { buffer in
if !responseStarted {
responseStarted = true
let context = loopBoundContext.value
if !responseStarted.value {
responseStarted.value = true
context.write(Self.wrapOutboundOut(.head(response)), promise: nil)
}
return context.writeAndFlush(Self.wrapOutboundOut(.body(.byteBuffer(buffer))))
}.flatMap { () -> EventLoopFuture<Void> in
let context = loopBoundContext.value
let `self` = loopBoundSelf.value
let p = context.eventLoop.makePromise(of: Void.self)
self.completeResponse(context, trailers: nil, promise: p)
return p.futureResult
}.flatMapError { error in
if !responseStarted {
let context = loopBoundContext.value
let `self` = loopBoundSelf.value
if !responseStarted.value {
let response = httpResponseHead(request: request, status: .ok)
context.write(Self.wrapOutboundOut(.head(response)), promise: nil)
var buffer = context.channel.allocator.buffer(capacity: 100)
Expand All @@ -441,19 +470,21 @@ private final class HTTPHandler: ChannelInboundHandler {
return context.close()
}
}.whenComplete { (_: Result<Void, Error>) in
_ = try? file.close()
_ = try? loopBoundFile.value.close()
}
case .sendfile:
let context = loopBoundContext.value
let response = responseHead(request: request, fileRegion: region)
context.write(Self.wrapOutboundOut(.head(response)), promise: nil)
context.writeAndFlush(Self.wrapOutboundOut(.body(.fileRegion(region)))).flatMap {
let context = loopBoundContext.value
let p = context.eventLoop.makePromise(of: Void.self)
self.completeResponse(context, trailers: nil, promise: p)
loopBoundSelf.value.completeResponse(context, trailers: nil, promise: p)
return p.futureResult
}.flatMapError { (_: Error) in
context.close()
loopBoundContext.value.close()
}.whenComplete { (_: Result<Void, Error>) in
_ = try? file.close()
_ = try? loopBoundFile.value.close()
}
}
}
Expand All @@ -470,10 +501,14 @@ private final class HTTPHandler: ChannelInboundHandler {
promise: EventLoopPromise<Void>?
) {
self.state.responseComplete()
let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)

let promise = self.keepAlive ? promise : (promise ?? context.eventLoop.makePromise())
if !self.keepAlive {
promise!.futureResult.whenComplete { (_: Result<Void, Error>) in context.close(promise: nil) }
promise!.futureResult.whenComplete { (_: Result<Void, Error>) in
let context = loopBoundContext.value
context.close(promise: nil)
}
}
self.handler = nil

Expand Down
Loading