Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions doc/api/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ added: v0.1.94
-->

* `response` {http.IncomingMessage}
* `socket` {stream.Duplex}
* `stream` {stream.Duplex}
* `head` {Buffer}

Emitted each time a server responds to a request with an upgrade. If this
Expand All @@ -768,13 +768,13 @@ const server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('okay');
});
server.on('upgrade', (req, socket, head) => {
socket.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
server.on('upgrade', (req, stream, head) => {
stream.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
'Upgrade: WebSocket\r\n' +
'Connection: Upgrade\r\n' +
'\r\n');

socket.pipe(socket); // echo back
stream.pipe(stream); // echo back
});

// Now that server is running
Expand All @@ -793,9 +793,9 @@ server.listen(1337, '127.0.0.1', () => {
const req = http.request(options);
req.end();

req.on('upgrade', (res, socket, upgradeHead) => {
req.on('upgrade', (res, stream, upgradeHead) => {
console.log('got upgraded!');
socket.end();
stream.end();
process.exit(0);
});
});
Expand All @@ -809,13 +809,13 @@ const server = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('okay');
});
server.on('upgrade', (req, socket, head) => {
socket.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
server.on('upgrade', (req, stream, head) => {
stream.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
'Upgrade: WebSocket\r\n' +
'Connection: Upgrade\r\n' +
'\r\n');

socket.pipe(socket); // echo back
stream.pipe(stream); // echo back
});

// Now that server is running
Expand All @@ -834,9 +834,9 @@ server.listen(1337, '127.0.0.1', () => {
const req = http.request(options);
req.end();

req.on('upgrade', (res, socket, upgradeHead) => {
req.on('upgrade', (res, stream, upgradeHead) => {
console.log('got upgraded!');
socket.end();
stream.end();
process.exit(0);
});
});
Expand Down Expand Up @@ -1674,6 +1674,14 @@ per connection (in the case of HTTP Keep-Alive connections).
<!-- YAML
added: v0.1.94
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/60016
description: Request bodies are no longer exposed raw (unparsed) on the
socket argument. Instead, if a body is received, the stream
argument will be a duplex that emits socket content only
after the request body, while the parsed request body data
will be emitted from the request, just as in normal server
`'request'` events.
- version:
- v24.9.0
- v22.21.0
Expand All @@ -1689,31 +1697,37 @@ changes:

* `request` {http.IncomingMessage} Arguments for the HTTP request, as it is in
the [`'request'`][] event
* `socket` {stream.Duplex} Network socket between the server and client
* `stream` {stream.Duplex} The upgraded stream between the server and client
* `head` {Buffer} The first packet of the upgraded stream (may be empty)

Emitted each time a client's HTTP upgrade request is accepted. By default
all HTTP upgrade requests are ignored (i.e. only regular `'request'` events
are emitted, sticking with the normal HTTP request/response flow) unless you
listen to this event, in which case they are all accepted (i.e. the `'upgrade'`
event is emitted instead, and future communication must handled directly
through the raw socket). You can control this more precisely by using the
through the raw stream). You can control this more precisely by using the
server `shouldUpgradeCallback` option.

Listening to this event is optional and clients cannot insist on a protocol
change.

After this event is emitted, the request's socket will not have a `'data'`
event listener, meaning it will need to be bound in order to handle data
sent to the server on that socket.

If an upgrade is accepted by `shouldUpgradeCallback` but no event handler
is registered then the socket is destroyed, resulting in an immediate
is registered then the socket will be destroyed, resulting in an immediate
connection closure for the client.

This event is guaranteed to be passed an instance of the {net.Socket} class,
a subclass of {stream.Duplex}, unless the user specifies a socket
type other than {net.Socket}.
In the uncommon case that the incoming request has a body, this body will be
parsed as normal, separate to the upgrade stream, and the raw stream data will
only begin after it has completed. To ensure that reading from the stream isn't
blocked by waiting for the request body to be read, any reads on the stream
will start the request body flowing automatically. If you want to read the
request body, ensure that you do so (i.e. you attach `'data'` listeners)
before starting to read from the upgraded stream.

The stream argument will typically be the {net.Socket} instance used by the
request, but in some cases (such as with a request body) it may be a duplex
stream. If required, you can access the raw connection underlying the request
via [`request.socket`][], which is guaranteed to be an instance of {net.Socket}
unless the user specified another socket type.

### `server.close([callback])`

Expand Down
137 changes: 120 additions & 17 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const {
SymbolFor,
} = primordials;

const { Duplex } = require('stream');
const net = require('net');
const EE = require('events');
const assert = require('internal/assert');
Expand All @@ -43,6 +44,7 @@ const {
continueExpression,
chunkExpression,
kIncomingMessage,
kSocket,
HTTPParser,
isLenient,
_checkInvalidHeaderChar: checkInvalidHeaderChar,
Expand Down Expand Up @@ -106,6 +108,7 @@ const onResponseFinishChannel = dc.channel('http.server.response.finish');

const kServerResponse = Symbol('ServerResponse');
const kServerResponseStatistics = Symbol('ServerResponseStatistics');
const kUpgradeStream = Symbol('UpgradeStream');

const kOptimizeEmptyRequests = Symbol('OptimizeEmptyRequestsOption');

Expand Down Expand Up @@ -953,6 +956,77 @@ function socketOnError(e) {
}
}

class UpgradeStream extends Duplex {
constructor(socket, req) {
super({
allowHalfOpen: socket.allowHalfOpen,
});

this[kSocket] = socket;
this[kIncomingMessage] = req;

// Proxy error, end & closure events immediately.
socket.on('error', (err) => this.destroy(err));

socket.on('close', () => this.destroy());
this.on('close', () => socket.destroy());

socket.on('end', () => {
this.push(null);

// Match the socket behaviour, where 'end' will fire despite no 'data'
// listeners if a socket with no pending data ends:
if (this.readableLength === 0) {
this.resume();
}
});

// Other events (most notably, reading) all only
// activate after requestBodyCompleted is called.
}

requestBodyCompleted(upgradeHead) {
this[kIncomingMessage] = null;

// When the request body is completed, we begin streaming all the
// post-body data for the upgraded protocol:
if (upgradeHead?.length > 0) {
if (!this.push(upgradeHead)) {
this[kSocket].pause();
}
}

this[kSocket].on('data', (data) => {
if (!this.push(data)) {
this[kSocket].pause();
}
});
}

_read(size) {
// Reading the upgrade stream starts the request stream flowing. It's
// important that this happens, even if there are no listeners, or it
// would be impossible to read this without explicitly reading all the
// request body first, which is backward incompatible & awkward.
this[kIncomingMessage]?.resume();

this[kSocket].resume();
}

_final(callback) {
this[kSocket].end(callback);
}

_write(chunk, encoding, callback) {
this[kSocket].write(chunk, encoding, callback);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I remember correctly the callback is always called asynchronously even if the write is synchronous so there will be a small overhead over a raw net.Socket in case of multiple consecutive sync writes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is definitely a small performance hit here. I doubt it will be significant for most cases, but it's not zero.

I'm open to ideas to improve that perf, but I think it's worth it for the correctness in this case, and the intention is that it only applies in this upgrade-with-body case, so it doesn't affect any non-body upgrade requests at all.

}

_destroy(err, callback) {
this[kSocket].destroy(err);
callback(err);
}
}

function onParserExecuteCommon(server, socket, parser, state, ret, d) {
if (ret instanceof Error) {
prepareError(ret, parser, d);
Expand All @@ -962,28 +1036,56 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
// Upgrade or CONNECT
const req = parser.incoming;
debug('SERVER upgrade or connect', req.method);
const eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';

let upgradeStream;
if (req.complete) {
d ||= parser.getCurrentBuffer();

socket.removeListener('data', state.onData);
socket.removeListener('end', state.onEnd);
socket.removeListener('close', state.onClose);
socket.removeListener('drain', state.onDrain);
socket.removeListener('error', socketOnError);
socket.removeListener('timeout', socketOnTimeout);

unconsume(parser, socket);
parser.finish();
freeParser(parser, req, socket);
parser = null;

// If the request is complete (no body, or all body read upfront) then
// we just emit the socket directly as the upgrade stream.
upgradeStream = socket;
} else {
// If the body hasn't been fully parsed yet, we emit immediately but
// we add a wrapper around the socket to not expose incoming data
// until the request body has finished.

if (socket[kUpgradeStream]) {
// We've already emitted the incomplete upgrade - nothing do to
// until actual body parsing completion.
return;
}

d ||= parser.getCurrentBuffer();
d ||= Buffer.alloc(0);

socket.removeListener('data', state.onData);
socket.removeListener('end', state.onEnd);
socket.removeListener('close', state.onClose);
socket.removeListener('drain', state.onDrain);
socket.removeListener('error', socketOnError);
socket.removeListener('timeout', socketOnTimeout);
unconsume(parser, socket);
parser.finish();
freeParser(parser, req, socket);
parser = null;
upgradeStream = new UpgradeStream(socket, req);
socket[kUpgradeStream] = upgradeStream;
}

const eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
if (server.listenerCount(eventName) > 0) {
debug('SERVER have listener for %s', eventName);
const bodyHead = d.slice(ret, d.length);

socket.readableFlowing = null;
const bodyHead = d.slice(ret, d.length);

server.emit(eventName, req, socket, bodyHead);
if (req.complete && socket[kUpgradeStream]) {
// Previously emitted, now completed - just activate the stream
socket[kUpgradeStream].requestBodyCompleted(bodyHead);
} else {
socket.readableFlowing = null;
server.emit(eventName, req, upgradeStream, bodyHead);
}
} else {
// Got upgrade or CONNECT method, but have no handler.
socket.destroy();
Expand Down Expand Up @@ -1089,8 +1191,9 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
if (req.upgrade) {
req.upgrade = req.method === 'CONNECT' ||
!!server.shouldUpgradeCallback(req);
if (req.upgrade)
return 2;
if (req.upgrade) {
return 0;
}
}

state.incoming.push(req);
Expand Down
Loading
Loading