@@ -20,11 +20,15 @@ namespace ErrorCodes
2020}
2121
2222RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext (
23- RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_)
23+ RemoteQueryExecutor & executor_,
24+ bool suspend_when_query_sent_,
25+ bool read_packet_type_separately_,
26+ bool allow_retries_in_cluster_requests_)
2427 : AsyncTaskExecutor(std::make_unique<Task>(*this ))
2528 , executor(executor_)
2629 , suspend_when_query_sent(suspend_when_query_sent_)
2730 , read_packet_type_separately(read_packet_type_separately_)
31+ , allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
2832{
2933 if (-1 == pipe2 (pipe_fd, O_NONBLOCK))
3034 throw ErrnoException (ErrorCodes::CANNOT_OPEN_FILE, " Cannot create pipe" );
@@ -55,37 +59,46 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5559 if (read_context.executor .needToSkipUnavailableShard ())
5660 return ;
5761
58- while ( true )
62+ try
5963 {
60- try
64+ while ( true )
6165 {
62- read_context.has_read_packet_part = PacketPart::None;
63-
64- if (read_context.read_packet_type_separately )
66+ try
6567 {
66- read_context.packet .type = read_context.executor .getConnections ().receivePacketTypeUnlocked (async_callback);
67- read_context.has_read_packet_part = PacketPart::Type;
68- suspend_callback ();
68+ read_context.has_read_packet_part = PacketPart::None;
69+
70+ if (read_context.read_packet_type_separately )
71+ {
72+ read_context.packet .type = read_context.executor .getConnections ().receivePacketTypeUnlocked (async_callback);
73+ read_context.has_read_packet_part = PacketPart::Type;
74+ suspend_callback ();
75+ }
76+ read_context.packet = read_context.executor .getConnections ().receivePacketUnlocked (async_callback);
77+ read_context.has_read_packet_part = PacketPart::Body;
78+ if (read_context.packet .type == Protocol::Server::Data)
79+ read_context.has_data_packets = true ;
6980 }
70- read_context.packet = read_context.executor .getConnections ().receivePacketUnlocked (async_callback);
71- read_context.has_read_packet_part = PacketPart::Body;
72- if (read_context.packet .type == Protocol::Server::Data)
73- read_context.has_data_packets = true ;
74- }
75- catch (const Exception & e)
76- {
77- // / If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
78- // / If initiator did not process any data packets before, this fact can be ignored.
79- // / Unprocessed tasks will be executed on other nodes.
80- if (e.code () == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
81- && !read_context.has_data_packets .load () && read_context.executor .skipUnavailableShards ())
81+ catch (const Exception & e)
8282 {
83- read_context.has_read_packet_part = PacketPart::None;
83+ // / If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
84+ // / If initiator did not process any data packets before, this fact can be ignored.
85+ // / Unprocessed tasks will be executed on other nodes.
86+ if (e.code () == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
87+ && !read_context.has_data_packets .load () && read_context.executor .skipUnavailableShards ())
88+ {
89+ read_context.has_read_packet_part = PacketPart::None;
90+ }
91+ else
92+ throw ;
8493 }
85- else
86- throw ;
8794 }
88-
95+ }
96+ catch (const Exception &)
97+ {
98+ if (!read_context.allow_retries_in_cluster_requests )
99+ throw ;
100+ read_context.packet .type = Protocol::Server::ConnectionLost;
101+ read_context.packet .exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern (true ), getCurrentExceptionCode ());
89102 suspend_callback ();
90103 }
91104}
0 commit comments