Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions demo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const undici = require('.')

;(async () => {
const { body } = await undici.request('https://httpbin.org/anything')
const json = await body.json()

console.log(json)
})()
223 changes: 196 additions & 27 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

const assert = require('assert')
const net = require('net')
const http2 = require('http2')
const util = require('./core/util')
const Request = require('./core/request')
const Dispatcher = require('./dispatcher')
Expand Down Expand Up @@ -44,7 +45,7 @@ const {
kNeedDrain,
kNoRef,
kKeepAliveDefaultTimeout,
kHostHeader,
kHost,
kClosed,
kDestroyed,
kPendingIdx,
Expand All @@ -61,7 +62,8 @@ const {
kBodyTimeout,
kStrictContentLength,
kConnector,
kMaxRedirections
kMaxRedirections,
kHttp2Session
} = require('./core/symbols')

class Client extends Dispatcher {
Expand Down Expand Up @@ -173,7 +175,7 @@ class Client extends Dispatcher {
this[kOnDestroyed] = []
this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming
this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`
this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 30e3
this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 30e3
this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
Expand All @@ -191,6 +193,9 @@ class Client extends Dispatcher {
this[kQueue] = []
this[kRunningIdx] = 0
this[kPendingIdx] = 0

// HTTP/2
this[kHttp2Session] = null
}

// TODO: Make private?
Expand Down Expand Up @@ -1017,6 +1022,14 @@ function onSocketReadable (data) {
parser.readMore()
}

function onHttp2SessionError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

onError(this[kClient], err)
}

function onSocketError (err) {
const { [kParser]: parser } = this

Expand Down Expand Up @@ -1070,8 +1083,10 @@ function onSocketEnd () {
function onSocketClose () {
const { [kClient]: client } = this

this[kParser].destroy(this[kError])
this[kParser] = null
if (!this[kHttp2Session]) {
this[kParser].destroy(this[kError])
this[kParser] = null
}

const err = this[kError] || new SocketError('closed')

Expand Down Expand Up @@ -1144,20 +1159,32 @@ function connect (client) {
client.emit('connectionError', client[kUrl], [client], err)
} else {
assert(socket)
if (socket.alpnProtocol === 'h2') {
const session = http2.connect(client[kUrl], {
createConnection: () => socket
})

client[kSocket] = socket
session[kError] = null
session[kClient] = client
session.on('error', onHttp2SessionError)
session.on('close', onSocketClose)

socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kError] = null
socket[kParser] = new Parser(client, socket)
socket[kClient] = client
socket
.on('error', onSocketError)
.on('readable', onSocketReadable)
.on('end', onSocketEnd)
.on('close', onSocketClose)
client[kHttp2Session] = session
} else {
client[kSocket] = socket

socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kError] = null
socket[kParser] = new Parser(client, socket)
socket[kClient] = client
socket
.on('error', onSocketError)
.on('readable', onSocketReadable)
.on('end', onSocketEnd)
.on('close', onSocketClose)
}

client.emit('connect', client[kUrl], [client])
}
Expand Down Expand Up @@ -1201,7 +1228,8 @@ function _resume (client, sync) {

const socket = client[kSocket]

if (socket) {
// TODO: @szmarczak: headers timeout
if (socket && socket.alpnProtocol !== 'h2') {
if (client[kSize] === 0) {
if (!socket[kNoRef] && socket.unref) {
socket.unref()
Expand Down Expand Up @@ -1266,12 +1294,17 @@ function _resume (client, sync) {
return
}

if (!socket) {
if (!socket && !client[kHttp2Session]) {
connect(client)
continue
}

if (socket.destroyed || socket[kWriting] || socket[kReset]) {
if (client[kHttp2Session]) {
// TODO: @szmarczak: what if exceeds max concurrent streams or can't accept new
if (client[kHttp2Session].destroyed) {
return
}
} else if (socket.destroyed || socket[kWriting] || socket[kReset]) {
return
}

Expand Down Expand Up @@ -1326,7 +1359,147 @@ function _resume (client, sync) {
}
}

function writeHttp2 (client, session, request) {
// TODO: @szmarczak: upgrade is not supported in HTTP/2
const { body, method, path, host, upgrade } = request

// https://tools.ietf.org/html/rfc7231#section-4.3.1
// https://tools.ietf.org/html/rfc7231#section-4.3.2
// https://tools.ietf.org/html/rfc7231#section-4.3.5

// Sending a payload body on a request that does not
// expect it can cause undefined behavior on some
// servers and corrupt connection state. Do not
// re-use the connection for further requests.

const expectsPayload = (
method === 'PUT' ||
method === 'POST' ||
method === 'PATCH'
)

if (body && typeof body.read === 'function') {
// Try to read EOF in order to get length.
body.read(0)
}

let contentLength = util.bodyLength(body)

if (contentLength === null) {
contentLength = request.contentLength
}

if (contentLength === 0 && !expectsPayload) {
// https://tools.ietf.org/html/rfc7230#section-3.3.2
// A user agent SHOULD NOT send a Content-Length header field when
// the request message does not contain a payload body and the method
// semantics do not anticipate such a body.

contentLength = null
}

if (request.contentLength !== null && request.contentLength !== contentLength) {
if (client[kStrictContentLength]) {
errorRequest(client, request, new RequestContentLengthMismatchError())
return false
}

process.emitWarning(new RequestContentLengthMismatchError())
}

try {
request.onConnect((err) => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call onConnect immediately or on stream ready event? https://nodejs.org/api/http2.html#http2_event_ready

if (request.aborted || request.completed) {
return
}

errorRequest(client, request, err || new RequestAbortedError())
})
} catch (err) {
errorRequest(client, request, err)
}

if (request.aborted) {
return false
}

const headers = {...request.headers}
headers[':authority'] = host || client[kHost]
headers[':path'] = path

// TODO: Expect: 100-continue

/* istanbul ignore else: assertion */
if (!body) {
if (contentLength === 0) {
headers['content-length'] = '0'
} else {
assert(contentLength === null, 'no body must not have content length')
}
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

headers['content-length'] = String(contentLength)

process.nextTick(() => {
stream.end(body)
request.onBodySent(body)
})
} else if (util.isBlob(body)) {
process.nextTick(() => {
writeBlob({ client, request, stream, contentLength, expectsPayload })
})
} else if (util.isStream(body)) {
process.nextTick(() => {
writeStream({ client, request, stream, contentLength, expectsPayload })
})
} else if (util.isIterable(body)) {
process.nextTick(() => {
writeIterable({ client, request, stream, contentLength, expectsPayload })
})
} else {
assert(false)
}

console.log('http/2 request')
const stream = session.request(headers)
stream.on('response', headers => {
if (request.onHeaders(Number(headers[':status']), headers, stream.resume.bind(stream), '') === false) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handler expects an array but the native http2 module provides us with an object. What should we do?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convert to array

stream.pause()
}
})
stream.on('data', chunk => {
if (request.onData(chunk) === false) {
stream.pause()
}
})
stream.on('trailers', headers => {
// TODO: @szmarczak
})
stream.on('end', () => {
request.onComplete([])
})
stream.on('aborted', () => {
// TODO: @szmarczak
})
stream.on('ready', () => {
// TODO: @szmarczak
})
stream.on('timeout', () => {
// TODO: @szmarczak
})

return true
}

function write (client, request) {
if (client[kHttp2Session]) {
console.log('http/2')
writeHttp2(client, client[kHttp2Session], request)
return
}

// TODO: @szmarczak: upgrade is not supported in HTTP/2
const { body, method, path, host, upgrade, headers } = request

// https://tools.ietf.org/html/rfc7231#section-4.3.1
Expand Down Expand Up @@ -1413,11 +1586,7 @@ function write (client, request) {

let header = `${method} ${path} HTTP/1.1\r\n`

if (host) {
header += `host: ${host}\r\n`
} else {
header += client[kHostHeader]
}
header += `host: ${host || client[kHost]}\r\n`

if (upgrade) {
header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
Expand All @@ -1427,8 +1596,8 @@ function write (client, request) {
header += 'connection: close\r\n'
}

if (headers) {
header += headers
for (const key in headers) {
header += `${key}: ${headers[key]}\r\n`
}

/* istanbul ignore else: assertion */
Expand Down
2 changes: 2 additions & 0 deletions lib/core/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ function buildConnector ({ maxCachedSessions, socketPath, timeout, ...opts }) {
const session = sessionCache.get(servername) || null

socket = tls.connect({
// TODO: @szmarczak: move this to client.js
ALPNProtocols: ['h2', 'http/1.1'],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

experimental + disabled by default?

...options,
servername,
session,
Expand Down
4 changes: 2 additions & 2 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class Request {

this.contentType = null

this.headers = ''
this.headers = {}

if (Array.isArray(headers)) {
if (headers.length % 2 !== 0) {
Expand Down Expand Up @@ -226,7 +226,7 @@ function processHeader (request, key, val) {
) {
throw new NotSupportedError('expect header not supported')
} else {
request.headers += `${key}: ${val}\r\n`
request.headers[key] = val
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make http1 slower... :/

}
}

Expand Down
5 changes: 3 additions & 2 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ module.exports = {
kOnDestroyed: Symbol('destroy callbacks'),
kPipelining: Symbol('pipelinig'),
kSocket: Symbol('socket'),
kHostHeader: Symbol('host header'),
kHost: Symbol('host'),
kConnector: Symbol('connector'),
kStrictContentLength: Symbol('strict content length'),
kMaxRedirections: Symbol('maxRedirections')
kMaxRedirections: Symbol('maxRedirections'),
kHttp2Session: Symbol('http2Session')
}