-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Introduce published segment cache in broker #6901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e8b0a23
6ec4911
8b6d453
281600d
cdc751e
8fed80a
0d0f89f
8fc84b3
adf133f
d46edc5
92601d0
a993482
a4cbcfc
e376df3
0fbb48c
8df2d96
d5b7d79
2bfa396
632d741
bcc6513
c41f085
07a80b1
d83eb33
a440c0c
b385345
981b080
3a94cae
0032a31
ad28458
33751a4
2ddb7a1
1b37493
60dbf41
8860053
ca21779
3a70f39
e2a9af7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.druid.client; | ||
|
|
||
| import com.google.common.collect.Interner; | ||
| import com.google.common.collect.Interners; | ||
| import org.apache.druid.timeline.DataSegment; | ||
|
|
||
| /** | ||
| * Interns the DataSegment object in order to share the reference for same DataSegment. | ||
| * It uses two separate interners for realtime and historical segments to prevent | ||
| * overwriting the size of a segment which was served by a historical and later served | ||
| * by another realtime server, since realtime server always publishes with size 0. | ||
| */ | ||
| public class DataSegmentInterner | ||
| { | ||
| private static final Interner<DataSegment> REALTIME_INTERNER = Interners.newWeakInterner(); | ||
| private static final Interner<DataSegment> HISTORICAL_INTERNER = Interners.newWeakInterner(); | ||
|
|
||
| private DataSegmentInterner() | ||
| { | ||
| //No instantiation | ||
| } | ||
|
|
||
| public static DataSegment intern(DataSegment segment) | ||
| { | ||
| // A segment learns it's size and dimensions when it moves from a relatime to historical server | ||
| // for that reason, we are using it's size as the indicator to decide whether to use REALTIME or | ||
| // HISTORICAL interner. | ||
| return segment.getSize() > 0 ? HISTORICAL_INTERNER.intern(segment) : REALTIME_INTERNER.intern(segment); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's worth commenting why
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added a comment |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| package org.apache.druid.client.selector; | ||
|
|
||
| import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; | ||
| import org.apache.druid.client.DataSegmentInterner; | ||
| import org.apache.druid.server.coordination.DruidServerMetadata; | ||
| import org.apache.druid.server.coordination.ServerType; | ||
| import org.apache.druid.timeline.DataSegment; | ||
|
|
@@ -50,7 +51,7 @@ public ServerSelector( | |
| TierSelectorStrategy strategy | ||
| ) | ||
| { | ||
| this.segment = new AtomicReference<>(segment); | ||
| this.segment = new AtomicReference<>(DataSegmentInterner.intern(segment)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. okay |
||
| this.strategy = strategy; | ||
| this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); | ||
| this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,7 @@ | |
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.TreeSet; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| /** | ||
|
|
@@ -148,14 +149,22 @@ public Response getDatabaseSegmentDataSource(@PathParam("dataSourceName") final | |
| @GET | ||
| @Path("/segments") | ||
| @Produces(MediaType.APPLICATION_JSON) | ||
| public Response getDatabaseSegments(@Context final HttpServletRequest req) | ||
| public Response getDatabaseSegments( | ||
| @Context final HttpServletRequest req, | ||
| @QueryParam("datasources") final Set<String> datasources | ||
| ) | ||
| { | ||
| final Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources(); | ||
| Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources(); | ||
| if (datasources != null && !datasources.isEmpty()) { | ||
| druidDataSources = druidDataSources.stream() | ||
| .filter(src -> datasources.contains(src.getName())) | ||
| .collect(Collectors.toSet()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @surekhasaharan we don't want to collect ImmutableDruidDataSource objects, which are already known to be unique here, into a set. It induces expensive O(n) hash code computations (which is also totally unnecessary;
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks @leventov for catching this, will change to list in a follow up PR. |
||
| } | ||
| final Stream<DataSegment> metadataSegments = druidDataSources | ||
| .stream() | ||
| .flatMap(t -> t.getSegments().stream()); | ||
|
|
||
| Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList( | ||
| final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList( | ||
| AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); | ||
|
|
||
| final Iterable<DataSegment> authorizedSegments = | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,21 @@ public class PlannerConfig | |
| private DateTimeZone sqlTimeZone = DateTimeZone.UTC; | ||
|
|
||
| @JsonProperty | ||
| private boolean metadataSegmentCacheEnable = false; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would you please add a doc for this configuration?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, added |
||
|
|
||
| @JsonProperty | ||
| private long metadataSegmentPollPeriod = 60000; | ||
|
|
||
| public long getMetadataSegmentPollPeriod() | ||
| { | ||
| return metadataSegmentPollPeriod; | ||
| } | ||
|
|
||
| public boolean isMetadataSegmentCacheEnable() | ||
| { | ||
| return metadataSegmentCacheEnable; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add to
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added |
||
| } | ||
|
|
||
| private boolean serializeComplexValues = true; | ||
|
|
||
| public Period getMetadataRefreshPeriod() | ||
|
|
@@ -159,6 +174,8 @@ public PlannerConfig withOverrides(final Map<String, Object> context) | |
| newConfig.requireTimeCondition = isRequireTimeCondition(); | ||
| newConfig.sqlTimeZone = getSqlTimeZone(); | ||
| newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart(); | ||
| newConfig.metadataSegmentCacheEnable = isMetadataSegmentCacheEnable(); | ||
| newConfig.metadataSegmentPollPeriod = getMetadataSegmentPollPeriod(); | ||
| newConfig.serializeComplexValues = shouldSerializeComplexValues(); | ||
| return newConfig; | ||
| } | ||
|
|
@@ -200,6 +217,8 @@ public boolean equals(final Object o) | |
| useFallback == that.useFallback && | ||
| requireTimeCondition == that.requireTimeCondition && | ||
| awaitInitializationOnStart == that.awaitInitializationOnStart && | ||
| metadataSegmentCacheEnable == that.metadataSegmentCacheEnable && | ||
| metadataSegmentPollPeriod == that.metadataSegmentPollPeriod && | ||
| serializeComplexValues == that.serializeComplexValues && | ||
| Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) && | ||
| Objects.equals(sqlTimeZone, that.sqlTimeZone); | ||
|
|
@@ -221,6 +240,8 @@ public int hashCode() | |
| requireTimeCondition, | ||
| awaitInitializationOnStart, | ||
| sqlTimeZone, | ||
| metadataSegmentCacheEnable, | ||
| metadataSegmentPollPeriod, | ||
| serializeComplexValues | ||
| ); | ||
| } | ||
|
|
@@ -239,6 +260,8 @@ public String toString() | |
| ", useFallback=" + useFallback + | ||
| ", requireTimeCondition=" + requireTimeCondition + | ||
| ", awaitInitializationOnStart=" + awaitInitializationOnStart + | ||
| ", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable + | ||
| ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod + | ||
| ", sqlTimeZone=" + sqlTimeZone + | ||
| ", serializeComplexValues=" + serializeComplexValues + | ||
| '}'; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you please add a javadoc about what this is doing and why we need two interners?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added docs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interners with weak references shouldn't be used: #7395