Skip to content

Commit 28d93e0

Browse files
prashantgolashfacebook-github-bot
authored andcommitted
[Coordinator throttling] Scheduling Policies for Admission Control based on worker load (prestodb#25689)
Summary: Admission control scheduling policy **Logic** Gather worker overload data from the added end point in PR - prestodb#25687 Based on configured policies (cnt of overloaded workers or pct of overloaded workers) and cluster overload, queue the queries **Background** RFC PR: prestodb/rfcs#42 **Metrics on queuing due to this feature:** Added following ODS metrics - ClusterOverloadDuration - ClusterOverloadCount **Feature flag:** Right now feature is disabled. We can use coordinator configs to enable / add thresholds Differential Revision: D79470181
1 parent 3184472 commit 28d93e0

File tree

22 files changed

+1438
-33
lines changed

22 files changed

+1438
-33
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution;
15+
16+
import com.facebook.airlift.configuration.Config;
17+
18+
public class ClusterOverloadConfig
19+
{
20+
public static final String OVERLOAD_POLICY_CNT_BASED = "overload_worker_cnt_based_throttling";
21+
public static final String OVERLOAD_POLICY_PCT_BASED = "overload_worker_pct_based_throttling";
22+
private boolean clusterOverloadThrottlingEnabled;
23+
private double allowedOverloadWorkersPct = 0.01;
24+
private int allowedOverloadWorkersCnt;
25+
private String overloadPolicyType = OVERLOAD_POLICY_CNT_BASED;
26+
private int overloadCheckCacheTtlInSecs = 5;
27+
28+
/**
29+
* Gets the time-to-live for the cached cluster overload state.
30+
* This determines how frequently the system will re-evaluate whether the cluster is overloaded.
31+
*
32+
* @return the cache TTL duration
33+
*/
34+
public int getOverloadCheckCacheTtlInSecs()
35+
{
36+
return overloadCheckCacheTtlInSecs;
37+
}
38+
39+
/**
40+
* Gets the time-to-live for the cached cluster overload state.
41+
* This determines how frequently the system will re-evaluate whether the cluster is overloaded.
42+
*
43+
* @return the cache TTL duration
44+
*/
45+
public int getOverloadCheckCacheTtlMillis()
46+
{
47+
return overloadCheckCacheTtlInSecs * 1000;
48+
}
49+
50+
/**
51+
* Sets the time-to-live for the cached cluster overload state.
52+
*
53+
* @param overloadCheckCacheTtlInSecs the cache TTL duration
54+
* @return this for chaining
55+
*/
56+
@Config("cluster.overload-check-cache-ttl-secs")
57+
public ClusterOverloadConfig setOverloadCheckCacheTtlInSecs(int overloadCheckCacheTtlInSecs)
58+
{
59+
this.overloadCheckCacheTtlInSecs = overloadCheckCacheTtlInSecs;
60+
return this;
61+
}
62+
63+
@Config("cluster-overload.enable-throttling")
64+
public ClusterOverloadConfig setClusterOverloadThrottlingEnabled(boolean clusterOverloadThrottlingEnabled)
65+
{
66+
this.clusterOverloadThrottlingEnabled = clusterOverloadThrottlingEnabled;
67+
return this;
68+
}
69+
70+
public boolean isClusterOverloadThrottlingEnabled()
71+
{
72+
return this.clusterOverloadThrottlingEnabled;
73+
}
74+
75+
@Config("cluster-overload.allowed-overload-workers-pct")
76+
public ClusterOverloadConfig setAllowedOverloadWorkersPct(Double allowedOverloadWorkersPct)
77+
{
78+
this.allowedOverloadWorkersPct = allowedOverloadWorkersPct;
79+
return this;
80+
}
81+
82+
public double getAllowedOverloadWorkersPct()
83+
{
84+
return this.allowedOverloadWorkersPct;
85+
}
86+
87+
@Config("cluster-overload.allowed-overload-workers-cnt")
88+
public ClusterOverloadConfig setAllowedOverloadWorkersCnt(int allowedOverloadWorkersCnt)
89+
{
90+
this.allowedOverloadWorkersCnt = allowedOverloadWorkersCnt;
91+
return this;
92+
}
93+
94+
public double getAllowedOverloadWorkersCnt()
95+
{
96+
return this.allowedOverloadWorkersCnt;
97+
}
98+
99+
@Config("cluster-overload.overload-policy-type")
100+
public ClusterOverloadConfig setOverloadPolicyType(String overloadPolicyType)
101+
{
102+
// validate
103+
this.overloadPolicyType = overloadPolicyType;
104+
return this;
105+
}
106+
107+
public String getOverloadPolicyType()
108+
{
109+
return this.overloadPolicyType;
110+
}
111+
}

presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.facebook.airlift.units.Duration;
1919
import com.facebook.presto.execution.ManagedQueryExecution;
2020
import com.facebook.presto.execution.resourceGroups.WeightedFairQueue.Usage;
21+
import com.facebook.presto.execution.scheduler.clusterOverload.ClusterResourceChecker;
2122
import com.facebook.presto.metadata.InternalNodeManager;
2223
import com.facebook.presto.server.QueryStateInfo;
2324
import com.facebook.presto.server.ResourceGroupInfo;
@@ -96,6 +97,7 @@ public class InternalResourceGroup
9697
private final Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo;
9798
private final Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate;
9899
private final InternalNodeManager nodeManager;
100+
private final ClusterResourceChecker clusterResourceChecker;
99101

100102
// Configuration
101103
// =============
@@ -166,12 +168,14 @@ protected InternalResourceGroup(
166168
boolean staticResourceGroup,
167169
Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
168170
Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
169-
InternalNodeManager nodeManager)
171+
InternalNodeManager nodeManager,
172+
ClusterResourceChecker clusterResourceChecker)
170173
{
171174
this.parent = requireNonNull(parent, "parent is null");
172175
this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null");
173176
this.executor = requireNonNull(executor, "executor is null");
174177
this.nodeManager = requireNonNull(nodeManager, "node manager is null");
178+
this.clusterResourceChecker = requireNonNull(clusterResourceChecker, "clusterResourceChecker is null");
175179
requireNonNull(name, "name is null");
176180
if (parent.isPresent()) {
177181
id = new ResourceGroupId(parent.get().id, name);
@@ -671,7 +675,8 @@ public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegm
671675
staticResourceGroup && staticSegment,
672676
additionalRuntimeInfo,
673677
shouldWaitForResourceManagerUpdate,
674-
nodeManager);
678+
nodeManager,
679+
clusterResourceChecker);
675680
// Sub group must use query priority to ensure ordering
676681
if (schedulingPolicy == QUERY_PRIORITY) {
677682
subGroup.setSchedulingPolicy(QUERY_PRIORITY);
@@ -1019,6 +1024,11 @@ private boolean canRunMore()
10191024
{
10201025
checkState(Thread.holdsLock(root), "Must hold lock");
10211026
synchronized (root) {
1027+
// Check if more queries can be run on the cluster based on cluster overload
1028+
if (!clusterResourceChecker.canRunMoreOnCluster(nodeManager)) {
1029+
return false;
1030+
}
1031+
10221032
if (cpuUsageMillis >= hardCpuLimitMillis) {
10231033
return false;
10241034
}
@@ -1135,7 +1145,8 @@ public RootInternalResourceGroup(
11351145
Executor executor,
11361146
Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
11371147
Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
1138-
InternalNodeManager nodeManager)
1148+
InternalNodeManager nodeManager,
1149+
ClusterResourceChecker clusterResourceChecker)
11391150
{
11401151
super(Optional.empty(),
11411152
name,
@@ -1144,7 +1155,8 @@ public RootInternalResourceGroup(
11441155
true,
11451156
additionalRuntimeInfo,
11461157
shouldWaitForResourceManagerUpdate,
1147-
nodeManager);
1158+
nodeManager,
1159+
clusterResourceChecker);
11481160
}
11491161

11501162
public synchronized void processQueuedQueries()

presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.facebook.presto.execution.ManagedQueryExecution;
2020
import com.facebook.presto.execution.QueryManagerConfig;
2121
import com.facebook.presto.execution.resourceGroups.InternalResourceGroup.RootInternalResourceGroup;
22+
import com.facebook.presto.execution.scheduler.clusterOverload.ClusterResourceChecker;
2223
import com.facebook.presto.metadata.InternalNodeManager;
2324
import com.facebook.presto.resourcemanager.ResourceGroupService;
2425
import com.facebook.presto.server.ResourceGroupInfo;
@@ -112,6 +113,7 @@ public final class InternalResourceGroupManager<C>
112113
private final QueryManagerConfig queryManagerConfig;
113114
private final InternalNodeManager nodeManager;
114115
private AtomicBoolean isConfigurationManagerLoaded;
116+
private final ClusterResourceChecker clusterResourceChecker;
115117

116118
@Inject
117119
public InternalResourceGroupManager(
@@ -121,7 +123,8 @@ public InternalResourceGroupManager(
121123
MBeanExporter exporter,
122124
ResourceGroupService resourceGroupService,
123125
ServerConfig serverConfig,
124-
InternalNodeManager nodeManager)
126+
InternalNodeManager nodeManager,
127+
ClusterResourceChecker clusterResourceChecker)
125128
{
126129
this.queryManagerConfig = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
127130
this.exporter = requireNonNull(exporter, "exporter is null");
@@ -137,6 +140,7 @@ public InternalResourceGroupManager(
137140
this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo);
138141
configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory());
139142
this.isConfigurationManagerLoaded = new AtomicBoolean(false);
143+
this.clusterResourceChecker = clusterResourceChecker;
140144
}
141145

142146
@Override
@@ -396,7 +400,7 @@ private synchronized void createGroupIfNecessary(SelectionContext<C> context, Ex
396400
else {
397401
RootInternalResourceGroup root;
398402
if (!isResourceManagerEnabled) {
399-
root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor, ignored -> Optional.empty(), rg -> false, nodeManager);
403+
root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor, ignored -> Optional.empty(), rg -> false, nodeManager, clusterResourceChecker);
400404
}
401405
else {
402406
root = new RootInternalResourceGroup(
@@ -409,7 +413,8 @@ private synchronized void createGroupIfNecessary(SelectionContext<C> context, Ex
409413
resourceGroupRuntimeInfosSnapshot::get,
410414
lastUpdatedResourceGroupRuntimeInfo::get,
411415
concurrencyThreshold),
412-
nodeManager);
416+
nodeManager,
417+
clusterResourceChecker);
413418
}
414419
group = root;
415420
rootGroups.add(root);
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package com.facebook.presto.execution.scheduler.clusterOverload;
16+
17+
import com.facebook.presto.metadata.InternalNodeManager;
18+
19+
/**
20+
* Interface for policies that determine if cluster is overloaded.
21+
* Implementations can check various metrics from NodeStats to determine
22+
* if a worker is overloaded and queries should be throttled.
23+
*/
24+
public interface ClusterOverloadPolicy
25+
{
26+
/**
27+
* Checks if cluster is overloaded.
28+
*
29+
* @param nodeManager The node manager to get node information
30+
* @return true if cluster is overloaded, false otherwise
31+
*/
32+
boolean isClusterOverloaded(InternalNodeManager nodeManager);
33+
34+
/**
35+
* Gets the name of the policy.
36+
*
37+
* @return The name of the policy
38+
*/
39+
String getName();
40+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution.scheduler.clusterOverload;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import jakarta.inject.Inject;
18+
19+
import java.util.Map;
20+
import java.util.Optional;
21+
22+
import static java.util.Objects.requireNonNull;
23+
24+
/**
25+
* Factory for creating ClusterOverloadPolicy instances.
26+
* This allows for extensible policy creation based on configuration.
27+
*/
28+
public class ClusterOverloadPolicyFactory
29+
{
30+
private final Map<String, ClusterOverloadPolicy> policies;
31+
32+
@Inject
33+
public ClusterOverloadPolicyFactory(ClusterOverloadPolicy clusterOverloadPolicy)
34+
{
35+
requireNonNull(clusterOverloadPolicy, "clusterOverloadPolicy is null");
36+
37+
// Register available policies
38+
ImmutableMap.Builder<String, ClusterOverloadPolicy> policiesBuilder = ImmutableMap.builder();
39+
40+
// Add the default overload policy - use the injected instance
41+
policiesBuilder.put(clusterOverloadPolicy.getName(), clusterOverloadPolicy);
42+
43+
// Add more policies here as they are implemented
44+
this.policies = policiesBuilder.build();
45+
}
46+
47+
/**
48+
* Get a policy by name.
49+
*
50+
* @param name The name of the policy to get
51+
* @return The policy, or empty if no policy with that name exists
52+
*/
53+
public Optional<ClusterOverloadPolicy> getPolicy(String name)
54+
{
55+
return Optional.ofNullable(policies.get(name));
56+
}
57+
58+
/**
59+
* Get the default policy.
60+
*
61+
* @return The default policy
62+
*/
63+
public ClusterOverloadPolicy getDefaultPolicy()
64+
{
65+
// Default to CPU/Memory policy
66+
return policies.get("cpu-memory-overload");
67+
}
68+
69+
/**
70+
* Get all available policies.
71+
*
72+
* @return Map of policy name to policy
73+
*/
74+
public Map<String, ClusterOverloadPolicy> getPolicies()
75+
{
76+
return policies;
77+
}
78+
}

0 commit comments

Comments
 (0)