Skip to content

Commit 3f39007

Browse files
prashantgolashfacebook-github-bot
authored andcommitted
[Coordinator throttling] Scheduling Policies for Admission Control based on worker load (prestodb#25689)
Summary: Pull Request resolved: prestodb#25689 Admission control scheduling policy **Logic** Gather worker overload data from the added end point in PR - prestodb#25687 Based on configured policies (cnt of overloaded workers or pct of overloaded workers) and cluster overload, queue the queries **Background** RFC PR: prestodb/rfcs#42 **Metrics on queuing due to this feature:** Added following JMX metrics - ClusterOverloadDuration - ClusterOverloadCount **Feature flag:** Right now feature is disabled. We can use coordinator configs to enable / add thresholds Differential Revision: D79470181
1 parent b8cddb2 commit 3f39007

28 files changed

Lines changed: 2011 additions & 54 deletions
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.execution;
15+
16+
import com.facebook.airlift.configuration.Config;
17+
18+
public class ClusterOverloadConfig
19+
{
20+
public static final String OVERLOAD_POLICY_CNT_BASED = "overload_worker_cnt_based_throttling";
21+
public static final String OVERLOAD_POLICY_PCT_BASED = "overload_worker_pct_based_throttling";
22+
private boolean clusterOverloadThrottlingEnabled;
23+
private double allowedOverloadWorkersPct = 0.01;
24+
private int allowedOverloadWorkersCnt;
25+
private String overloadPolicyType = OVERLOAD_POLICY_CNT_BASED;
26+
private int overloadCheckCacheTtlInSecs = 5;
27+
28+
/**
29+
* Gets the time-to-live for the cached cluster overload state.
30+
* This determines how frequently the system will re-evaluate whether the cluster is overloaded.
31+
*
32+
* @return the cache TTL duration
33+
*/
34+
public int getOverloadCheckCacheTtlInSecs()
35+
{
36+
return overloadCheckCacheTtlInSecs;
37+
}
38+
39+
/**
40+
* Gets the time-to-live for the cached cluster overload state.
41+
* This determines how frequently the system will re-evaluate whether the cluster is overloaded.
42+
*
43+
* @return the cache TTL duration
44+
*/
45+
public int getOverloadCheckCacheTtlMillis()
46+
{
47+
return overloadCheckCacheTtlInSecs * 1000;
48+
}
49+
50+
/**
51+
* Sets the time-to-live for the cached cluster overload state.
52+
*
53+
* @param overloadCheckCacheTtlInSecs the cache TTL duration
54+
* @return this for chaining
55+
*/
56+
@Config("cluster.overload-check-cache-ttl-secs")
57+
public ClusterOverloadConfig setOverloadCheckCacheTtlInSecs(int overloadCheckCacheTtlInSecs)
58+
{
59+
this.overloadCheckCacheTtlInSecs = overloadCheckCacheTtlInSecs;
60+
return this;
61+
}
62+
63+
@Config("cluster-overload.enable-throttling")
64+
public ClusterOverloadConfig setClusterOverloadThrottlingEnabled(boolean clusterOverloadThrottlingEnabled)
65+
{
66+
this.clusterOverloadThrottlingEnabled = clusterOverloadThrottlingEnabled;
67+
return this;
68+
}
69+
70+
public boolean isClusterOverloadThrottlingEnabled()
71+
{
72+
return this.clusterOverloadThrottlingEnabled;
73+
}
74+
75+
@Config("cluster-overload.allowed-overload-workers-pct")
76+
public ClusterOverloadConfig setAllowedOverloadWorkersPct(Double allowedOverloadWorkersPct)
77+
{
78+
this.allowedOverloadWorkersPct = allowedOverloadWorkersPct;
79+
return this;
80+
}
81+
82+
public double getAllowedOverloadWorkersPct()
83+
{
84+
return this.allowedOverloadWorkersPct;
85+
}
86+
87+
@Config("cluster-overload.allowed-overload-workers-cnt")
88+
public ClusterOverloadConfig setAllowedOverloadWorkersCnt(int allowedOverloadWorkersCnt)
89+
{
90+
this.allowedOverloadWorkersCnt = allowedOverloadWorkersCnt;
91+
return this;
92+
}
93+
94+
public double getAllowedOverloadWorkersCnt()
95+
{
96+
return this.allowedOverloadWorkersCnt;
97+
}
98+
99+
@Config("cluster-overload.overload-policy-type")
100+
public ClusterOverloadConfig setOverloadPolicyType(String overloadPolicyType)
101+
{
102+
// validate
103+
this.overloadPolicyType = overloadPolicyType;
104+
return this;
105+
}
106+
107+
public String getOverloadPolicyType()
108+
{
109+
return this.overloadPolicyType;
110+
}
111+
}

presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java

Lines changed: 75 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,25 @@
1818
import com.facebook.airlift.units.Duration;
1919
import com.facebook.presto.execution.ManagedQueryExecution;
2020
import com.facebook.presto.execution.resourceGroups.WeightedFairQueue.Usage;
21+
import com.facebook.presto.execution.scheduler.clusterOverload.ClusterResourceChecker;
2122
import com.facebook.presto.metadata.InternalNodeManager;
2223
import com.facebook.presto.server.QueryStateInfo;
2324
import com.facebook.presto.server.ResourceGroupInfo;
2425
import com.facebook.presto.spi.PrestoException;
26+
import com.facebook.presto.spi.resourceGroups.AdmissionControlBypassConfig;
2527
import com.facebook.presto.spi.resourceGroups.ResourceGroup;
2628
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
2729
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
2830
import com.facebook.presto.spi.resourceGroups.ResourceGroupState;
2931
import com.facebook.presto.spi.resourceGroups.SchedulingPolicy;
32+
import com.facebook.presto.spi.resourceGroups.SelectionCriteria;
3033
import com.google.common.collect.ImmutableList;
3134
import com.google.errorprone.annotations.ThreadSafe;
3235
import com.google.errorprone.annotations.concurrent.GuardedBy;
3336
import org.weakref.jmx.Managed;
3437
import org.weakref.jmx.Nested;
3538

39+
import java.security.Principal;
3640
import java.util.Collection;
3741
import java.util.HashMap;
3842
import java.util.HashSet;
@@ -96,6 +100,7 @@ public class InternalResourceGroup
96100
private final Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo;
97101
private final Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate;
98102
private final InternalNodeManager nodeManager;
103+
private final ClusterResourceChecker clusterResourceChecker;
99104

100105
// Configuration
101106
// =============
@@ -123,6 +128,8 @@ public class InternalResourceGroup
123128
private boolean jmxExport;
124129
@GuardedBy("root")
125130
private ResourceGroupQueryLimits perQueryLimits = NO_LIMITS;
131+
@GuardedBy("root")
132+
private AdmissionControlBypassConfig admissionControlPolicyByPassConfig = AdmissionControlBypassConfig.NO_BYPASS;
126133

127134
// Live data structures
128135
// ====================
@@ -166,12 +173,14 @@ protected InternalResourceGroup(
166173
boolean staticResourceGroup,
167174
Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
168175
Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
169-
InternalNodeManager nodeManager)
176+
InternalNodeManager nodeManager,
177+
ClusterResourceChecker clusterResourceChecker)
170178
{
171179
this.parent = requireNonNull(parent, "parent is null");
172180
this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null");
173181
this.executor = requireNonNull(executor, "executor is null");
174182
this.nodeManager = requireNonNull(nodeManager, "node manager is null");
183+
this.clusterResourceChecker = requireNonNull(clusterResourceChecker, "clusterResourceChecker is null");
175184
requireNonNull(name, "name is null");
176185
if (parent.isPresent()) {
177186
id = new ResourceGroupId(parent.get().id, name);
@@ -653,6 +662,22 @@ public ResourceGroupQueryLimits getPerQueryLimits()
653662
}
654663
}
655664

665+
@Override
666+
public void setAdmissionControlPolicyByPassConfig(AdmissionControlBypassConfig admissionControlPolicyByPassConfig)
667+
{
668+
synchronized (root) {
669+
this.admissionControlPolicyByPassConfig = admissionControlPolicyByPassConfig;
670+
}
671+
}
672+
673+
@Override
674+
public AdmissionControlBypassConfig getAdmissionControlPolicyByPassConfig()
675+
{
676+
synchronized (root) {
677+
return admissionControlPolicyByPassConfig;
678+
}
679+
}
680+
656681
public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegment)
657682
{
658683
requireNonNull(name, "name is null");
@@ -671,7 +696,8 @@ public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegm
671696
staticResourceGroup && staticSegment,
672697
additionalRuntimeInfo,
673698
shouldWaitForResourceManagerUpdate,
674-
nodeManager);
699+
nodeManager,
700+
clusterResourceChecker);
675701
// Sub group must use query priority to ensure ordering
676702
if (schedulingPolicy == QUERY_PRIORITY) {
677703
subGroup.setSchedulingPolicy(QUERY_PRIORITY);
@@ -713,25 +739,34 @@ public void run(ManagedQueryExecution query)
713739
if (!subGroups.isEmpty()) {
714740
throw new PrestoException(INVALID_RESOURCE_GROUP, format("Cannot add queries to %s. It is not a leaf group.", id));
715741
}
716-
// Check all ancestors for capacity
717-
InternalResourceGroup group = this;
742+
718743
boolean canQueue = true;
719744
boolean canRun = true;
720-
while (true) {
721-
canQueue &= group.canQueueMore();
722-
canRun &= group.canRunMore();
723-
if (!group.parent.isPresent()) {
724-
break;
745+
746+
// Check if admission control could be skipped for this query based on configured RG policies
747+
SelectionCriteria selectionCriteria = new SelectionCriteria(query.getSession().getIdentity().getPrincipal().isPresent(), query.getSession().getUser(), query.getSession().getSource(), query.getSession().getClientTags(), query.getSession().getResourceEstimates(), query.getSession().getQueryType().map(Enum::name), query.getSession().getClientInfo(), query.getSession().getSchema(), query.getSession().getIdentity().getPrincipal().map(Principal::getName));
748+
boolean shouldBypassAdmissionControl = admissionControlPolicyByPassConfig.shouldBypassAdmissionControl(selectionCriteria);
749+
750+
if (!shouldBypassAdmissionControl) {
751+
// Check all ancestors for capacity
752+
InternalResourceGroup group = this;
753+
while (true) {
754+
canQueue &= group.canQueueMore();
755+
canRun &= group.canRunMore();
756+
if (!group.parent.isPresent()) {
757+
break;
758+
}
759+
group = group.parent.get();
725760
}
726-
group = group.parent.get();
727761
}
762+
728763
if (!canQueue && !canRun) {
729764
isQueryQueueFull = true;
730765
}
731766
else {
732767
query.setResourceGroupQueryLimits(perQueryLimits);
733-
if (canRun && queuedQueries.isEmpty()) {
734-
startInBackground(query);
768+
if (shouldBypassAdmissionControl || (canRun && queuedQueries.isEmpty())) {
769+
startInBackground(query, shouldBypassAdmissionControl);
735770
}
736771
else {
737772
enqueueQuery(query);
@@ -770,7 +805,7 @@ private void enqueueQuery(ManagedQueryExecution query)
770805
}
771806

772807
// This method must be called whenever the group's eligibility to run more queries may have changed.
773-
private void updateEligibility()
808+
protected void updateEligibility()
774809
{
775810
checkState(Thread.holdsLock(root), "Must hold lock to update eligibility");
776811
synchronized (root) {
@@ -790,18 +825,20 @@ private void updateEligibility()
790825
}
791826
}
792827

793-
private void startInBackground(ManagedQueryExecution query)
828+
private void startInBackground(ManagedQueryExecution query, boolean shouldBypassAdmissionControl)
794829
{
795830
checkState(Thread.holdsLock(root), "Must hold lock to start a query");
796831
synchronized (root) {
797-
runningQueries.add(query);
798832
InternalResourceGroup group = this;
799-
while (group.parent.isPresent()) {
800-
group.parent.get().descendantRunningQueries++;
801-
group.parent.get().dirtySubGroups.add(group);
802-
group = group.parent.get();
833+
if (!shouldBypassAdmissionControl) {
834+
runningQueries.add(query);
835+
while (group.parent.isPresent()) {
836+
group.parent.get().descendantRunningQueries++;
837+
group.parent.get().dirtySubGroups.add(group);
838+
group = group.parent.get();
839+
}
840+
updateEligibility();
803841
}
804-
updateEligibility();
805842
executor.execute(query::startWaitingForResources);
806843
group = this;
807844
long lastRunningQueryStartTimeMillis = currentTimeMillis();
@@ -905,7 +942,7 @@ protected boolean internalStartNext()
905942

906943
ManagedQueryExecution query = queuedQueries.poll();
907944
if (query != null) {
908-
startInBackground(query);
945+
startInBackground(query, false);
909946
return true;
910947
}
911948

@@ -1019,6 +1056,11 @@ private boolean canRunMore()
10191056
{
10201057
checkState(Thread.holdsLock(root), "Must hold lock");
10211058
synchronized (root) {
1059+
// Check if more queries can be run on the cluster based on cluster overload
1060+
if (clusterResourceChecker.isClusterCurrentlyOverloaded()) {
1061+
return false;
1062+
}
1063+
10221064
if (cpuUsageMillis >= hardCpuLimitMillis) {
10231065
return false;
10241066
}
@@ -1135,7 +1177,8 @@ public RootInternalResourceGroup(
11351177
Executor executor,
11361178
Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
11371179
Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
1138-
InternalNodeManager nodeManager)
1180+
InternalNodeManager nodeManager,
1181+
ClusterResourceChecker clusterResourceChecker)
11391182
{
11401183
super(Optional.empty(),
11411184
name,
@@ -1144,7 +1187,16 @@ public RootInternalResourceGroup(
11441187
true,
11451188
additionalRuntimeInfo,
11461189
shouldWaitForResourceManagerUpdate,
1147-
nodeManager);
1190+
nodeManager,
1191+
clusterResourceChecker);
1192+
}
1193+
1194+
public synchronized void updateEligibilityRecursively(InternalResourceGroup group)
1195+
{
1196+
group.updateEligibility();
1197+
for (InternalResourceGroup subGroup : group.subGroups()) {
1198+
updateEligibilityRecursively(subGroup);
1199+
}
11481200
}
11491201

11501202
public synchronized void processQueuedQueries()

0 commit comments

Comments
 (0)