Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion presto-docs/src/main/sphinx/admin/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -484,4 +484,14 @@ The corresponding configuration property is :ref:`admin/properties:\`\`query.cli
* **Default value:** ``1``

This property defines the priority of queries for execution and plays an important role in query admission.
Queries with higher priority are scheduled first than the ones with lower priority. Higher number indicates higher priority.
Queries with higher priority are scheduled first than the ones with lower priority. Higher number indicates higher priority.

``query_max_queued_time``
^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``Duration``
* **Default value:** ``100d``

Use to configure how long a query can be queued before it is terminated.

The corresponding configuration property is :ref:`admin/properties:\`\`query.max-queued-time\`\``.
12 changes: 11 additions & 1 deletion presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1136,4 +1136,14 @@ Query Manager Properties
This property can be used to configure how long a query runs without contact
from the client application, such as the CLI, before it's abandoned.

The corresponding session property is :ref:`admin/properties-session:\`\`query_client_timeout\`\``.
The corresponding session property is :ref:`admin/properties-session:\`\`query_client_timeout\`\``.

``query.max-queued-time``
^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``Duration``
* **Default value:** ``100d``

Use to configure how long a query can be queued before it is terminated.

The corresponding session property is :ref:`admin/properties-session:\`\`query_max_queued_time\`\``.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public final class SystemSessionProperties
public static final String QUERY_MAX_BROADCAST_MEMORY = "query_max_broadcast_memory";
public static final String QUERY_MAX_TOTAL_MEMORY = "query_max_total_memory";
public static final String QUERY_MAX_TOTAL_MEMORY_PER_NODE = "query_max_total_memory_per_node";
public static final String QUERY_MAX_QUEUED_TIME = "query_max_queued_time";
public static final String QUERY_MAX_EXECUTION_TIME = "query_max_execution_time";
public static final String QUERY_MAX_RUN_TIME = "query_max_run_time";
public static final String RESOURCE_OVERCOMMIT = "resource_overcommit";
Expand Down Expand Up @@ -569,6 +570,15 @@ public SystemSessionProperties(
false,
value -> Duration.valueOf((String) value),
Duration::toString),
new PropertyMetadata<>(
QUERY_MAX_QUEUED_TIME,
"Maximum Queued time of a query",
VARCHAR,
Duration.class,
queryManagerConfig.getQueryMaxQueuedTime(),
false,
value -> Duration.valueOf((String) value),
Duration::toString),
new PropertyMetadata<>(
QUERY_MAX_EXECUTION_TIME,
"Maximum execution time of a query",
Expand Down Expand Up @@ -2188,6 +2198,11 @@ public static Duration getQueryMaxRunTime(Session session)
return session.getSystemProperty(QUERY_MAX_RUN_TIME, Duration.class);
}

public static Duration getQueryMaxQueuedTime(Session session)
{
return session.getSystemProperty(QUERY_MAX_QUEUED_TIME, Duration.class);
}

public static Duration getQueryMaxExecutionTime(Session session)
{
return session.getSystemProperty(QUERY_MAX_EXECUTION_TIME, Duration.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ public long getCreateTimeInMillis()
return basicQueryInfo.getQueryStats().getCreateTimeInMillis();
}

@Override
public Duration getQueuedTime()
{
return basicQueryInfo.getQueryStats().getQueuedTime();
}

@Override
public long getExecutionStartTimeInMillis()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ public long getCreateTimeInMillis()
return stateMachine.getCreateTimeInMillis();
}

@Override
public Duration getQueuedTime()
{
return stateMachine.getQueuedTime();
}

@Override
public long getExecutionStartTimeInMillis()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ public long getCreateTimeInMillis()
return stateMachine.getCreateTimeInMillis();
}

@Override
public Duration getQueuedTime()
{
return stateMachine.getQueuedTime();
}

@Override
public long getExecutionStartTimeInMillis()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class QueryManagerConfig

private String queryExecutionPolicy = "all-at-once";
private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxQueuedTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxExecutionTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxCpuTime = new Duration(1_000_000_000, TimeUnit.DAYS);

Expand Down Expand Up @@ -431,6 +432,19 @@ public QueryManagerConfig setQueryMaxRunTime(Duration queryMaxRunTime)
return this;
}

@NotNull
public Duration getQueryMaxQueuedTime()
{
return queryMaxQueuedTime;
}

@Config("query.max-queued-time")
public QueryManagerConfig setQueryMaxQueuedTime(Duration queryMaxQueuedTime)
{
this.queryMaxQueuedTime = queryMaxQueuedTime;
return this;
}

@NotNull
public Duration getQueryMaxExecutionTime()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -1025,6 +1026,11 @@ public long getCreateTimeInMillis()
return queryStateTimer.getCreateTimeInMillis();
}

public Duration getQueuedTime()
{
return queryStateTimer.getQueuedTime();
}

public long getExecutionStartTimeInMillis()
{
return queryStateTimer.getExecutionStartTimeInMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import static com.facebook.presto.SystemSessionProperties.getQueryClientTimeout;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxExecutionTime;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxQueuedTime;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxRunTime;
import static com.facebook.presto.execution.QueryLimit.Source.QUERY;
import static com.facebook.presto.execution.QueryLimit.Source.RESOURCE_GROUP;
Expand Down Expand Up @@ -211,22 +212,28 @@ public long getQueriesKilledDueToTooManyTask()
}

/**
* Enforce query max runtime/execution time limits
* Enforce query max runtime/queued/execution time limits
*/
private void enforceTimeLimits()
@VisibleForTesting
void enforceTimeLimits()
{
for (T query : queries.values()) {
if (query.isDone()) {
continue;
}
Duration queryMaxRunTime = getQueryMaxRunTime(query.getSession());
Duration queryMaxQueuedTime = getQueryMaxQueuedTime(query.getSession());
QueryLimit<Duration> queryMaxExecutionTime = getMinimum(
createDurationLimit(getQueryMaxExecutionTime(query.getSession()), QUERY),
query.getResourceGroupQueryLimits()
.flatMap(ResourceGroupQueryLimits::getExecutionTimeLimit)
.map(rgLimit -> createDurationLimit(rgLimit, RESOURCE_GROUP)).orElse(null));
long executionStartTime = query.getExecutionStartTimeInMillis();
long createTimeInMillis = query.getCreateTimeInMillis();
long queuedTimeInMillis = query.getQueuedTime().toMillis();
if (queuedTimeInMillis > queryMaxQueuedTime.toMillis()) {
query.fail(new PrestoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum queued time limit of " + queryMaxQueuedTime));
}
if (executionStartTime > 0 && (executionStartTime + queryMaxExecutionTime.getLimit().toMillis()) < currentTimeMillis()) {
query.fail(
new PrestoException(EXCEEDED_TIME_LIMIT,
Expand Down Expand Up @@ -399,6 +406,8 @@ public interface TrackedQuery

long getCreateTimeInMillis();

Duration getQueuedTime();

long getExecutionStartTimeInMillis();

long getLastHeartbeatInMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,12 @@ public long getCreateTimeInMillis()
return stateMachine.getCreateTimeInMillis();
}

@Override
public Duration getQueuedTime()
{
return stateMachine.getQueuedTime();
}

/**
* For a query that has started executing, returns the timestamp when this query started executing
* Otherwise returns a {@link Optional#empty()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static io.airlift.units.Duration.succinctDuration;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class MockQueryExecution
implements QueryExecution
{
Expand Down Expand Up @@ -179,6 +182,12 @@ public long getCreateTimeInMillis()
return 0L;
}

@Override
public Duration getQueuedTime()
{
return succinctDuration(0, MILLISECONDS);
}

@Override
public long getExecutionStartTimeInMillis()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void testDefaults()
.setRemoteTaskMaxCallbackThreads(Runtime.getRuntime().availableProcessors())
.setQueryExecutionPolicy("all-at-once")
.setQueryMaxRunTime(new Duration(100, TimeUnit.DAYS))
.setQueryMaxQueuedTime(new Duration(100, TimeUnit.DAYS))
.setQueryMaxExecutionTime(new Duration(100, TimeUnit.DAYS))
.setQueryMaxCpuTime(new Duration(1_000_000_000, TimeUnit.DAYS))
.setQueryMaxScanRawInputBytes(new DataSize(1000, PETABYTE))
Expand Down Expand Up @@ -115,6 +116,7 @@ public void testExplicitPropertyMappings()
.put("query.remote-task.max-callback-threads", "11")
.put("query.execution-policy", "phased")
.put("query.max-run-time", "2h")
.put("query.max-queued-time", "1h")
.put("query.max-execution-time", "3h")
.put("query.max-cpu-time", "2d")
.put("query.max-scan-raw-input-bytes", "1MB")
Expand Down Expand Up @@ -167,6 +169,7 @@ public void testExplicitPropertyMappings()
.setRemoteTaskMaxCallbackThreads(11)
.setQueryExecutionPolicy("phased")
.setQueryMaxRunTime(new Duration(2, TimeUnit.HOURS))
.setQueryMaxQueuedTime(new Duration(1, TimeUnit.HOURS))
.setQueryMaxExecutionTime(new Duration(3, TimeUnit.HOURS))
.setQueryMaxCpuTime(new Duration(2, TimeUnit.DAYS))
.setQueryMaxScanRawInputBytes(new DataSize(1, MEGABYTE))
Expand Down
Loading
Loading