Skip to content

Commit b5ae39e

Browse files
committed
Parse new format of ClickHouse exceptions
ClickHouse now implements an improved format for mid-stream exceptions: ClickHouse/ClickHouse#88818 This change adopts the new format and uses it when possible. Otherwise, it falls back to the older approach of searching for the __exception__ marker at the end of the buffered data.
1 parent b4a6e28 commit b5ae39e

File tree

2 files changed

+63
-26
lines changed

2 files changed

+63
-26
lines changed

contrib/poco/Net/include/Poco/Net/HTTPChunkedStream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ class Net_API HTTPChunkedStreamBuf: public HTTPBasicStreamBuf
126126

127127
// Checks whether the prefetch buffer contains a ClickHouse exception.
128128
std::optional<ClickHouseException> checkForClickHouseException();
129+
std::optional<std::string> findClickHouseExceptionMessage(const char *, size_t length);
129130

130131
private:
131132
// Since `_prefetchBuffer` is a ring buffer, data may wrap around the buffer

contrib/poco/Net/src/HTTPChunkedStream.cpp

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414

1515
#include <cassert>
16+
#include <charconv>
1617
#include <zstd.h>
1718
#include "Poco/Net/HTTPChunkedStream.h"
1819
#include "Poco/Net/HTTPHeaderStream.h"
@@ -45,6 +46,7 @@ constexpr size_t PREFETCH_SIZE = 1024UL * 128 + 3; // Recommended zstd's input
4546
constexpr size_t PREFETCH_BUFFER_CAPACITY = PREFETCH_SIZE + MIN_LOOK_AHEAD_SIZE;
4647

4748
static const char CONTENT_ENCODING_HEADER[] = "content-encoding";
49+
static const char CH_EXCEPTION_TAG_HEADER[] = "x-clickhouse-exception-tag";
4850
static const char ZSTD_CONTENT_ENCODING[] = "zstd";
4951

5052
struct HTTPChunkedStreamBuf::ZstdContext
@@ -67,13 +69,13 @@ HTTPChunkedStreamBuf::HTTPChunkedStreamBuf(
6769
_prefetchBufferSize(0),
6870
_prefetchBufferHead(0),
6971
_eof(false),
70-
_headers(headers),
72+
_headers(std::move(headers)),
7173
_compression(HTTPCompressionType::None),
7274
_zstd_context(new ZstdContext{}),
7375
_zstd_completed(false)
7476
{
75-
auto it = headers.find(CONTENT_ENCODING_HEADER);
76-
if (it != headers.end() && icompare(it->second, ZSTD_CONTENT_ENCODING) == 0) {
77+
auto it = _headers.find(CONTENT_ENCODING_HEADER);
78+
if (it != _headers.end() && icompare(it->second, ZSTD_CONTENT_ENCODING) == 0) {
7779
_compression = HTTPCompressionType::ZSTD;
7880
_zstd_context->dstream.reset(ZSTD_createDStream());
7981
ZSTD_initDStream(_zstd_context->dstream.get());
@@ -373,26 +375,66 @@ int HTTPChunkedStreamBuf::readCharFromSocket()
373375
return buffer;
374376
}
375377

376-
namespace {
377-
/**
378-
* Function searches for the ClickHouse exception marker in the iterator range
379-
* and returns the string that comes after that if the marker is found,
380-
* otherwise nothing.
381-
*/
382-
template <typename It>
383-
std::optional<std::string> findClickHouseExceptionMessage(It begin, It end)
378+
std::optional<std::string> HTTPChunkedStreamBuf::findClickHouseExceptionMessage(const char * buffer, size_t length)
384379
{
385-
const std::string_view exception_marker{"__exception__\r\n"};
386380

387-
auto it = std::find_end(begin, end, exception_marker.begin(), exception_marker.end());
388-
if (it != end) {
389-
std::string message(it + exception_marker.size(), end);
390-
return message;
381+
auto exception_tag_it = _headers.find(CH_EXCEPTION_TAG_HEADER);
382+
if (exception_tag_it != _headers.end()) {
383+
// We have exception marker then the format is:
384+
// ....__exception__\r\n<exception_tag>\r\n<exception_message>123 <exception_tag>\r\n__exception__\r\n
385+
// ....|~~~ opening exception marker ~~~~~|~exception message~...|~~~~~ closing exception marker ~~~~|
386+
// where 123 is the size of <exception message>
387+
388+
const std::string & exception_tag = exception_tag_it->second;
389+
const std::string closing_exception_marker = " " + exception_tag + "\r\n__exception__\r\n";
390+
if (length < closing_exception_marker.size())
391+
return std::nullopt;
392+
393+
// Roll back to the start of the closing exception marker
394+
bool has_exception_marker = std::equal(
395+
buffer + length - closing_exception_marker.size(),
396+
buffer + length,
397+
closing_exception_marker.begin());
398+
if (!has_exception_marker)
399+
return std::nullopt;
400+
401+
const char * begin = buffer;
402+
const char * end = buffer + length - closing_exception_marker.size();
403+
404+
// Roll back the start of the exception message size
405+
auto size_it = std::find_if_not(
406+
std::make_reverse_iterator(end),
407+
std::make_reverse_iterator(begin),
408+
[](const char c){ return ::isdigit(c); });
409+
if (size_it == std::make_reverse_iterator(end))
410+
return std::nullopt;
411+
const char * size_begin = size_it.base();
412+
413+
// Parse size
414+
size_t exception_message_size = 0;
415+
auto from_chars_res = std::from_chars(size_begin, end, exception_message_size);
416+
if (from_chars_res.ec != std::errc())
417+
return std::nullopt;
418+
419+
if (size_begin - begin < exception_message_size)
420+
return std::nullopt;
421+
422+
end = size_begin;
423+
return std::string(end - exception_message_size, end);
424+
} else {
425+
// older version of ClickHouse - everything that comes after __exception__\r\n
426+
// is an exception message. However it might be a false positive, if the data
427+
// itself contains this sequence.
428+
const std::string_view exception_marker{"__exception__\r\n"};
429+
const char * begin = buffer;
430+
const char * end = buffer + length;
431+
auto it = std::find_end(begin, end, exception_marker.begin(), exception_marker.end());
432+
if (it != end)
433+
return std::string(it + exception_marker.size(), end);
391434
}
392435

393436
return std::nullopt;
394437
}
395-
} // anonymous namespace
396438

397439
/**
398440
* Scans the prefetch buffer for a ClickHouse exception marker.
@@ -409,17 +451,14 @@ std::optional<ClickHouseException> HTTPChunkedStreamBuf::checkForClickHouseExcep
409451

410452
switch (_compression) {
411453
case HTTPCompressionType::None: {
412-
const char * begin = &_prefetchBuffer[_prefetchBufferHead];
413-
const char * end = &_prefetchBuffer[_prefetchBufferHead + _prefetchBufferSize];
414-
415-
if (auto message = findClickHouseExceptionMessage(begin, end))
454+
if (auto message = findClickHouseExceptionMessage(_prefetchBuffer.data(), _prefetchBufferSize))
416455
return ClickHouseException(*message);
417456

418457
return std::nullopt;
419458
}
420459
case HTTPCompressionType::ZSTD: {
421460

422-
ZSTD_inBuffer in = {&_prefetchBuffer[_prefetchBufferHead], _prefetchBufferSize, 0};
461+
ZSTD_inBuffer in = {_prefetchBuffer.data(), _prefetchBufferSize, 0};
423462

424463
// ring buffer for uncompressed data
425464
std::vector<char> out_buffer(ZSTD_DStreamOutSize());
@@ -463,10 +502,7 @@ std::optional<ClickHouseException> HTTPChunkedStreamBuf::checkForClickHouseExcep
463502
// beginning of the buffer, ahead of actual data. This is exactly what we need.
464503
std::rotate(out_buffer.begin(), out_buffer.begin() + head, out_buffer.end());
465504

466-
const char * begin = out_buffer.data();
467-
const char * end = out_buffer.data() + out_buffer.size();
468-
469-
if (auto message = findClickHouseExceptionMessage(begin, end)) {
505+
if (auto message = findClickHouseExceptionMessage(out_buffer.data(), out_buffer.size())) {
470506
return ClickHouseException(*message);
471507
}
472508

0 commit comments

Comments
 (0)