@@ -65,19 +65,15 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
6565 if (filter_actions_dag)
6666 predicate = filter_actions_dag->getOutputs ().at (0 );
6767
68- auto max_replicas_to_use = static_cast <UInt64>(cluster->getShardsInfo ().size ());
69- if (context->getSettingsRef ()[Setting::max_parallel_replicas] > 1 )
70- max_replicas_to_use = std::min (max_replicas_to_use, context->getSettingsRef ()[Setting::max_parallel_replicas].value );
71-
72- createExtension (predicate, max_replicas_to_use);
68+ createExtension (predicate);
7369}
7470
75- void ReadFromCluster::createExtension (const ActionsDAG::Node * predicate, size_t number_of_replicas )
71+ void ReadFromCluster::createExtension (const ActionsDAG::Node * predicate)
7672{
7773 if (extension)
7874 return ;
7975
80- extension = storage->getTaskIteratorExtension (predicate, context, number_of_replicas );
76+ extension = storage->getTaskIteratorExtension (predicate, context, cluster );
8177}
8278
8379// / The code executes on initiator
@@ -178,7 +174,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
178174 if (current_settings[Setting::max_parallel_replicas] > 1 )
179175 max_replicas_to_use = std::min (max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value );
180176
181- createExtension (nullptr , max_replicas_to_use );
177+ createExtension (nullptr );
182178
183179 for (const auto & shard_info : cluster->getShardsInfo ())
184180 {
0 commit comments