Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .docker/clickhouse/single_node_tls/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM clickhouse/clickhouse-server:25.2-alpine
FROM clickhouse/clickhouse-server:25.6-alpine
COPY .docker/clickhouse/single_node_tls/certificates /etc/clickhouse-server/certs
RUN chown clickhouse:clickhouse -R /etc/clickhouse-server/certs \
&& chmod 600 /etc/clickhouse-server/certs/* \
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2.3'

services:
clickhouse1:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-25.2-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-25.6-alpine}'
ulimits:
nofile:
soft: 262144
Expand All @@ -21,7 +21,7 @@ services:
- './.docker/clickhouse/users.xml:/etc/clickhouse-server/users.xml'

clickhouse2:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-25.2-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-25.6-alpine}'
ulimits:
nofile:
soft: 262144
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#version: '3.8'
services:
clickhouse:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-25.2-alpine}'
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-25.6-alpine}'
container_name: 'clickhouse-js-clickhouse-server'
environment:
CLICKHOUSE_SKIP_USER_SETUP: 1
Expand Down
25 changes: 21 additions & 4 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export interface RequestParams {
// if there are compression headers, attempt to decompress it
try_decompress_response_stream?: boolean
parse_summary?: boolean
query: string
}

export abstract class NodeBaseConnection
Expand Down Expand Up @@ -115,6 +116,7 @@ export abstract class NodeBaseConnection
url: transformUrl({ url: this.params.url, pathname: '/ping' }),
abort_signal: abortController.signal,
headers: this.buildRequestHeaders(),
query: 'ping',
},
'Ping',
)
Expand Down Expand Up @@ -157,21 +159,22 @@ export abstract class NodeBaseConnection
const enableResponseCompression =
clickhouse_settings.enable_http_compression === 1
try {
const { stream, response_headers } = await this.request(
const { response_headers, stream } = await this.request(
{
method: 'POST',
url: transformUrl({ url: this.params.url, searchParams }),
body: params.query,
abort_signal: controller.signal,
enable_response_compression: enableResponseCompression,
headers: this.buildRequestHeaders(params),
query: params.query,
},
'Query',
)
return {
stream,
query_id,
response_headers,
query_id,
}
} catch (err) {
controller.abort('Query HTTP request failed')
Expand Down Expand Up @@ -216,6 +219,7 @@ export abstract class NodeBaseConnection
enable_request_compression: this.params.compression.compress_request,
parse_summary: true,
headers: this.buildRequestHeaders(params),
query: params.query,
},
'Insert',
)
Expand Down Expand Up @@ -445,6 +449,7 @@ export abstract class NodeBaseConnection
this.params.compression.decompress_response,
try_decompress_response_stream: tryDecompressResponseStream,
headers: this.buildRequestHeaders(params),
query: params.query,
},
params.op,
)
Expand Down Expand Up @@ -493,12 +498,11 @@ export abstract class NodeBaseConnection
reject(err)
}

let responseStream: Stream.Readable
const onResponse = async (
_response: Http.IncomingMessage,
): Promise<void> => {
this.logResponse(op, request, params, _response, start)

let responseStream: Stream.Readable
const tryDecompressResponseStream =
params.try_decompress_response_stream ?? true
// even if the stream decompression is disabled, we have to decompress it in case of an error
Expand Down Expand Up @@ -637,6 +641,19 @@ export abstract class NodeBaseConnection
this.logger.trace({
message: `Socket ${socketId} was closed or ended, 'free' listener removed`,
})
if (responseStream && !responseStream.readableEnded) {
this.logger.warn({
message:
`${op}: socket was closed or ended before the response was fully read. ` +
'This can potentially result in an uncaught ECONNRESET error! ' +
'Consider fully consuming, draining, or destroying the response stream.',
args: {
query: params.query,
query_id:
params.url.searchParams.get('query_id') ?? 'unknown',
},
})
}
}
socket.once('end', cleanup)
socket.once('close', cleanup)
Expand Down