forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathReadFromRemote.h
More file actions
138 lines (114 loc) · 4.85 KB
/
ReadFromRemote.h
File metadata and controls
138 lines (114 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#pragma once
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Core/QueryProcessingStage.h>
#include <Client/IConnections.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/StorageID.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Core/UUID.h>
namespace DB
{
class IThrottler;
using ThrottlerPtr = std::shared_ptr<IThrottler>;
class ParallelReplicasReadingCoordinator;
using ParallelReplicasReadingCoordinatorPtr = std::shared_ptr<ParallelReplicasReadingCoordinator>;
/// Reading step from remote servers.
/// Unite query results from several shards.
class ReadFromRemote final : public SourceStepWithFilterBase
{
public:
/// @param main_table_ if Shards contains main_table then this parameter will be ignored
ReadFromRemote(
ClusterProxy::SelectStreamFactory::Shards shards_,
SharedHeader header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
LoggerPtr log_,
UInt32 shard_count_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
const String & cluster_name_);
String getName() const override { return "ReadFromRemote"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeDistributedPlan(FormatSettings & settings, const ExplainPlanOptions & options) override;
void enableMemoryBoundMerging();
void enforceAggregationInOrder(const SortDescription & sort_description);
void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
bool hasSerializedPlan() const;
private:
ClusterProxy::SelectStreamFactory::Shards shards;
QueryProcessingStage::Enum stage;
StorageID main_table;
ASTPtr table_func_ptr;
ContextMutablePtr context;
ThrottlerPtr throttler;
Scalars scalars;
Tables external_tables;
std::shared_ptr<const StorageLimitsList> storage_limits;
LoggerPtr log;
UInt32 shard_count;
const String cluster_name;
std::optional<GetPriorityForLoadBalancing> priority_func_factory;
bool is_remote_function = false;
Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const SharedHeader & out_header);
void addLazyPipe(
Pipes & pipes,
const ClusterProxy::SelectStreamFactory::Shard & shard,
const SharedHeader & out_header,
size_t parallel_marshalling_threads);
void addPipe(
Pipes & pipes,
const ClusterProxy::SelectStreamFactory::Shard & shard,
const SharedHeader & out_header,
size_t parallel_marshalling_threads);
};
class ReadFromParallelRemoteReplicasStep : public ISourceStep
{
public:
ReadFromParallelRemoteReplicasStep(
ASTPtr query_ast_,
ClusterPtr cluster_,
const StorageID & storage_id_,
ParallelReplicasReadingCoordinatorPtr coordinator_,
SharedHeader header_,
QueryProcessingStage::Enum stage_,
ContextMutablePtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
LoggerPtr log_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
std::vector<ConnectionPoolPtr> pools_to_use,
std::optional<size_t> exclude_pool_index_ = std::nullopt,
ConnectionPoolWithFailoverPtr connection_pool_with_failover_ = nullptr);
String getName() const override { return "ReadFromRemoteParallelReplicas"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeDistributedPlan(FormatSettings & settings, const ExplainPlanOptions & options) override;
void enableMemoryBoundMerging();
void enforceAggregationInOrder(const SortDescription & sort_description);
StorageID getStorageID() const { return storage_id; }
ParallelReplicasReadingCoordinatorPtr getCoordinator() const { return coordinator; }
private:
Pipes addPipes(ASTPtr ast, const SharedHeader & out_header);
Pipe createPipeForSingeReplica(const ConnectionPoolPtr & pool, ASTPtr ast, IConnections::ReplicaInfo replica_info, const SharedHeader & out_header,
size_t parallel_marshalling_threads);
ClusterPtr cluster;
ASTPtr query_ast;
StorageID storage_id;
ParallelReplicasReadingCoordinatorPtr coordinator;
QueryProcessingStage::Enum stage;
ContextMutablePtr context;
ThrottlerPtr throttler;
Scalars scalars;
Tables external_tables;
std::shared_ptr<const StorageLimitsList> storage_limits;
LoggerPtr log;
std::vector<ConnectionPoolPtr> pools_to_use;
std::optional<size_t> exclude_pool_index;
ConnectionPoolWithFailoverPtr connection_pool_with_failover;
};
}