diff --git a/demo.js b/demo.js new file mode 100644 index 00000000000..d4f35697d27 --- /dev/null +++ b/demo.js @@ -0,0 +1,8 @@ +const undici = require('.') + +;(async () => { + const { body } = await undici.request('https://httpbin.org/anything') + const json = await body.json() + + console.log(json) +})() diff --git a/lib/client.js b/lib/client.js index 4dd5b99568b..24ca5afac64 100644 --- a/lib/client.js +++ b/lib/client.js @@ -4,6 +4,7 @@ const assert = require('assert') const net = require('net') +const http2 = require('http2') const util = require('./core/util') const Request = require('./core/request') const Dispatcher = require('./dispatcher') @@ -44,7 +45,7 @@ const { kNeedDrain, kNoRef, kKeepAliveDefaultTimeout, - kHostHeader, + kHost, kClosed, kDestroyed, kPendingIdx, @@ -61,7 +62,8 @@ const { kBodyTimeout, kStrictContentLength, kConnector, - kMaxRedirections + kMaxRedirections, + kHttp2Session } = require('./core/symbols') class Client extends Dispatcher { @@ -173,7 +175,7 @@ class Client extends Dispatcher { this[kOnDestroyed] = [] this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming this[kNeedDrain] = 0 // 0, idle, 1, scheduled, 2 resuming - this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n` + this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}` this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 30e3 this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 30e3 this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength @@ -191,6 +193,9 @@ class Client extends Dispatcher { this[kQueue] = [] this[kRunningIdx] = 0 this[kPendingIdx] = 0 + + // HTTP/2 + this[kHttp2Session] = null } // TODO: Make private? @@ -1017,6 +1022,14 @@ function onSocketReadable (data) { parser.readMore() } +function onHttp2SessionError (err) { + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + this[kError] = err + + onError(this[kClient], err) +} + function onSocketError (err) { const { [kParser]: parser } = this @@ -1070,8 +1083,10 @@ function onSocketEnd () { function onSocketClose () { const { [kClient]: client } = this - this[kParser].destroy(this[kError]) - this[kParser] = null + if (!this[kHttp2Session]) { + this[kParser].destroy(this[kError]) + this[kParser] = null + } const err = this[kError] || new SocketError('closed') @@ -1144,20 +1159,33 @@ function connect (client) { client.emit('connectionError', client[kUrl], [client], err) } else { assert(socket) + if (socket.alpnProtocol === 'h2') { + const session = http2.connect(client[kUrl], { + createConnection: () => socket + }) - client[kSocket] = socket + session[kError] = null + session[kClient] = client + session.on('error', onHttp2SessionError) + session.on('close', onSocketClose) + session.unref() - socket[kNoRef] = false - socket[kWriting] = false - socket[kReset] = false - socket[kError] = null - socket[kParser] = new Parser(client, socket) - socket[kClient] = client - socket - .on('error', onSocketError) - .on('readable', onSocketReadable) - .on('end', onSocketEnd) - .on('close', onSocketClose) + client[kHttp2Session] = session + } else { + client[kSocket] = socket + + socket[kNoRef] = false + socket[kWriting] = false + socket[kReset] = false + socket[kError] = null + socket[kParser] = new Parser(client, socket) + socket[kClient] = client + socket + .on('error', onSocketError) + .on('readable', onSocketReadable) + .on('end', onSocketEnd) + .on('close', onSocketClose) + } client.emit('connect', client[kUrl], [client]) } @@ -1201,7 +1229,8 @@ function _resume (client, sync) { const socket = client[kSocket] - if (socket) { + // TODO: @szmarczak: headers timeout + if (socket && socket.alpnProtocol !== 'h2') { if (client[kSize] === 0) { if (!socket[kNoRef] && socket.unref) { socket.unref() @@ -1266,12 +1295,17 @@ function _resume (client, sync) { return } - if (!socket) { + if (!socket && !client[kHttp2Session]) { connect(client) continue } - if (socket.destroyed || socket[kWriting] || socket[kReset]) { + if (client[kHttp2Session]) { + // TODO: @szmarczak: what if exceeds max concurrent streams or can't accept new + if (client[kHttp2Session].destroyed) { + return + } + } else if (socket.destroyed || socket[kWriting] || socket[kReset]) { return } @@ -1326,7 +1360,153 @@ function _resume (client, sync) { } } +function writeHttp2 (client, session, request) { + // TODO: @szmarczak: upgrade is not supported in HTTP/2 + const { body, method, path, host, upgrade } = request + + // https://tools.ietf.org/html/rfc7231#section-4.3.1 + // https://tools.ietf.org/html/rfc7231#section-4.3.2 + // https://tools.ietf.org/html/rfc7231#section-4.3.5 + + // Sending a payload body on a request that does not + // expect it can cause undefined behavior on some + // servers and corrupt connection state. Do not + // re-use the connection for further requests. + + const expectsPayload = ( + method === 'PUT' || + method === 'POST' || + method === 'PATCH' + ) + + if (body && typeof body.read === 'function') { + // Try to read EOF in order to get length. + body.read(0) + } + + let contentLength = util.bodyLength(body) + + if (contentLength === null) { + contentLength = request.contentLength + } + + if (contentLength === 0 && !expectsPayload) { + // https://tools.ietf.org/html/rfc7230#section-3.3.2 + // A user agent SHOULD NOT send a Content-Length header field when + // the request message does not contain a payload body and the method + // semantics do not anticipate such a body. + + contentLength = null + } + + if (request.contentLength !== null && request.contentLength !== contentLength) { + if (client[kStrictContentLength]) { + errorRequest(client, request, new RequestContentLengthMismatchError()) + return false + } + + process.emitWarning(new RequestContentLengthMismatchError()) + } + + try { + request.onConnect((err) => { + if (request.aborted || request.completed) { + return + } + + errorRequest(client, request, err || new RequestAbortedError()) + }) + } catch (err) { + errorRequest(client, request, err) + } + + if (request.aborted) { + return false + } + + const headers = {...request.headers} + headers[':authority'] = host || client[kHost] + headers[':path'] = path + + // TODO: Expect: 100-continue + + /* istanbul ignore else: assertion */ + if (!body) { + if (contentLength === 0) { + headers['content-length'] = '0' + } else { + assert(contentLength === null, 'no body must not have content length') + } + } else if (util.isBuffer(body)) { + assert(contentLength === body.byteLength, 'buffer body must have content length') + + headers['content-length'] = String(contentLength) + + process.nextTick(() => { + stream.end(body) + request.onBodySent(body) + }) + } else if (util.isBlob(body)) { + process.nextTick(() => { + writeBlob({ client, request, stream, contentLength, expectsPayload }) + }) + } else if (util.isStream(body)) { + process.nextTick(() => { + writeStream({ client, request, stream, contentLength, expectsPayload }) + }) + } else if (util.isIterable(body)) { + process.nextTick(() => { + writeIterable({ client, request, stream, contentLength, expectsPayload }) + }) + } else { + assert(false) + } + + console.log('http/2 request') + // TODO: @szmarczak: ref only if current streams count is 0 + session.ref() + const stream = session.request(headers) + stream.on('response', headers => { + if (request.onHeaders(Number(headers[':status']), headers, stream.resume.bind(stream), '') === false) { + stream.pause() + } + }) + stream.on('data', chunk => { + if (request.onData(chunk) === false) { + stream.pause() + } + }) + stream.on('trailers', headers => { + // TODO: @szmarczak + }) + stream.on('end', () => { + request.onComplete([]) + }) + stream.on('aborted', () => { + // TODO: @szmarczak + }) + stream.on('ready', () => { + // TODO: @szmarczak + }) + stream.on('timeout', () => { + // TODO: @szmarczak + }) + // TODO: @szmarczak: unref only if current streams count is 0 + stream.on('close', () => { + session.unref() + }) + + return true +} + function write (client, request) { + if (client[kHttp2Session]) { + console.log('http/2') + writeHttp2(client, client[kHttp2Session], request) + return + } + + // TODO: @szmarczak: upgrade is not supported in HTTP/2 const { body, method, path, host, upgrade, headers } = request // https://tools.ietf.org/html/rfc7231#section-4.3.1 @@ -1413,11 +1593,7 @@ function write (client, request) { let header = `${method} ${path} HTTP/1.1\r\n` - if (host) { - header += `host: ${host}\r\n` - } else { - header += client[kHostHeader] - } + header += `host: ${host || client[kHost]}\r\n` if (upgrade) { header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n` @@ -1427,8 +1603,8 @@ function write (client, request) { header += 'connection: close\r\n' } - if (headers) { - header += headers + for (const key in headers) { + header += `${key}: ${headers[key]}\r\n` } /* istanbul ignore else: assertion */ diff --git a/lib/core/connect.js b/lib/core/connect.js index 9a4afa9944e..a09e8070d45 100644 --- a/lib/core/connect.js +++ b/lib/core/connect.js @@ -29,6 +29,8 @@ function buildConnector ({ maxCachedSessions, socketPath, timeout, ...opts }) { const session = sessionCache.get(servername) || null socket = tls.connect({ + // TODO: @szmarczak: move this to client.js + ALPNProtocols: ['h2', 'http/1.1'], ...options, servername, session, diff --git a/lib/core/request.js b/lib/core/request.js index 7d1f866ea20..59ed9a3ee5a 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -85,7 +85,7 @@ class Request { this.contentType = null - this.headers = '' + this.headers = {} if (Array.isArray(headers)) { if (headers.length % 2 !== 0) { @@ -226,7 +226,7 @@ function processHeader (request, key, val) { ) { throw new NotSupportedError('expect header not supported') } else { - request.headers += `${key}: ${val}\r\n` + request.headers[key] = val } } diff --git a/lib/core/symbols.js b/lib/core/symbols.js index e2f009b5eed..200a15aeb27 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -36,8 +36,9 @@ module.exports = { kOnDestroyed: Symbol('destroy callbacks'), kPipelining: Symbol('pipelinig'), kSocket: Symbol('socket'), - kHostHeader: Symbol('host header'), + kHost: Symbol('host'), kConnector: Symbol('connector'), kStrictContentLength: Symbol('strict content length'), - kMaxRedirections: Symbol('maxRedirections') + kMaxRedirections: Symbol('maxRedirections'), + kHttp2Session: Symbol('http2Session') }