Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ interface ParsedClickHouseError {
type?: string
}

/** An error that is thrown by the ClickHouse server. */
export class ClickHouseError extends Error {
readonly code: string
readonly type: string | undefined
Expand All @@ -32,3 +33,29 @@ export function parseError(input: string | Error): ClickHouseError | Error {
return inputIsError ? input : new Error(input)
}
}

export function getCurrentStackTrace(): string {
const originalStackTraceLimit = Error.stackTraceLimit
Error.stackTraceLimit = 100
const stack = new Error().stack
Error.stackTraceLimit = originalStackTraceLimit

if (!stack) return ''

// Skip the first three lines of the stack trace, containing useless information
// - Text `Error`
// - Info about this function call
// - Info about the originator of this function call, e.g., `request`
// Additionally, the original stack trace is, in fact, reversed.
return stack.split('\n').slice(3).reverse().join('\n')
}

export function addStackTrace<E extends Error>(err: E, stackTrace: string): E {
if (err.stack) {
const firstNewlineIndex = err.stack.indexOf('\n')
const firstLine = err.stack.substring(0, firstNewlineIndex)
const errStack = err.stack.substring(firstNewlineIndex + 1)
err.stack = `${firstLine}\n${stackTrace}\n${errStack}`
}
return err
}
2 changes: 1 addition & 1 deletion packages/client-common/src/error/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from './parse_error'
export * from './error'
4 changes: 2 additions & 2 deletions packages/client-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export type {
} from './parse'
export { SimpleColumnTypes, parseColumnType } from './parse'

/** For implementations usage only - should not be re-exported */
/** For implementation usage only - should not be re-exported */
export {
formatQuerySettings,
formatQueryParams,
Expand Down Expand Up @@ -112,7 +112,7 @@ export {
isJWTAuth,
} from './utils'
export { LogWriter, DefaultLogger, type LogWriterParams } from './logger'
export { parseError } from './error'
export { parseError, getCurrentStackTrace, addStackTrace } from './error'
export type {
CompressionSettings,
Connection,
Expand Down
165 changes: 102 additions & 63 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import type {
ResponseHeaders,
} from '@clickhouse/client-common'
import {
addStackTrace,
getCurrentStackTrace,
isCredentialsAuth,
isJWTAuth,
isSuccessfulResponse,
Expand Down Expand Up @@ -119,7 +121,7 @@ export abstract class NodeBaseConnection
return { success: true }
} catch (error) {
// it is used to ensure that the outgoing request is terminated,
// and we don't get an unhandled error propagation later
// and we don't get unhandled error propagation later
abortController.abort('Ping failed')
// not an error, as this might be semi-expected
this.logger.warn({
Expand Down Expand Up @@ -150,7 +152,7 @@ export abstract class NodeBaseConnection
role: params.role,
})
const { controller, controllerCleanup } = this.getAbortController(params)
// allows to enforce the compression via the settings even if the client instance has it disabled
// allows enforcing the compression via the settings even if the client instance has it disabled
const enableResponseCompression =
clickhouse_settings.enable_http_compression === 1
try {
Expand Down Expand Up @@ -423,7 +425,7 @@ export abstract class NodeBaseConnection
const { controller, controllerCleanup } = this.getAbortController(params)
const tryDecompressResponseStream =
params.op === 'Exec'
? // allows to disable stream decompression for the `Exec` operation only
? // allows disabling stream decompression for the `Exec` operation only
(params.decompress_response_stream ??
this.params.compression.decompress_response)
: // there is nothing useful in the response stream for the `Command` operation,
Expand Down Expand Up @@ -476,12 +478,15 @@ export abstract class NodeBaseConnection
// allows the event loop to process the idle socket timers, if the CPU load is high
// otherwise, we can occasionally get an expired socket, see https://github.com/ClickHouse/clickhouse-js/issues/294
await sleep(0)
const stackTrace = getCurrentStackTrace()
const logger = this.logger
return new Promise((resolve, reject) => {
const start = Date.now()
const request = this.createClientRequest(params)

function onError(err: Error): void {
function onError(e: Error): void {
removeRequestListeners()
const err = addStackTrace(e, stackTrace)
reject(err)
}

Expand All @@ -498,7 +503,8 @@ export abstract class NodeBaseConnection
if (tryDecompressResponseStream || isFailedResponse) {
const decompressionResult = decompressResponse(_response, this.logger)
if (isDecompressionError(decompressionResult)) {
return reject(decompressionResult.error)
const err = addStackTrace(decompressionResult.error, stackTrace)
return reject(err)
}
responseStream = decompressionResult.response
} else {
Expand All @@ -507,9 +513,11 @@ export abstract class NodeBaseConnection
if (isFailedResponse) {
try {
const errorMessage = await getAsText(responseStream)
reject(parseError(errorMessage))
} catch (err) {
const err = addStackTrace(parseError(errorMessage), stackTrace)
reject(err)
} catch (e) {
// If the ClickHouse response is malformed
const err = addStackTrace(e as Error, stackTrace)
reject(err)
}
} else {
Expand All @@ -533,7 +541,11 @@ export abstract class NodeBaseConnection
* see the full sequence of events https://nodejs.org/api/http.html#httprequesturl-options-callback
* */
})
reject(new Error('The user aborted a request.'))
const err = addStackTrace(
new Error('The user aborted a request.'),
stackTrace,
)
reject(err)
}

function onClose(): void {
Expand All @@ -553,9 +565,10 @@ export abstract class NodeBaseConnection
? params.body
: Stream.Readable.from([params.body])

const callback = (err: NodeJS.ErrnoException | null): void => {
if (err) {
const callback = (e: NodeJS.ErrnoException | null): void => {
if (e) {
removeRequestListeners()
const err = addStackTrace(e, stackTrace)
reject(err)
}
}
Expand All @@ -568,79 +581,94 @@ export abstract class NodeBaseConnection
}

const onSocket = (socket: net.Socket) => {
if (
this.params.keep_alive.enabled &&
this.params.keep_alive.idle_socket_ttl > 0
) {
const socketInfo = this.knownSockets.get(socket)
// It is the first time we encounter this socket,
// so it doesn't have the idle timeout handler attached to it
if (socketInfo === undefined) {
const socketId = crypto.randomUUID()
this.logger.trace({
message: `Using a fresh socket ${socketId}, setting up a new 'free' listener`,
})
this.knownSockets.set(socket, {
id: socketId,
idle_timeout_handle: undefined,
})
// When the request is complete and the socket is released,
// make sure that the socket is removed after `idleSocketTTL`.
socket.on('free', () => {
try {
if (
this.params.keep_alive.enabled &&
this.params.keep_alive.idle_socket_ttl > 0
) {
const socketInfo = this.knownSockets.get(socket)
// It is the first time we've encountered this socket,
// so it doesn't have the idle timeout handler attached to it
if (socketInfo === undefined) {
const socketId = crypto.randomUUID()
this.logger.trace({
message: `Socket ${socketId} was released`,
message: `Using a fresh socket ${socketId}, setting up a new 'free' listener`,
})
// Avoiding the built-in socket.timeout() method usage here,
// as we don't want to clash with the actual request timeout.
const idleTimeoutHandle = setTimeout(() => {
this.logger.trace({
message: `Removing socket ${socketId} after ${this.idleSocketTTL} ms of idle`,
})
this.knownSockets.delete(socket)
socket.destroy()
}, this.idleSocketTTL).unref()
this.knownSockets.set(socket, {
id: socketId,
idle_timeout_handle: idleTimeoutHandle,
idle_timeout_handle: undefined,
})
// When the request is complete and the socket is released,
// make sure that the socket is removed after `idleSocketTTL`.
socket.on('free', () => {
this.logger.trace({
message: `Socket ${socketId} was released`,
})
// Avoiding the built-in socket.timeout() method usage here,
// as we don't want to clash with the actual request timeout.
const idleTimeoutHandle = setTimeout(() => {
this.logger.trace({
message: `Removing socket ${socketId} after ${this.idleSocketTTL} ms of idle`,
})
this.knownSockets.delete(socket)
socket.destroy()
}, this.idleSocketTTL).unref()
this.knownSockets.set(socket, {
id: socketId,
idle_timeout_handle: idleTimeoutHandle,
})
})
})

const cleanup = () => {
const maybeSocketInfo = this.knownSockets.get(socket)
// clean up a possibly dangling idle timeout handle (preventing leaks)
if (maybeSocketInfo?.idle_timeout_handle) {
clearTimeout(maybeSocketInfo.idle_timeout_handle)
const cleanup = () => {
const maybeSocketInfo = this.knownSockets.get(socket)
// clean up a possibly dangling idle timeout handle (preventing leaks)
if (maybeSocketInfo?.idle_timeout_handle) {
clearTimeout(maybeSocketInfo.idle_timeout_handle)
}
this.logger.trace({
message: `Socket ${socketId} was closed or ended, 'free' listener removed`,
})
}
socket.once('end', cleanup)
socket.once('close', cleanup)
} else {
clearTimeout(socketInfo.idle_timeout_handle)
this.logger.trace({
message: `Socket ${socketId} was closed or ended, 'free' listener removed`,
message: `Reusing socket ${socketInfo.id}`,
})
this.knownSockets.set(socket, {
...socketInfo,
idle_timeout_handle: undefined,
})
}
socket.once('end', cleanup)
socket.once('close', cleanup)
} else {
clearTimeout(socketInfo.idle_timeout_handle)
this.logger.trace({
message: `Reusing socket ${socketInfo.id}`,
})
this.knownSockets.set(socket, {
...socketInfo,
idle_timeout_handle: undefined,
})
}
} catch (e) {
logger.error({
message: 'An error occurred while housekeeping the idle sockets',
err: e as Error,
})
}

// Socket is "prepared" with idle handlers, continue with our request
pipeStream()

// This is for request timeout only. Surprisingly, it is not always enough to set in the HTTP request.
// The socket won't be actually destroyed, and it will be returned to the pool.
// The socket won't be destroyed, and it will be returned to the pool.
socket.setTimeout(this.params.request_timeout, onTimeout)
}

function onTimeout(): void {
const err = addStackTrace(new Error('Timeout error.'), stackTrace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error constructor seems to have { cause } for changing the error messages https://nodejs.org/docs/latest-v18.x/api/errors.html#errorcapturestacktracetargetobject-constructoropt

removeRequestListeners()
request.destroy()
reject(new Error('Timeout error.'))
try {
request.destroy()
} catch (e) {
logger.error({
message: 'An error occurred while destroying the request',
err: e as Error,
})
}
reject(err)
}

function removeRequestListeners(): void {
Expand All @@ -663,10 +691,21 @@ export abstract class NodeBaseConnection
request.on('close', onClose)

if (params.abort_signal !== undefined) {
params.abort_signal.addEventListener('abort', onAbort, { once: true })
params.abort_signal.addEventListener('abort', onAbort, {
once: true,
})
}

if (!params.body) return request.end()
if (!params.body) {
try {
return request.end()
} catch (e) {
this.logger.error({
message: 'An error occurred while ending the request without body',
err: e as Error,
})
}
}
})
}
}
Expand Down