Merged
Changes from 15 commits
Commits
37 commits
e8b0a23
Add published segment cache in broker
Jan 22, 2019
6ec4911
Change the DataSegment interner so it's not based on DataSEgment's eq…
Jan 27, 2019
8b6d453
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 27, 2019
281600d
Use separate interner for realtime and historical segments
Jan 28, 2019
cdc751e
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 28, 2019
8fed80a
Remove trueEquals as it's not used anymore, change log message
Jan 28, 2019
0d0f89f
PR comments
Jan 29, 2019
8fc84b3
PR comments
Jan 29, 2019
adf133f
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 29, 2019
d46edc5
Fix tests
Jan 29, 2019
92601d0
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 29, 2019
a993482
PR comments
Jan 30, 2019
a4cbcfc
Few more modification to
Jan 30, 2019
e376df3
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 30, 2019
0fbb48c
minor changes
Jan 30, 2019
8df2d96
PR comments
Jan 31, 2019
d5b7d79
PR comments
Jan 31, 2019
2bfa396
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 31, 2019
632d741
Make the segment cache in broker off by default
Feb 1, 2019
bcc6513
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
c41f085
Add doc for new planner config
Feb 1, 2019
07a80b1
Update documentation
Feb 1, 2019
d83eb33
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
a440c0c
PR comments
Feb 1, 2019
b385345
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
981b080
some more changes
Feb 1, 2019
3a94cae
PR comments
Feb 1, 2019
0032a31
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
ad28458
fix test
Feb 1, 2019
33751a4
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
2ddb7a1
remove unintentional change, whether to synchronize on lifecycleLock …
Feb 1, 2019
1b37493
minor changes
Feb 1, 2019
60dbf41
some changes to initialization
Feb 2, 2019
8860053
use pollPeriodInMS
Feb 2, 2019
ca21779
Add boolean cachePopulated to check if first poll succeeds
Feb 2, 2019
3a70f39
Remove poll from start()
Feb 2, 2019
e2a9af7
take the log message out of condition in stop()
Feb 2, 2019
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.apache.druid.timeline.DataSegment;

public class DataSegmentInterner
Contributor

Would you please add a javadoc about what this is doing and why we need two interners?

Author

added docs

Member

Interners with weak references shouldn't be used: #7395

{
private static final Interner<DataSegment> REALTIME_INTERNER = Interners.newWeakInterner();
private static final Interner<DataSegment> HISTORICAL_INTERNER = Interners.newWeakInterner();

private DataSegmentInterner()
{

Member

please add comment // No instantiation.

Author

added the comment

}

public static DataSegment intern(DataSegment segment)
{
return segment.getSize() > 0 ? HISTORICAL_INTERNER.intern(segment) : REALTIME_INTERNER.intern(segment);
Contributor

It's worth adding a comment explaining why size can indicate whether a segment is realtime or historical.

Author

added a comment
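
For readers following the thread above, here is a minimal sketch of the two-pool idea in plain Java. It is not the PR's implementation: `SegmentStub` and `TwoPoolInterner` are hypothetical names, the pools hold strong references (unlike the Guava `Interners.newWeakInterner()` used in the diff), and it assumes that segment equality ignores size and that a size of 0 marks a realtime segment, as the `getSize() > 0` check suggests.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for DataSegment: equality uses the id only, size is extra metadata. */
final class SegmentStub {
  final String id;
  final long size;

  SegmentStub(String id, long size) {
    this.id = id;
    this.size = size;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof SegmentStub && ((SegmentStub) o).id.equals(id);
  }

  @Override
  public int hashCode() {
    return id.hashCode();
  }
}

/** Keeps one canonical instance per segment in each of two separate pools. */
final class TwoPoolInterner {
  private static final ConcurrentMap<SegmentStub, SegmentStub> REALTIME = new ConcurrentHashMap<>();
  private static final ConcurrentMap<SegmentStub, SegmentStub> HISTORICAL = new ConcurrentHashMap<>();

  private TwoPoolInterner() {
    // No instantiation.
  }

  static SegmentStub intern(SegmentStub segment) {
    // Assumption: size 0 means the segment is still realtime (not yet published with a real size).
    ConcurrentMap<SegmentStub, SegmentStub> pool = segment.size > 0 ? HISTORICAL : REALTIME;
    SegmentStub existing = pool.putIfAbsent(segment, segment);
    return existing != null ? existing : segment;
  }
}
```

Under those assumptions, a single pool could return a size-0 instance for a segment that has since been published with its real size, which is presumably why the pools are kept apart.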

}

Member

nit: extra space (there are a few others scattered about as well, please clean up if you have the chance)

Author

removed empty line here

}
200 changes: 200 additions & 0 deletions server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* This class polls the coordinator in background to keep the latest published segments.
* Provides {@link #getPublishedSegments()} for others to get segments in metadata store.
*/
@ManageLifecycle
public class MetadataSegmentView
Contributor

It would be great if you can add a simple description about what this class does and how it's being used.

Author

okay, added description.

{

private static final int DEFAULT_POLL_PERIOD_IN_MS = 60000;
Contributor

Are you going to make this configurable? If so, please add a configuration. Otherwise, please rename to POLL_PERIOD_IN_MS.

Author

Yes, I'd like to make it configurable, maybe in a follow-up PR. For now, I renamed the variable.

private static final Logger log = new Logger(MetadataSegmentView.class);

private final DruidLeaderClient coordinatorDruidLeaderClient;
private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final BrokerSegmentWatcherConfig segmentWatcherConfig;

private final ConcurrentMap<DataSegment, DateTime> publishedSegments = new ConcurrentHashMap<>();
Contributor

I think this map will usually be very large. It would be better to initialize it with a larger initial capacity, such as 1000.

Author

yeah, right, will set initialCapacity

private ScheduledExecutorService scheduledExec;

@Inject
public MetadataSegmentView(
final @Coordinator DruidLeaderClient druidLeaderClient,
final ObjectMapper jsonMapper,
final BytesAccumulatingResponseHandler responseHandler,
final BrokerSegmentWatcherConfig segmentWatcherConfig
)
{
this.coordinatorDruidLeaderClient = druidLeaderClient;
this.jsonMapper = jsonMapper;
this.responseHandler = responseHandler;
this.segmentWatcherConfig = segmentWatcherConfig;
}

@LifecycleStart
public void start()
{
scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS);
}

@LifecycleStop
public void stop()
{
scheduledExec.shutdownNow();
scheduledExec = null;
}

private void poll()
{
log.info("polling published segments from coordinator");
//get authorized published segments from coordinator
Contributor

I guess this comment is not correct? The authorization happens in SystemSchema.

Author

Ah yes, I copied this code over from SystemSchema. Will remove.

final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
coordinatorDruidLeaderClient,
jsonMapper,
responseHandler,
segmentWatcherConfig.getWatchedDataSources()
);

final DateTime timestamp = DateTimes.nowUtc();
while (metadataSegments.hasNext()) {
final DataSegment interned = DataSegmentInterner.intern(metadataSegments.next());
// timestamp is used to filter deleted segments
publishedSegments.put(interned, timestamp);
}
// filter the segments from cache whose timestamp is not equal to latest timestamp stored,
// since the presence of a segment with an earlier timestamp indicates that
// "that" segment is not returned by coordinator in latest poll, so it's
// likely deleted and therefore we remove it from publishedSegments
final Set<DateTime> toBeRemovedSegments = publishedSegments.values()
.stream()
.filter(v -> v != timestamp)
.collect(Collectors.toSet());
publishedSegments.values().removeAll(toBeRemovedSegments);

if (segmentWatcherConfig.getWatchedDataSources() != null) {
log.debug(
"filtering datasources[%s] in published segments based on broker's watchedDataSources",
segmentWatcherConfig.getWatchedDataSources()
);
publishedSegments.keySet()
.removeIf(key -> !segmentWatcherConfig.getWatchedDataSources().contains(key.getDataSource()));
Contributor

This should be getting only the segments of watched dataSources in the first place rather than filtering later.

Author

Good point, thanks. Changed the /segments api in MetadataResource to take datasources, so we can pass watchedDataSources to it and filtering can happen before broker gets the published segments.

Contributor

I think this is not needed now because getMetadataSegments() returns only filtered segments.

Author

Yeah, it's not needed anymore; I missed removing it.
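
The timestamp-based cleanup that poll() performs above can be sketched in isolation as follows. This is an illustrative reduction, not the PR's code: `PublishedCacheSketch` is a hypothetical name, segments are represented by plain string ids, and the poll time is a `long` rather than a `DateTime`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PublishedCacheSketch {
  // Segment id -> time of the most recent poll that returned it.
  private final ConcurrentMap<String, Long> lastSeen = new ConcurrentHashMap<>(1000);

  /** Records everything one poll returned, then drops entries that this poll did not return. */
  void refresh(Iterable<String> polledIds, long pollTimeMillis) {
    for (String id : polledIds) {
      lastSeen.put(id, pollTimeMillis);
    }
    // An older timestamp means the entry was absent from this poll, so it was likely deleted.
    // The comparison is by value, since boxed Long objects are not guaranteed to be identical.
    lastSeen.values().removeIf(seenAt -> seenAt.longValue() != pollTimeMillis);
  }

  Set<String> ids() {
    return lastSeen.keySet();
  }
}
```

Removing through the `values()` view deletes the corresponding map entries, so no intermediate collection of stale timestamps is needed.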

}
}

public Iterator<DataSegment> getPublishedSegments()
{
return publishedSegments.keySet().iterator();
}

// Note that coordinator must be up to get segments
private static JsonParserIterator<DataSegment> getMetadataSegments(
DruidLeaderClient coordinatorClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
Set<String> watchedDataSources
)
{
String query = "/druid/coordinator/v1/metadata/segments";
if (watchedDataSources != null && !watchedDataSources.isEmpty()) {
final StringBuilder sb = new StringBuilder();
for (String ds : watchedDataSources) {
sb.append("datasources=" + ds + "&");
Contributor

sb.append("datasources=").append(ds).append("&")

Contributor

You also need to delete the last &.

Author

yeah, doing that later by setting the sb length to sb.length()-1

Contributor

Author

oh, missed that one

}
sb.setLength(Math.max(sb.length() - 1, 0));
Contributor

I think sb.length() is always larger than 0?

Author

in this case, yes
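
One way to sidestep the trailing `&` discussed in this thread is to join the parameters instead of appending and trimming. The sketch below is an alternative, not what the PR does: `SegmentsQuerySketch` is a hypothetical name, and the URL-encoding step is an added assumption (the diff appends data source names unencoded).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.stream.Collectors;

final class SegmentsQuerySketch {
  static final String BASE = "/druid/coordinator/v1/metadata/segments";

  private SegmentsQuerySketch() {
    // No instantiation.
  }

  /** Builds the request path, adding one "datasources" parameter per watched data source. */
  static String build(Collection<String> watchedDataSources) {
    if (watchedDataSources == null || watchedDataSources.isEmpty()) {
      return BASE;
    }
    // joining() places "&" only between elements, so there is nothing to trim afterwards.
    return watchedDataSources.stream()
        .map(ds -> "datasources=" + encode(ds))
        .collect(Collectors.joining("&", BASE + "?", ""));
  }

  private static String encode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    }
    catch (UnsupportedEncodingException e) {
      // UTF-8 is required to be supported by every Java platform.
      throw new IllegalStateException(e);
    }
  }
}
```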

query = "/druid/coordinator/v1/metadata/segments?" + sb;
}
Request request;
try {
request = coordinatorClient.makeRequest(
HttpMethod.GET,
StringUtils.format(query),
false
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
ListenableFuture<InputStream> future = coordinatorClient.goAsync(
request,
responseHandler
);

final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
{
});
return new JsonParserIterator<>(
typeRef,
future,
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper,
responseHandler
);
}

private class PollTask implements Callable<Void>
{
@Override
public Void call()
{
poll();
Contributor

I think it's important to let users know if poll() fails. Please revert the logging. It's probably also worth catching all exceptions, like:

try {
  poll();
}
catch (Exception e) {
  // emit exception
}

Contributor

And since it catches all exceptions, PollTask can be Runnable.

Author

ok, catching all exceptions in the Runnable
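
The suggestion in this thread could look roughly like the sketch below. In the diff as shown, an exception thrown by poll() would skip the rescheduling line that follows it, so polling would stop silently. `SafePollTask` and its `onError` hook are hypothetical names used only for illustration; the real task would log or emit the failure using the project's own facilities.

```java
import java.util.function.Consumer;

final class SafePollTask implements Runnable {
  private final Runnable poll;                // Hypothetical stand-in for poll().
  private final Consumer<Exception> onError;  // Hypothetical hook, e.g. a logger or alert emitter.

  SafePollTask(Runnable poll, Consumer<Exception> onError) {
    this.poll = poll;
    this.onError = onError;
  }

  @Override
  public void run() {
    try {
      poll.run();
    }
    catch (Exception e) {
      // Report the failure instead of letting it escape and end the polling loop unnoticed.
      onError.accept(e);
    }
  }
}
```

Because nothing checked can escape `run()`, the task no longer needs to be a `Callable<Void>` with a `return null`.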

scheduledExec.schedule(new PollTask(), DEFAULT_POLL_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
Contributor

This means the next poll starts DEFAULT_POLL_PERIOD_IN_MS after the previous poll() finishes, which I don't think you intended here. You should probably check how long poll() took and compute the wait time for the next poll from that. It should be 0 if the next poll time has already passed.

Author

Added computing the delay after every poll() call
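
The delay computation requested above amounts to subtracting the poll duration from the period and clamping at zero. A minimal sketch, with `PollDelaySketch` and `nextDelayMillis` as hypothetical names:

```java
final class PollDelaySketch {
  private PollDelaySketch() {
    // No instantiation.
  }

  /** Returns how long to wait so that polls start roughly periodMillis apart. */
  static long nextDelayMillis(long periodMillis, long pollDurationMillis) {
    // If the poll overran the period, the next one should start immediately.
    return Math.max(periodMillis - pollDurationMillis, 0L);
  }
}
```

The duration could be measured by calling `System.nanoTime()` before and after poll() and converting the difference with `TimeUnit.NANOSECONDS.toMillis(...)`; the result would then be passed to `scheduledExec.schedule(...)` in place of the fixed period.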

return null;
}
}

}
@@ -20,6 +20,7 @@
package org.apache.druid.client.selector;

import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+ import org.apache.druid.client.DataSegmentInterner;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
@@ -50,7 +51,7 @@ public ServerSelector(
TierSelectorStrategy strategy
)
{
- this.segment = new AtomicReference<>(segment);
+ this.segment = new AtomicReference<>(DataSegmentInterner.intern(segment));
Contributor

I'm not sure segment still needs to be an AtomicReference. Let's figure it out in #6952.

Author

okay

this.strategy = strategy;
this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
@@ -57,6 +57,7 @@
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+ import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
@@ -148,14 +149,22 @@ public Response getDatabaseSegmentDataSource(@PathParam("dataSourceName") final
@GET
@Path("/segments")
@Produces(MediaType.APPLICATION_JSON)
- public Response getDatabaseSegments(@Context final HttpServletRequest req)
+ public Response getDatabaseSegments(
+ @Context final HttpServletRequest req,
+ @QueryParam("datasources") final Set<String> datasources
+ )
{
- final Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources();
+ Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources();
+ if (datasources != null && !datasources.isEmpty()) {
+ druidDataSources = druidDataSources.stream()
+ .filter(src -> datasources.contains(src.getName()))
+ .collect(Collectors.toSet());
Member (@leventov, Mar 14, 2019)

@surekhasaharan we don't want to collect ImmutableDruidDataSource objects, which are already known to be unique here, into a set. It induces expensive O(n) hash code computations (which is also totally unnecessary; ImmutableDruidDataSource.hashCode() should be cheaper. But that's another question.)

Author

thanks @leventov for catching this, will change to list in a follow up PR.
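
The follow-up described in this thread (collecting into a list rather than a set) might look like the generic sketch below. `DataSourceFilterSketch` and `filterByName` are hypothetical names, and the element type is left generic so the example does not depend on `ImmutableDruidDataSource`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

final class DataSourceFilterSketch {
  private DataSourceFilterSketch() {
    // No instantiation.
  }

  /** Keeps the sources whose name is in wanted; a null or empty wanted set keeps everything. */
  static <T> List<T> filterByName(Collection<T> sources, Function<T, String> nameOf, Set<String> wanted) {
    if (wanted == null || wanted.isEmpty()) {
      return new ArrayList<>(sources);
    }
    // Only the names are hashed (by wanted.contains); the elements are appended to a list,
    // so their own hashCode() is never invoked.
    return sources.stream()
                  .filter(source -> wanted.contains(nameOf.apply(source)))
                  .collect(Collectors.toList());
  }
}
```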

+ }
final Stream<DataSegment> metadataSegments = druidDataSources
.stream()
.flatMap(t -> t.getSegments().stream());

- Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+ final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));

final Iterable<DataSegment> authorizedSegments =
@@ -2213,13 +2213,13 @@ private List<Map<DruidServer, ServerExpectations>> populateTimeline(
expectedResults.get(k).get(j)
);
serverExpectations.get(lastServer).addExpectation(expectation);

EasyMock.expect(mockSegment.getSize()).andReturn(0L).anyTimes();
EasyMock.replay(mockSegment);
ServerSelector selector = new ServerSelector(
expectation.getSegment(),
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
);
selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), selector.getSegment());

final ShardSpec shardSpec;
if (numChunks == 1) {
shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0);
@@ -2234,6 +2234,7 @@ private List<Map<DruidServer, ServerExpectations>> populateTimeline(
}
shardSpec = new SingleDimensionShardSpec("dim" + k, start, end, j);
}
+ EasyMock.reset(mockSegment);
EasyMock.expect(mockSegment.getShardSpec())
.andReturn(shardSpec)
.anyTimes();