diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index bb19188fc4e5..1f5158827532 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -114,7 +114,7 @@ public void setup()
     final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs;
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
-    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
     this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
     final PlannerFactory plannerFactory = new PlannerFactory(
         druidSchema,
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 358c1ec3f1a3..9a3f03e36ac3 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -698,6 +698,8 @@ The Druid SQL server is configured through the following properties on the Broker.
 |`druid.sql.planner.useFallback`|Whether to evaluate operations on the Broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|false|
 |`druid.sql.planner.requireTimeCondition`|Whether to require SQL queries to have filter conditions on the __time column, so that all generated native queries will have user-specified intervals. If true, all queries without a filter condition on the __time column will fail.|false|
 |`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC|
+|`druid.sql.planner.metadataSegmentCacheEnable`|Whether to keep a cache of published segments on the Broker. If true, the Broker polls the Coordinator in the background to fetch segments from the metadata store and maintains a local cache. If false, the Coordinator's REST API is invoked whenever the Broker needs published segment info.|false|
+|`druid.sql.planner.metadataSegmentPollPeriod`|How often to poll the Coordinator for the published segment list when `druid.sql.planner.metadataSegmentCacheEnable` is set to true. The poll period is in milliseconds.|60000|
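As a worked example of the two new properties above, here is a minimal Broker `runtime.properties` sketch (values illustrative; `60000` shown is the default poll period):

```properties
# Keep a local cache of published segments on the Broker instead of hitting
# the Coordinator's REST API on every sys.segments query.
druid.sql.planner.metadataSegmentCacheEnable=true
# Background poll interval for refreshing that cache, in milliseconds.
druid.sql.planner.metadataSegmentPollPeriod=60000
```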
## SQL Metrics
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
index 4641e2748997..6b218bbcb44c 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java
@@ -192,7 +192,7 @@ public void setUp() throws Exception
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
-    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
     final DruidOperatorTable operatorTable = new DruidOperatorTable(
         ImmutableSet.of(new BloomFilterSqlAggregator()),
         ImmutableSet.of()
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index 69337b568f60..7aa4e6733ea2 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -157,7 +157,7 @@ public void setUp() throws Exception
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
-    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
     final DruidOperatorTable operatorTable = new DruidOperatorTable(
         ImmutableSet.of(new QuantileSqlAggregator()),
         ImmutableSet.of()
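The `DataSegmentInterner` added below relies on Guava interners being first-wins: `intern()` returns the previously canonicalized equal instance, so a size-0 realtime copy interned first would pin a segment's size to 0 even after a historical server reports the real size. A minimal, self-contained sketch of why two interners avoid that; `FakeSegment` is a hypothetical stand-in whose equality ignores size, mirroring how segment IDs ignore it:

```java
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

public class InternerSketch
{
  // Hypothetical stand-in for DataSegment: equality is on the identifier only.
  static final class FakeSegment
  {
    final String id;
    final long size;

    FakeSegment(String id, long size)
    {
      this.id = id;
      this.size = size;
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof FakeSegment && ((FakeSegment) o).id.equals(id);
    }

    @Override
    public int hashCode()
    {
      return id.hashCode();
    }
  }

  public static void main(String[] args)
  {
    // One shared interner: the realtime copy (size 0) arrives first and becomes
    // canonical, so the later historical copy's real size is lost.
    final Interner<FakeSegment> shared = Interners.newWeakInterner();
    shared.intern(new FakeSegment("wiki_2019-01-01", 0));
    System.out.println(shared.intern(new FakeSegment("wiki_2019-01-01", 53000)).size); // prints 0

    // Size-segregated interners, as in DataSegmentInterner below: the historical
    // copy is canonicalized separately and keeps its size.
    final Interner<FakeSegment> realtime = Interners.newWeakInterner();
    final Interner<FakeSegment> historical = Interners.newWeakInterner();
    realtime.intern(new FakeSegment("wiki_2019-01-01", 0));
    System.out.println(historical.intern(new FakeSegment("wiki_2019-01-01", 53000)).size); // prints 53000
  }
}
```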
diff --git a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java
new file mode 100644
index 000000000000..11d104d8604f
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+import org.apache.druid.timeline.DataSegment;
+
+/**
+ * Interns the DataSegment object in order to share the reference for the same DataSegment.
+ * It uses two separate interners for realtime and historical segments, to prevent overwriting
+ * the size of a segment that was served by a historical and is later served by another realtime
+ * server, since realtime servers always publish with size 0.
+ */
+public class DataSegmentInterner
+{
+  private static final Interner<DataSegment> REALTIME_INTERNER = Interners.newWeakInterner();
+  private static final Interner<DataSegment> HISTORICAL_INTERNER = Interners.newWeakInterner();
+
+  private DataSegmentInterner()
+  {
+    // No instantiation
+  }
+
+  public static DataSegment intern(DataSegment segment)
+  {
+    // A segment learns its size and dimensions when it moves from a realtime to a historical
+    // server. For that reason, we use its size as the indicator to decide whether to use the
+    // REALTIME or HISTORICAL interner.
+    return segment.getSize() > 0 ? HISTORICAL_INTERNER.intern(segment) : REALTIME_INTERNER.intern(segment);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
index de6b58be3854..a485dbaa955c 100644
--- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
+++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
@@ -20,6 +20,7 @@
 package org.apache.druid.client.selector;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+import org.apache.druid.client.DataSegmentInterner;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
@@ -50,7 +51,7 @@ public ServerSelector(
       TierSelectorStrategy strategy
   )
   {
-    this.segment = new AtomicReference<>(segment);
+    this.segment = new AtomicReference<>(DataSegmentInterner.intern(segment));
     this.strategy = strategy;
     this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
     this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index d1e83a91f4ea..c7e270214ff4 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -57,6 +57,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -148,14 +149,22 @@ public Response getDatabaseSegmentDataSource(@PathParam("dataSourceName") final
   @GET
   @Path("/segments")
   @Produces(MediaType.APPLICATION_JSON)
-  public Response getDatabaseSegments(@Context final HttpServletRequest req)
+  public Response getDatabaseSegments(
+      @Context final HttpServletRequest req,
+      @QueryParam("datasources") final Set<String> datasources
+  )
   {
-    final Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources();
+    Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources();
+    if (datasources != null && !datasources.isEmpty()) {
+      druidDataSources = druidDataSources.stream()
+                                         .filter(src -> datasources.contains(src.getName()))
+                                         .collect(Collectors.toSet());
+    }
     final Stream<DataSegment> metadataSegments =
         druidDataSources
             .stream()
             .flatMap(t -> t.getSegments().stream());
 
-    Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+    final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
         AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
 
     final Iterable<DataSegment> authorizedSegments =
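The new `datasources` filter above binds each repeated query parameter into the `Set<String>` argument. A hedged sketch of how a caller builds the filtered URL (Coordinator host and datasource names illustrative) — the same `datasources=a&datasources=b` convention the Broker-side poller uses later in this patch:

```java
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.stream.Collectors;

public class SegmentsUrlSketch
{
  public static void main(String[] args)
  {
    // Illustrative watched datasources; each becomes its own "datasources" parameter.
    final Set<String> watched = ImmutableSet.of("wikipedia", "lineitem");
    final String url = "http://coordinator:8081/druid/coordinator/v1/metadata/segments?"
                       + watched.stream()
                                .map(ds -> "datasources=" + ds)
                                .collect(Collectors.joining("&"));
    // Prints: http://coordinator:8081/druid/coordinator/v1/metadata/segments?datasources=wikipedia&datasources=lineitem
    System.out.println(url);
  }
}
```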
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 2eff321d78f0..4f043eeaedef 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2213,13 +2213,13 @@ private List> populateTimeline(
             expectedResults.get(k).get(j)
         );
         serverExpectations.get(lastServer).addExpectation(expectation);
-
+        EasyMock.expect(mockSegment.getSize()).andReturn(0L).anyTimes();
+        EasyMock.replay(mockSegment);
         ServerSelector selector = new ServerSelector(
             expectation.getSegment(),
             new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
         );
         selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), selector.getSegment());
-
         final ShardSpec shardSpec;
         if (numChunks == 1) {
           shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0);
@@ -2234,6 +2234,7 @@ private List> populateTimeline(
           }
           shardSpec = new SingleDimensionShardSpec("dim" + k, start, end, j);
         }
+        EasyMock.reset(mockSegment);
         EasyMock.expect(mockSegment.getShardSpec())
                 .andReturn(shardSpec)
                 .anyTimes();
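The `MetadataSegmentView` introduced below keeps its cache fresh with a poll-and-sweep pattern: stamp every entry seen in the current poll, then incrementally drop entries carrying an older stamp. A standalone sketch of that pattern under the assumption of a generic key type (class and method names hypothetical):

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PollSweepCache<K>
{
  private final ConcurrentMap<K, Long> entries = new ConcurrentHashMap<>();

  public void refresh(Iterator<K> latestFromSource)
  {
    final long stamp = System.nanoTime(); // one fresh stamp per poll
    while (latestFromSource.hasNext()) {
      // Entries present in this poll get re-stamped (or inserted).
      entries.put(latestFromSource.next(), stamp);
    }
    // Entries not re-stamped were deleted upstream. Removing them one by one
    // avoids swapping in a whole new map, keeping the memory footprint flat;
    // readers see an eventually consistent view.
    entries.entrySet().removeIf(e -> e.getValue() != stamp);
  }

  public Iterator<K> snapshot()
  {
    return entries.keySet().iterator();
  }
}
```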
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
index 766bf92e6d41..acc4d08986cd 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
@@ -67,6 +67,21 @@ public class PlannerConfig
   private DateTimeZone sqlTimeZone = DateTimeZone.UTC;
 
   @JsonProperty
+  private boolean metadataSegmentCacheEnable = false;
+
+  @JsonProperty
+  private long metadataSegmentPollPeriod = 60000;
+
+  public long getMetadataSegmentPollPeriod()
+  {
+    return metadataSegmentPollPeriod;
+  }
+
+  public boolean isMetadataSegmentCacheEnable()
+  {
+    return metadataSegmentCacheEnable;
+  }
+
   private boolean serializeComplexValues = true;
 
   public Period getMetadataRefreshPeriod()
@@ -159,6 +174,8 @@ public PlannerConfig withOverrides(final Map<String, Object> context)
     newConfig.requireTimeCondition = isRequireTimeCondition();
     newConfig.sqlTimeZone = getSqlTimeZone();
     newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart();
+    newConfig.metadataSegmentCacheEnable = isMetadataSegmentCacheEnable();
+    newConfig.metadataSegmentPollPeriod = getMetadataSegmentPollPeriod();
     newConfig.serializeComplexValues = shouldSerializeComplexValues();
     return newConfig;
   }
@@ -200,6 +217,8 @@ public boolean equals(final Object o)
            useFallback == that.useFallback &&
            requireTimeCondition == that.requireTimeCondition &&
            awaitInitializationOnStart == that.awaitInitializationOnStart &&
+           metadataSegmentCacheEnable == that.metadataSegmentCacheEnable &&
+           metadataSegmentPollPeriod == that.metadataSegmentPollPeriod &&
            serializeComplexValues == that.serializeComplexValues &&
            Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) &&
            Objects.equals(sqlTimeZone, that.sqlTimeZone);
@@ -221,6 +240,8 @@ public int hashCode()
         requireTimeCondition,
         awaitInitializationOnStart,
         sqlTimeZone,
+        metadataSegmentCacheEnable,
+        metadataSegmentPollPeriod,
         serializeComplexValues
     );
   }
@@ -239,6 +260,8 @@ public String toString()
            ", useFallback=" + useFallback +
            ", requireTimeCondition=" + requireTimeCondition +
            ", awaitInitializationOnStart=" + awaitInitializationOnStart +
+           ", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable +
+           ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod +
            ", sqlTimeZone=" + sqlTimeZone +
            ", serializeComplexValues=" + serializeComplexValues +
            '}';
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
new file mode 100644
index 000000000000..50fe3133cd28
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.druid.client.BrokerSegmentWatcherConfig;
+import org.apache.druid.client.DataSegmentInterner;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class polls the coordinator in the background to keep the latest published segments.
+ * Provides {@link #getPublishedSegments()} for others to get the segments in the metadata store.
+ */
+@ManageLifecycle
+public class MetadataSegmentView
+{
+
+  private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class);
+
+  private final DruidLeaderClient coordinatorDruidLeaderClient;
+  private final ObjectMapper jsonMapper;
+  private final BytesAccumulatingResponseHandler responseHandler;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
+
+  private final boolean isCacheEnabled;
+  @Nullable
+  private final ConcurrentMap<DataSegment, DateTime> publishedSegments;
+  private final ScheduledExecutorService scheduledExec;
+  private final long pollPeriodInMS;
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
+  private final AtomicBoolean cachePopulated = new AtomicBoolean(false);
+
+  @Inject
+  public MetadataSegmentView(
+      final @Coordinator DruidLeaderClient druidLeaderClient,
+      final ObjectMapper jsonMapper,
+      final BytesAccumulatingResponseHandler responseHandler,
+      final BrokerSegmentWatcherConfig segmentWatcherConfig,
+      final PlannerConfig plannerConfig
+  )
+  {
+    Preconditions.checkNotNull(plannerConfig, "plannerConfig");
+    this.coordinatorDruidLeaderClient = druidLeaderClient;
+    this.jsonMapper = jsonMapper;
+    this.responseHandler = responseHandler;
+    this.segmentWatcherConfig = segmentWatcherConfig;
+    this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable();
+    this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod();
+    this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : null;
+    this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    if (!lifecycleLock.canStart()) {
+      throw new ISE("can't start.");
+    }
+    try {
+      if (isCacheEnabled) {
+        scheduledExec.schedule(new PollTask(), pollPeriodInMS, TimeUnit.MILLISECONDS);
+      }
+      lifecycleLock.started();
+      log.info("MetadataSegmentView Started.");
+    }
+    finally {
+      lifecycleLock.exitStart();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    if (!lifecycleLock.canStop()) {
+      throw new ISE("can't stop.");
+    }
+    log.info("MetadataSegmentView is stopping.");
+    if (isCacheEnabled) {
+      scheduledExec.shutdown();
+    }
+    log.info("MetadataSegmentView Stopped.");
+  }
+
+  private void poll()
+  {
+    log.info("polling published segments from coordinator");
+    final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
+        coordinatorDruidLeaderClient,
+        jsonMapper,
+        responseHandler,
+        segmentWatcherConfig.getWatchedDataSources()
+    );
+
+    final DateTime timestamp = DateTimes.nowUtc();
+    while (metadataSegments.hasNext()) {
+      final DataSegment interned = DataSegmentInterner.intern(metadataSegments.next());
+      // timestamp is used to filter deleted segments
+      publishedSegments.put(interned, timestamp);
+    }
+    // Remove segments from the cache whose timestamp is not the latest one stored: a segment
+    // carrying an earlier timestamp was not returned by the coordinator in the latest poll, so
+    // it has likely been deleted, and we drop it from publishedSegments. The map is not replaced
+    // atomically, since doing so can cause a high memory footprint with a large number of
+    // published segments; instead, deleted segments are removed incrementally. This means
+    // publishedSegments is eventually consistent with the segments in the coordinator.
+    publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp);
+    cachePopulated.set(true);
+  }
+
+  public Iterator<DataSegment> getPublishedSegments()
+  {
+    if (isCacheEnabled) {
+      Preconditions.checkState(
+          lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) && cachePopulated.get(),
+          "hold on, still syncing published segments"
+      );
+      return publishedSegments.keySet().iterator();
+    } else {
+      return getMetadataSegments(
+          coordinatorDruidLeaderClient,
+          jsonMapper,
+          responseHandler,
+          segmentWatcherConfig.getWatchedDataSources()
+      );
+    }
+  }
+
+  // Note that the coordinator must be up to get segments
+  private JsonParserIterator<DataSegment> getMetadataSegments(
+      DruidLeaderClient coordinatorClient,
+      ObjectMapper jsonMapper,
+      BytesAccumulatingResponseHandler responseHandler,
+      Set<String> watchedDataSources
+  )
+  {
+    String query = "/druid/coordinator/v1/metadata/segments";
+    if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
+      log.debug(
+          "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources);
+      final StringBuilder sb = new StringBuilder();
+      for (String ds : watchedDataSources) {
+        sb.append("datasources=").append(ds).append("&");
+      }
+      sb.setLength(sb.length() - 1);
+      query = "/druid/coordinator/v1/metadata/segments?" + sb;
+    }
+    Request request;
+    try {
+      request = coordinatorClient.makeRequest(
+          HttpMethod.GET,
+          StringUtils.format(query),
+          false
+      );
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    ListenableFuture<InputStream> future = coordinatorClient.goAsync(
+        request,
+        responseHandler
+    );
+
+    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
+    {
+    });
+    return new JsonParserIterator<>(
+        typeRef,
+        future,
+        request.getUrl().toString(),
+        null,
+        request.getUrl().getHost(),
+        jsonMapper,
+        responseHandler
+    );
+  }
+
+  private class PollTask implements Runnable
+  {
+    @Override
+    public void run()
+    {
+      long delayMS = pollPeriodInMS;
+      try {
+        final long pollStartTime = System.nanoTime();
+        poll();
+        final long pollEndTime = System.nanoTime();
+        final long pollTimeNS = pollEndTime - pollStartTime;
+        final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS);
+        delayMS = Math.max(pollPeriodInMS - pollTimeMS, 0);
+      }
+      catch (Exception e) {
+        log.makeAlert(e, "Problem polling Coordinator.").emit();
+      }
+      finally {
+        if (!Thread.currentThread().isInterrupted()) {
+          scheduledExec.schedule(new PollTask(), delayMS, TimeUnit.MILLISECONDS);
+        }
+      }
+    }
+  }
+
+}
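`PollTask` above reschedules itself instead of using `scheduleAtFixedRate`, subtracting the poll's own duration from the next delay so polls start roughly once per period, and rescheduling even after a failure. A self-contained sketch of that scheduling pattern (class name hypothetical; `MetadataSegmentView` additionally emits an alert on failure):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SelfReschedulingPoller
{
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private final long pollPeriodMs;
  private final Runnable poll;

  public SelfReschedulingPoller(long pollPeriodMs, Runnable poll)
  {
    this.pollPeriodMs = pollPeriodMs;
    this.poll = poll;
  }

  public void start()
  {
    exec.schedule(this::runOnce, pollPeriodMs, TimeUnit.MILLISECONDS);
  }

  private void runOnce()
  {
    long delayMs = pollPeriodMs;
    try {
      final long startNs = System.nanoTime();
      poll.run();
      // Subtract the time the poll took, clamped at zero if it overran the period.
      final long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
      delayMs = Math.max(pollPeriodMs - tookMs, 0);
    }
    catch (Exception e) {
      e.printStackTrace(); // a failed poll still reschedules
    }
    finally {
      if (!Thread.currentThread().isInterrupted()) {
        exec.schedule(this::runOnce, delayMs, TimeUnit.MILLISECONDS);
      }
    }
  }
}
```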
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 772c62886f4e..d0599f861903 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -149,6 +149,7 @@ public class SystemSchema extends AbstractSchema
   @Inject
   public SystemSchema(
       final DruidSchema druidSchema,
+      final MetadataSegmentView metadataView,
       final TimelineServerView serverView,
       final AuthorizerMapper authorizerMapper,
       final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
@@ -158,11 +159,10 @@ public SystemSchema(
   {
     Preconditions.checkNotNull(serverView, "serverView");
     BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
-    SegmentsTable segmentsTable = new SegmentsTable(
+    final SegmentsTable segmentsTable = new SegmentsTable(
         druidSchema,
-        coordinatorDruidLeaderClient,
+        metadataView,
         jsonMapper,
-        responseHandler,
         authorizerMapper
    );
     this.tableMap = ImmutableMap.of(
@@ -182,23 +182,20 @@ public Map<String, Table> getTableMap()
   static class SegmentsTable extends AbstractTable implements ScannableTable
   {
     private final DruidSchema druidSchema;
-    private final DruidLeaderClient druidLeaderClient;
     private final ObjectMapper jsonMapper;
-    private final BytesAccumulatingResponseHandler responseHandler;
     private final AuthorizerMapper authorizerMapper;
+    private final MetadataSegmentView metadataView;
 
     public SegmentsTable(
         DruidSchema druidSchemna,
-        DruidLeaderClient druidLeaderClient,
+        MetadataSegmentView metadataView,
         ObjectMapper jsonMapper,
-        BytesAccumulatingResponseHandler responseHandler,
         AuthorizerMapper authorizerMapper
     )
     {
       this.druidSchema = druidSchemna;
-      this.druidLeaderClient = druidLeaderClient;
+      this.metadataView = metadataView;
       this.jsonMapper = jsonMapper;
-      this.responseHandler = responseHandler;
       this.authorizerMapper = authorizerMapper;
     }
@@ -231,12 +228,8 @@ public Enumerable<Object[]> scan(DataContext root)
         partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
       }
 
-      //get published segments from coordinator
-      final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
-          druidLeaderClient,
-          jsonMapper,
-          responseHandler
-      );
+      // Get published segments from the metadata segment cache (if enabled in the SQL planner config), else directly from the coordinator
+      final Iterator<DataSegment> metadataSegments = metadataView.getPublishedSegments();
 
       final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();
 
@@ -245,7 +238,7 @@ public Enumerable<Object[]> scan(DataContext root)
               metadataSegments,
               root
           ))
-          .transform((DataSegment val) -> {
+          .transform(val -> {
             try {
               segmentsAlreadySeen.add(val.getId());
               final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getId());
@@ -318,6 +311,26 @@ public Enumerable<Object[]> scan(DataContext root)
     }
 
+    private Iterator<DataSegment> getAuthorizedPublishedSegments(
+        Iterator<DataSegment> it,
+        DataContext root
+    )
+    {
+      final AuthenticationResult authenticationResult =
+          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
+
+      Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+
+      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+          authenticationResult,
+          () -> it,
+          raGenerator,
+          authorizerMapper
+      );
+      return authorizedSegments.iterator();
+    }
+
     private Iterator<Entry<SegmentId, AvailableSegmentMetadata>> getAuthorizedAvailableSegments(
         Iterator<Entry<SegmentId, AvailableSegmentMetadata>> availableSegmentEntries,
         DataContext root
@@ -340,27 +353,6 @@ private Iterator<Entry<SegmentId, AvailableSegmentMetadata>> getAuthorizedAvailab
       return authorizedSegments.iterator();
     }
 
-    private CloseableIterator<DataSegment> getAuthorizedPublishedSegments(
-        JsonParserIterator<DataSegment> it,
-        DataContext root
-    )
-    {
-      final AuthenticationResult authenticationResult =
-          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
-
-      Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
-          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
-
-      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
-          authenticationResult,
-          () -> it,
-          raGenerator,
-          authorizerMapper
-      );
-
-      return wrap(authorizedSegments.iterator(), it);
-    }
-
     private static class PartialSegmentData
     {
       private final long isAvailable;
@@ -404,44 +396,6 @@ public long getNumRows()
       }
     }
 
-    // Note that coordinator must be up to get segments
-    private static JsonParserIterator<DataSegment> getMetadataSegments(
-        DruidLeaderClient coordinatorClient,
-        ObjectMapper jsonMapper,
-        BytesAccumulatingResponseHandler responseHandler
-    )
-    {
-
-      Request request;
-      try {
-        request = coordinatorClient.makeRequest(
-            HttpMethod.GET,
-            StringUtils.format("/druid/coordinator/v1/metadata/segments"),
-            false
-        );
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      ListenableFuture<InputStream> future = coordinatorClient.goAsync(
-          request,
-          responseHandler
-      );
-
-      final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
-      {
-      });
-      return new JsonParserIterator<>(
-          typeRef,
-          future,
-          request.getUrl().toString(),
-          null,
-          request.getUrl().getHost(),
-          jsonMapper,
-          responseHandler
-      );
-    }
-
   static class ServersTable extends AbstractTable implements ScannableTable
   {
     private final TimelineServerView serverView;
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index c1c43b5ad1ea..4f25a66c1139 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -159,7 +159,7 @@ public void setUp() throws Exception
     walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
-    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
@@ -790,7 +790,7 @@ public int getMaxRowsPerFrame()
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
-    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     final List<Meta.Frame> frames = new ArrayList<>();
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index 74386125ef1f..9cc2ecf4cabb 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -88,7 +88,7 @@ public void setUp() throws Exception
     walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
-    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     final PlannerFactory plannerFactory = new PlannerFactory(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 6b47b42993b6..18c1ac478c76 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -547,7 +547,7 @@ public List getResults( { final InProcessViewManager viewManager = new InProcessViewManager(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig, viewManager); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final PlannerFactory plannerFactory = new PlannerFactory( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java index 14b226896d2e..d525fffde4c1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java @@ -121,7 +121,7 @@ public boolean shouldSerializeComplexValues() } }; final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); req = EasyMock.createStrictMock(HttpServletRequest.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 407ff69ed955..7d8cdaad5729 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -98,6 +98,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class SystemSchemaTest extends CalciteTestBase { @@ -127,6 +130,7 @@ public class SystemSchemaTest extends CalciteTestBase private AuthorizerMapper authMapper; private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; + private MetadataSegmentView metadataView; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -215,8 +219,10 @@ public Authorizer getAuthorizer(String name) ); druidSchema.start(); druidSchema.awaitInitialization(); + metadataView = EasyMock.createMock(MetadataSegmentView.class); schema = new SystemSchema( druidSchema, + metadataView, serverView, EasyMock.createStrictMock(AuthorizerMapper.class), client, @@ -225,6 +231,44 @@ public Authorizer getAuthorizer(String name) ); } + + private final DataSegment publishedSegment1 = new DataSegment( + "wikipedia1", + Intervals.of("2007/2008"), + "version1", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + null, + 1, + 53000L, + DataSegment.PruneLoadSpecHolder.DEFAULT + ); + private final DataSegment publishedSegment2 = new DataSegment( + "wikipedia2", + Intervals.of("2008/2009"), + "version2", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + null, 
+ 1, + 83000L, + DataSegment.PruneLoadSpecHolder.DEFAULT + ); + private final DataSegment publishedSegment3 = new DataSegment( + "wikipedia3", + Intervals.of("2009/2010"), + "version3", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + null, + 1, + 47000L, + DataSegment.PruneLoadSpecHolder.DEFAULT + ); + private final DataSegment segment1 = new DataSegment( "test1", Intervals.of("2010/2011"), @@ -263,7 +307,7 @@ public Authorizer getAuthorizer(String name) ); private final DataSegment segment4 = new DataSegment( "test4", - Intervals.of("2017/2018"), + Intervals.of("2014/2015"), "version4", null, ImmutableList.of("dim1", "dim2"), @@ -275,7 +319,7 @@ public Authorizer getAuthorizer(String name) ); private final DataSegment segment5 = new DataSegment( "test5", - Intervals.of("2017/2018"), + Intervals.of("2015/2016"), "version5", null, ImmutableList.of("dim1", "dim2"), @@ -340,120 +384,22 @@ public void testGetTableMap() } @Test - public void testSegmentsTable() throws Exception + public void testSegmentsTable() { final SystemSchema.SegmentsTable segmentsTable = EasyMock .createMockBuilder(SystemSchema.SegmentsTable.class) - .withConstructor(druidSchema, client, mapper, responseHandler, authMapper) + .withConstructor(druidSchema, metadataView, mapper, authMapper) .createMock(); EasyMock.replay(segmentsTable); - - EasyMock - .expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments", false)) - .andReturn(request) - .anyTimes(); - SettableFuture future = SettableFuture.create(); - EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once(); - final int ok = HttpServletResponse.SC_OK; - EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes(); - - EasyMock - .expect(request.getUrl()) - .andReturn(new URL("http://test-host:1234/druid/coordinator/v1/metadata/segments")) - .anyTimes(); - - AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); - //segments in metadata store : wikipedia1, wikipedia2, wikipedia3, test1, test2 - final String json = "[{\n" - + "\t\"dataSource\": \"wikipedia1\",\n" - + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n" - + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n" - + "\t\"loadSpec\": {\n" - + "\t\t\"type\": \"local\",\n" - + "\t\t\"path\": \"/var/druid/segments/wikipedia-kafka/2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z/2018-08-07T23:00:00.059Z/51/1578eb79-0e44-4b41-a87b-65e40c52be53/index.zip\"\n" - + "\t},\n" - + "\t\"dimensions\": \"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,regionIsoCode,regionName,added,deleted,delta\",\n" - + "\t\"metrics\": \"count,user_unique\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"partitionNum\": 51,\n" - + "\t\t\"partitions\": 0\n" - + "\t},\n" - + "\t\"binaryVersion\": 9,\n" - + "\t\"size\": 47406,\n" - + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n" - + "}, {\n" - + "\t\"dataSource\": \"wikipedia2\",\n" - + "\t\"interval\": \"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n" - + "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n" - + "\t\"loadSpec\": {\n" - + "\t\t\"type\": \"local\",\n" - + "\t\t\"path\": 
\"/var/druid/segments/wikipedia-kafka/2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z/2018-08-07T18:00:00.117Z/9/a2646827-b782-424c-9eed-e48aa448d2c5/index.zip\"\n" - + "\t},\n" - + "\t\"dimensions\": \"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,metroCode,regionIsoCode,regionName,added,deleted,delta\",\n" - + "\t\"metrics\": \"count,user_unique\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"partitionNum\": 9,\n" - + "\t\t\"partitions\": 0\n" - + "\t},\n" - + "\t\"binaryVersion\": 9,\n" - + "\t\"size\": 83846,\n" - + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n" - + "}, {\n" - + "\t\"dataSource\": \"wikipedia3\",\n" - + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n" - + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n" - + "\t\"loadSpec\": {\n" - + "\t\t\"type\": \"local\",\n" - + "\t\t\"path\": \"/var/druid/segments/wikipedia-kafka/2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z/2018-08-07T23:00:00.059Z/50/87c5457e-c39b-4c03-9df8-e2b20b210dfc/index.zip\"\n" - + "\t},\n" - + "\t\"dimensions\": \"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,metroCode,regionIsoCode,regionName,added,deleted,delta\",\n" - + "\t\"metrics\": \"count,user_unique\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"partitionNum\": 50,\n" - + "\t\t\"partitions\": 0\n" - + "\t},\n" - + "\t\"binaryVersion\": 9,\n" - + "\t\"size\": 53527,\n" - + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n" - + "}, {\n" - + "\t\"dataSource\": \"test1\",\n" - + "\t\"interval\": \"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n" - + "\t\"version\": \"version1\",\n" - + "\t\"loadSpec\": null,\n" - + "\t\"dimensions\": \"dim1,dim2\",\n" - + "\t\"metrics\": \"met1,met2\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"domainDimensions\": []\n" - + "\t},\n" - + "\t\"binaryVersion\": 1,\n" - + "\t\"size\": 100,\n" - + "\t\"identifier\": \"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n" - + "}, {\n" - + "\t\"dataSource\": \"test2\",\n" - + "\t\"interval\": \"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n" - + "\t\"version\": \"version2\",\n" - + "\t\"loadSpec\": null,\n" - + "\t\"dimensions\": \"dim1,dim2\",\n" - + "\t\"metrics\": \"met1,met2\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"domainDimensions\": []\n" - + "\t},\n" - + "\t\"binaryVersion\": 1,\n" - + "\t\"size\": 100,\n" - + "\t\"identifier\": \"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n" - + "}]"; - byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); - in.add(bytesToWrite); - in.done(); - future.set(in); - - EasyMock.replay(client, request, responseHolder, responseHandler); + final Set publishedSegments = Stream.of(publishedSegment1, + publishedSegment2, + publishedSegment3, + segment1, + segment2).collect(Collectors.toSet()); + EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); + + EasyMock.replay(client, request, responseHolder, responseHandler, metadataView); DataContext dataContext = new DataContext() { @Override @@ -531,7 +477,7 @@ public Object 
get(String name) verifyRow( rows.get(3), - "test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", + "test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4", 100L, 0L, //partition_num 1L, //num_replicas @@ -543,7 +489,7 @@ public Object get(String name) verifyRow( rows.get(4), - "test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", + "test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5", 100L, 0L, //partition_num 1L, //num_replicas @@ -556,8 +502,8 @@ public Object get(String name) // wikipedia segments are published and unavailable, num_replicas is 0 verifyRow( rows.get(5), - "wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z", - 47406L, + "wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1", + 53000L, 0L, //partition_num 0L, //num_replicas 0L, //numRows @@ -568,8 +514,8 @@ public Object get(String name) verifyRow( rows.get(6), - "wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z", - 83846L, + "wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2", + 83000L, 0L, //partition_num 0L, //num_replicas 0L, //numRows @@ -580,8 +526,8 @@ public Object get(String name) verifyRow( rows.get(7), - "wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z", - 53527L, + "wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3", + 47000L, 0L, //partition_num 0L, //num_replicas 0L, //numRows @@ -736,11 +682,11 @@ public Object get(String name) Object[] row3 = rows.get(3); Assert.assertEquals("server2:1234", row3[0]); - Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row3[1].toString()); + Assert.assertEquals("test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4", row3[1].toString()); Object[] row4 = rows.get(4); Assert.assertEquals("server2:1234", row4[0]); - Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row4[1].toString()); + Assert.assertEquals("test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5", row4[1].toString()); // Verify value types. 
verifyTypes(rows, SystemSchema.SERVER_SEGMENTS_SIGNATURE); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 359ca25bd066..5beafb7bf23c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -31,6 +31,7 @@ import com.google.inject.Key; import com.google.inject.Module; import org.apache.curator.x.discovery.ServiceProvider; +import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.data.input.InputRow; @@ -104,6 +105,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AllowAllAuthenticator; @@ -123,6 +125,7 @@ import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.schema.DruidSchema; +import org.apache.druid.sql.calcite.schema.MetadataSegmentView; import org.apache.druid.sql.calcite.schema.SystemSchema; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.sql.calcite.view.ViewManager; @@ -739,7 +742,8 @@ public static InputRow createRow(final Object t, final String dim1, final String public static SystemSchema createMockSystemSchema( final DruidSchema druidSchema, - final SpecificSegmentsQuerySegmentWalker walker + final SpecificSegmentsQuerySegmentWalker walker, + final PlannerConfig plannerConfig ) { final DruidLeaderClient druidLeaderClient = new DruidLeaderClient( @@ -753,6 +757,13 @@ public static SystemSchema createMockSystemSchema( }; final SystemSchema schema = new SystemSchema( druidSchema, + new MetadataSegmentView( + druidLeaderClient, + getJsonMapper(), + new BytesAccumulatingResponseHandler(), + new BrokerSegmentWatcherConfig(), + plannerConfig + ), new TestServerInventoryView(walker.getSegments()), TEST_AUTHORIZER_MAPPER, druidLeaderClient,
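End to end, the rows assembled by `SegmentsTable.scan` surface through Druid SQL's `sys.segments` table, whether they came from the new cache or straight from the Coordinator. A hedged usage sketch over the Avatica JDBC driver (Broker address illustrative; the endpoint path is the one Druid documents for Avatica):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SysSegmentsQuery
{
  public static void main(String[] args) throws Exception
  {
    final String url = "jdbc:avatica:remote:url=http://broker-host:8082/druid/v2/sql/avatica/";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         // size comes from the published (metadata-store) copy of each segment.
         ResultSet rs = stmt.executeQuery(
             "SELECT segment_id, datasource, \"size\", is_published, is_available "
             + "FROM sys.segments LIMIT 5")) {
      while (rs.next()) {
        System.out.println(rs.getString("segment_id") + " size=" + rs.getLong("size"));
      }
    }
  }
}
```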