diff --git a/presto-main-base/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java b/presto-main-base/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java index 0049ba2a3d614..dd04dc1fb9ea3 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java @@ -412,6 +412,11 @@ public Optional getDispatchInfo(QueryId queryId) }); } + public long getDurationUntilExpirationInMillis(QueryId queryId) + { + return queryTracker.getQuery(queryId).getDurationUntilExpirationInMillis(); + } + /** * Check if a given queryId exists in query tracker * diff --git a/presto-main-base/src/main/java/com/facebook/presto/dispatcher/NoOpQueryManager.java b/presto-main-base/src/main/java/com/facebook/presto/dispatcher/NoOpQueryManager.java index 2e33580bc483e..14bd6b143709f 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/dispatcher/NoOpQueryManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/dispatcher/NoOpQueryManager.java @@ -75,6 +75,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId) throw new UnsupportedOperationException(); } + @Override + public long getDurationUntilExpirationInMillis(QueryId queryId) + { + throw new UnsupportedOperationException(); + } + @Override public Session getQuerySession(QueryId queryId) { diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManager.java b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManager.java index f5bbec8b358ae..b467275fd1d4a 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManager.java @@ -65,6 +65,12 @@ BasicQueryInfo getQueryInfo(QueryId queryId) QueryInfo getFullQueryInfo(QueryId queryId) throws NoSuchElementException; + /** + * @throws NoSuchElementException if query does not exist + */ + long getDurationUntilExpirationInMillis(QueryId queryId) + throws NoSuchElementException; + /** * @throws NoSuchElementException if query does not exist */ diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java index d6290b664cf2a..dac7cdc5409d6 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java @@ -405,6 +405,13 @@ public interface TrackedQuery long getEndTimeInMillis(); + default long getDurationUntilExpirationInMillis() + { + Duration queryClientTimeout = getQueryClientTimeout(getSession()); + long expireTime = getLastHeartbeatInMillis() + queryClientTimeout.toMillis(); + return Math.max(0, expireTime - currentTimeMillis()); + } + Optional getResourceGroupQueryLimits(); void fail(Throwable cause); diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/protocol/ExecutingQueryResponseProvider.java b/presto-main-base/src/main/java/com/facebook/presto/server/protocol/ExecutingQueryResponseProvider.java index e9de9c13b2727..afd9df076f520 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/server/protocol/ExecutingQueryResponseProvider.java +++ b/presto-main-base/src/main/java/com/facebook/presto/server/protocol/ExecutingQueryResponseProvider.java @@ -60,5 +60,6 @@ Optional> waitForExecutingResponse( DataSize targetResultSize, boolean compressionEnabled, boolean nestedDataSerializationEnabled, - boolean binaryResults); + boolean binaryResults, + long durationUntilExpirationMs); } diff --git a/presto-main-base/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java b/presto-main-base/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java index adb47d813e8ef..22b28b43c1e40 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java +++ b/presto-main-base/src/main/java/com/facebook/presto/server/protocol/QueryResourceUtil.java @@ -39,6 +39,7 @@ import io.airlift.units.Duration; import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.CacheControl; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; @@ -100,7 +101,7 @@ public final class QueryResourceUtil private QueryResourceUtil() {} - public static Response toResponse(Query query, QueryResults queryResults, boolean compressionEnabled) + public static Response toResponse(Query query, QueryResults queryResults, boolean compressionEnabled, long durationUntilExpirationMs) { Response.ResponseBuilder response = Response.ok(queryResults); @@ -158,10 +159,18 @@ public static Response toResponse(Query query, QueryResults queryResults, boolea response.header(PRESTO_REMOVED_SESSION_FUNCTION, urlEncode(SQL_FUNCTION_ID_JSON_CODEC.toJson(signature))); } + response.cacheControl(getCacheControlMaxAge(durationUntilExpirationMs)); + return response.build(); } - public static Response toResponse(Query query, QueryResults queryResults, String xPrestoPrefixUri, boolean compressionEnabled, boolean nestedDataSerializationEnabled) + public static Response toResponse( + Query query, + QueryResults queryResults, + String xPrestoPrefixUri, + boolean compressionEnabled, + boolean nestedDataSerializationEnabled, + long durationUntilExpirationMs) { Iterable> queryResultsData = queryResults.getData(); if (nestedDataSerializationEnabled) { @@ -181,7 +190,12 @@ public static Response toResponse(Query query, QueryResults queryResults, String queryResults.getUpdateType(), queryResults.getUpdateCount()); - return toResponse(query, resultsClone, compressionEnabled); + return toResponse(query, resultsClone, compressionEnabled, durationUntilExpirationMs); + } + + public static CacheControl getCacheControlMaxAge(long durationUntilExpirationMs) + { + return CacheControl.valueOf("max-age=" + MILLISECONDS.toSeconds(durationUntilExpirationMs)); } public static void abortIfPrefixUrlInvalid(String xPrestoPrefixUrl) diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManager.java b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManager.java index c852d70c9cb8e..dbebdae2a81ba 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManager.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManager.java @@ -53,6 +53,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId) return null; } + @Override + public long getDurationUntilExpirationInMillis(QueryId queryId) + { + return 0; + } + public Session getQuerySession(QueryId queryId) { return null; diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java index e0f089adde85c..0ab7776c981ed 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java @@ -252,6 +252,13 @@ public QueryInfo getFullQueryInfo(QueryId queryId) return queryTracker.getQuery(queryId).getQueryInfo(); } + @Override + public long getDurationUntilExpirationInMillis(QueryId queryId) + throws NoSuchElementException + { + return queryTracker.getQuery(queryId).getDurationUntilExpirationInMillis(); + } + @Override public Session getQuerySession(QueryId queryId) throws NoSuchElementException diff --git a/presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java b/presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java index 950769a1a7904..6b286492ffa0d 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/protocol/ExecutingStatementResource.java @@ -16,6 +16,7 @@ import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.airlift.stats.TimeStat; import com.facebook.presto.client.QueryResults; +import com.facebook.presto.execution.QueryManager; import com.facebook.presto.server.ForStatementResource; import com.facebook.presto.server.ServerConfig; import com.facebook.presto.spi.QueryId; @@ -68,6 +69,7 @@ public class ExecutingStatementResource private final BoundedExecutor responseExecutor; private final LocalQueryProvider queryProvider; + private final QueryManager queryManager; private final boolean compressionEnabled; private final boolean nestedDataSerializationEnabled; private final QueryBlockingRateLimiter queryRateLimiter; @@ -76,11 +78,13 @@ public class ExecutingStatementResource public ExecutingStatementResource( @ForStatementResource BoundedExecutor responseExecutor, LocalQueryProvider queryProvider, + QueryManager queryManager, ServerConfig serverConfig, QueryBlockingRateLimiter queryRateLimiter) { this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null"); this.queryProvider = requireNonNull(queryProvider, "queryProvider is null"); + this.queryManager = requireNonNull(queryManager, "queryManager is null"); this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled(); this.nestedDataSerializationEnabled = requireNonNull(serverConfig, "serverConfig is null").isNestedDataSerializationEnabled(); this.queryRateLimiter = requireNonNull(queryRateLimiter, "queryRateLimiter is null"); @@ -132,9 +136,10 @@ public void getQueryResults( return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize, binaryResults); }, responseExecutor); + long durationUntilExpirationMs = queryManager.getDurationUntilExpirationInMillis(queryId); ListenableFuture queryResultsFuture = transform( waitForResultsAsync, - results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled), + results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled, durationUntilExpirationMs), directExecutor()); bindAsyncResponse(asyncResponse, queryResultsFuture, responseExecutor); } diff --git a/presto-main/src/main/java/com/facebook/presto/server/protocol/LocalExecutingQueryResponseProvider.java b/presto-main/src/main/java/com/facebook/presto/server/protocol/LocalExecutingQueryResponseProvider.java index e0d7dc0c8d855..f57c29a223fb2 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/protocol/LocalExecutingQueryResponseProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/server/protocol/LocalExecutingQueryResponseProvider.java @@ -26,6 +26,7 @@ import java.util.Optional; +import static com.facebook.presto.server.protocol.QueryResourceUtil.toResponse; import static com.google.common.util.concurrent.Futures.transform; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.Objects.requireNonNull; @@ -53,7 +54,8 @@ public Optional> waitForExecutingResponse( DataSize targetResultSize, boolean compressionEnabled, boolean nestedDataSerializationEnabled, - boolean binaryResults) + boolean binaryResults, + long durationUntilExpirationMs) { Query query; try { @@ -64,7 +66,7 @@ public Optional> waitForExecutingResponse( } return Optional.of(transform( query.waitForResults(0, uriInfo, scheme, maxWait, targetResultSize, binaryResults), - results -> QueryResourceUtil.toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled), + results -> toResponse(query, results, xPrestoPrefixUrl, compressionEnabled, nestedDataSerializationEnabled, durationUntilExpirationMs), directExecutor())); } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java b/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java index 82b4b55cbb459..d9cae4af1e380 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/protocol/QueuedStatementResource.java @@ -56,6 +56,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.CacheControl; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; @@ -77,6 +78,7 @@ import static com.facebook.presto.server.protocol.QueryResourceUtil.NO_DURATION; import static com.facebook.presto.server.protocol.QueryResourceUtil.abortIfPrefixUrlInvalid; import static com.facebook.presto.server.protocol.QueryResourceUtil.createQueuedQueryResults; +import static com.facebook.presto.server.protocol.QueryResourceUtil.getCacheControlMaxAge; import static com.facebook.presto.server.protocol.QueryResourceUtil.getQueuedUri; import static com.facebook.presto.server.protocol.QueryResourceUtil.getScheme; import static com.facebook.presto.server.security.RoleType.USER; @@ -91,6 +93,7 @@ import static com.google.common.util.concurrent.Futures.transformAsync; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static java.lang.System.currentTimeMillis; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -221,7 +224,9 @@ public Response postStatement( Query query = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0); queries.put(query.getQueryId(), query); - return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build(); + return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled) + .cacheControl(query.getDefaultCacheControl()) + .build(); } /** @@ -273,7 +278,9 @@ public Response putStatement( throw badRequest(CONFLICT, "Query already exists"); } - return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build(); + return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled) + .cacheControl(query.getDefaultCacheControl()) + .build(); } /** @@ -322,7 +329,9 @@ public Response retryFailedQuery( } } - return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build(); + return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled) + .cacheControl(query.getDefaultCacheControl()) + .build(); } /** @@ -435,6 +444,7 @@ private static Response.ResponseBuilder withCompressionConfiguration(Response.Re private static final class Query { + private static final int CACHE_CONTROL_MAX_AGE_SEC = 60; private final String query; private final SessionContext sessionContext; private final DispatchManager dispatchManager; @@ -443,6 +453,7 @@ private static final class Query private final String slug; private final AtomicLong lastToken = new AtomicLong(); private final int retryCount; + private final long expirationTime; @GuardedBy("this") private ListenableFuture querySubmissionFuture; @@ -468,6 +479,7 @@ public Query( this.retryCount = retryCount; this.queryId = requireNonNull(queryId, "queryId is null"); this.slug = requireNonNull(slug, "slug is null"); + this.expirationTime = currentTimeMillis() + SECONDS.toMillis(CACHE_CONTROL_MAX_AGE_SEC); } /** @@ -518,6 +530,15 @@ public int getRetryCount() return retryCount; } + /** + * Returns a cache control with the default max age value + */ + public CacheControl getDefaultCacheControl() + { + long maxAgeMillis = Math.max(0, expirationTime - currentTimeMillis()); + return CacheControl.valueOf("max-age=" + MILLISECONDS.toSeconds(maxAgeMillis)); + } + /** * Checks whether the query has been processed by the dispatchManager */ @@ -591,7 +612,10 @@ public ListenableFuture toResponse( xPrestoPrefixUrl, DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION), binaryResults); - return immediateFuture(withCompressionConfiguration(Response.ok(queryResults), compressionEnabled).build()); + + return immediateFuture(withCompressionConfiguration(Response.ok(queryResults), compressionEnabled) + .cacheControl(getDefaultCacheControl()) + .build()); } } @@ -602,6 +626,7 @@ public ListenableFuture toResponse( .status(NOT_FOUND) .build())); } + long durationUntilExpirationMs = dispatchManager.getDurationUntilExpirationInMillis(queryId); if (waitForDispatched().isDone()) { Optional> executingQueryResponse = executingQueryResponseProvider.waitForExecutingResponse( @@ -615,7 +640,8 @@ public ListenableFuture toResponse( TARGET_RESULT_SIZE, compressionEnabled, nestedDataSerializationEnabled, - binaryResults); + binaryResults, + durationUntilExpirationMs); if (executingQueryResponse.isPresent()) { return executingQueryResponse.get(); @@ -624,6 +650,7 @@ public ListenableFuture toResponse( return immediateFuture(withCompressionConfiguration(Response.ok( createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled) + .cacheControl(getCacheControlMaxAge(durationUntilExpirationMs)) .build()); } diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestServer.java b/presto-main/src/test/java/com/facebook/presto/server/TestServer.java index c2ccda8fd46b5..45b913f62a199 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestServer.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestServer.java @@ -79,14 +79,17 @@ import static com.facebook.presto.server.TestHttpRequestSessionContext.urlEncode; import static com.facebook.presto.spi.StandardErrorCode.INCOMPATIBLE_CLIENT; import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage; +import static java.lang.Integer.parseInt; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; +import static javax.ws.rs.core.HttpHeaders.CACHE_CONTROL; import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE; import static javax.ws.rs.core.MediaType.APPLICATION_JSON; import static javax.ws.rs.core.Response.Status.OK; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @Test(singleThreaded = true) @@ -425,6 +428,38 @@ public void testStatusPing() assertEquals(response.getHeader(CONTENT_TYPE), APPLICATION_JSON, "Content Type"); } + @Test + public void testCacheControlHeaderExists() + { + Request request = preparePost() + .setUri(uriFor("/v1/statement")) + .setBodyGenerator(createStaticBodyGenerator("show catalogs", UTF_8)) + .setHeader(PRESTO_USER, "user") + .build(); + + JsonResponse initResponse = client.execute(request, createFullJsonResponseHandler(QUERY_RESULTS_CODEC)); + + String initHeader = initResponse.getHeader(CACHE_CONTROL); + assertNotNull(initHeader); + assertTrue(initHeader.contains("max-age")); + + int initAge = parseInt(initHeader.substring(initHeader.indexOf("=") + 1)); + assertTrue(initAge >= 0); + + JsonResponse queryResults = initResponse; + while (queryResults.getValue().getNextUri() != null) { + URI nextUri = queryResults.getValue().getNextUri(); + queryResults = client.execute(prepareGet().setUri(nextUri).build(), createFullJsonResponseHandler(QUERY_RESULTS_CODEC)); + + String header = queryResults.getHeader(CACHE_CONTROL); + assertNotNull(header); + assertTrue(header.contains("max-age")); + + int maxAge = parseInt(header.substring(header.indexOf("=") + 1)); + assertTrue(maxAge >= 0); + } + } + public URI uriFor(String path) { return HttpUriBuilder.uriBuilderFrom(server.getBaseUrl()).replacePath(path).build(); diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkQueryManager.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkQueryManager.java index a41192030f414..741ad8f39edc3 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkQueryManager.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkQueryManager.java @@ -72,6 +72,12 @@ public QueryInfo getFullQueryInfo(QueryId queryId) throw new UnsupportedOperationException(); } + @Override + public long getDurationUntilExpirationInMillis(QueryId queryId) + { + throw new UnsupportedOperationException(); + } + @Override public Session getQuerySession(QueryId queryId) {