Merged

Changes from 6 commits (37 commits total):
e8b0a23
Add published segment cache in broker
Jan 22, 2019
6ec4911
Change the DataSegment interner so it's not based on DataSEgment's eq…
Jan 27, 2019
8b6d453
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 27, 2019
281600d
Use separate interner for realtime and historical segments
Jan 28, 2019
cdc751e
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 28, 2019
8fed80a
Remove trueEquals as it's not used anymore, change log message
Jan 28, 2019
0d0f89f
PR comments
Jan 29, 2019
8fc84b3
PR comments
Jan 29, 2019
adf133f
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 29, 2019
d46edc5
Fix tests
Jan 29, 2019
92601d0
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 29, 2019
a993482
PR comments
Jan 30, 2019
a4cbcfc
Few more modification to
Jan 30, 2019
e376df3
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 30, 2019
0fbb48c
minor changes
Jan 30, 2019
8df2d96
PR comments
Jan 31, 2019
d5b7d79
PR comments
Jan 31, 2019
2bfa396
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Jan 31, 2019
632d741
Make the segment cache in broker off by default
Feb 1, 2019
bcc6513
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
c41f085
Add doc for new planner config
Feb 1, 2019
07a80b1
Update documentation
Feb 1, 2019
d83eb33
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
a440c0c
PR comments
Feb 1, 2019
b385345
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
981b080
some more changes
Feb 1, 2019
3a94cae
PR comments
Feb 1, 2019
0032a31
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
ad28458
fix test
Feb 1, 2019
33751a4
Merge branch 'master' of github.com:druid-io/druid into broker-segmen…
Feb 1, 2019
2ddb7a1
remove unintentional change, whether to synchronize on lifecycleLock …
Feb 1, 2019
1b37493
minor changes
Feb 1, 2019
60dbf41
some changes to initialization
Feb 2, 2019
8860053
use pollPeriodInMS
Feb 2, 2019
ca21779
Add boolean cachePopulated to check if first poll succeeds
Feb 2, 2019
3a70f39
Remove poll from start()
Feb 2, 2019
e2a9af7
take the log message out of condition in stop()
Feb 2, 2019
36 changes: 36 additions & 0 deletions server/src/main/java/org/apache/druid/client/DataSegmentInterner.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.apache.druid.timeline.DataSegment;

public class DataSegmentInterner
Contributor: Would you please add a javadoc about what this is doing and why we need two interners?

Author: added docs

Member: Interners with weak references shouldn't be used: #7395

{
public static final Interner<DataSegment> REALTIME_INTERNER = Interners.newWeakInterner();
public static final Interner<DataSegment> HISTORICAL_INTERNER = Interners.newWeakInterner();

private DataSegmentInterner()
{

Member: please add comment // No instantiation.

Author: added the comment

}

}
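
For context, a hedged sketch of how DataSegmentInterner plausibly looks after the review feedback above (javadoc added, plus the common getInterner(segment) helper the author describes later in this review); the exact wording and shape of the merged code may differ:

package org.apache.druid.client;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.apache.druid.timeline.DataSegment;

/**
 * Interns DataSegment objects so that the many duplicate copies a broker would
 * otherwise hold (one per ServerSelector, plus the published-segment cache)
 * share a single canonical instance. Two interners are kept because realtime
 * segments (size 0) are still growing and change over time, while
 * published/historical segments (size > 0) are effectively immutable; keeping
 * them separate avoids a stale realtime copy being returned for a segment that
 * has since been published.
 */
public class DataSegmentInterner
{
  public static final Interner<DataSegment> REALTIME_INTERNER = Interners.newWeakInterner();
  public static final Interner<DataSegment> HISTORICAL_INTERNER = Interners.newWeakInterner();

  private DataSegmentInterner()
  {
    // No instantiation.
  }

  // Common helper requested in the review: pick the interner based on segment size.
  public static Interner<DataSegment> getInterner(DataSegment segment)
  {
    return segment.getSize() > 0 ? HISTORICAL_INTERNER : REALTIME_INTERNER;
  }
}

(Note the Member's caveat above: weak-reference interners were later flagged in #7395.)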
174 changes: 174 additions & 0 deletions server/src/main/java/org/apache/druid/client/MetadataSegmentView.java
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@ManageLifecycle
public class MetadataSegmentView
Contributor: It would be great if you can add a simple description about what this class does and how it's being used.

Author: okay, added description.

{

private static final int DEFAULT_POLL_PERIOD_IN_MS = 60000;
Contributor: Are you going to make this configurable? If so, please add a configuration. Otherwise, please rename to POLL_PERIOD_IN_MS.

Author: Yes, I'd want to make it configurable, maybe in a follow-up PR. For now, I renamed the variable.

private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class);
Contributor: Can be Logger since it emits nothing.

Author: ok, changed to Logger

Author: changed back to EmittingLogger, since it emits something now :)


private final DruidLeaderClient coordinatorDruidLeaderClient;
private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final BrokerSegmentWatcherConfig segmentWatcherConfig;

private final Map<DataSegment, DateTime> publishedSegments = new ConcurrentHashMap<>();
Contributor: Please change its type to ConcurrentMap (#6898).

Author: done

private ScheduledExecutorService scheduledExec;

@Inject
public MetadataSegmentView(
final @Coordinator DruidLeaderClient druidLeaderClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
Contributor: Please add final to these two variables.

Author: done

final BrokerSegmentWatcherConfig segmentWatcherConfig
)
{
this.coordinatorDruidLeaderClient = druidLeaderClient;
this.jsonMapper = jsonMapper;
this.responseHandler = responseHandler;
this.segmentWatcherConfig = segmentWatcherConfig;
}

@LifecycleStart
public void start()
{
scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
scheduledExec.scheduleWithFixedDelay(
() -> poll(),
0,
DEFAULT_POLL_PERIOD_IN_MS,
TimeUnit.MILLISECONDS
);
}

@LifecycleStop
public void stop()
{
scheduledExec.shutdownNow();
scheduledExec = null;
}
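
Later commits in this PR ("Remove poll from start()", "Add boolean cachePopulated to check if first poll succeeds", "use pollPeriodInMS") reshape this lifecycle so the first poll runs on the executor rather than inline in start(), and callers can tell whether the cache has ever been populated. A minimal sketch of that shape, assuming the field names from the commit messages and otherwise-hypothetical details:

// Sketch only; the merged code may differ.
private volatile boolean cachePopulated = false;

@LifecycleStart
public void start()
{
  scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
  scheduledExec.scheduleWithFixedDelay(
      () -> {
        try {
          poll();
          // Flipped after the first successful poll so readers can fall back
          // to a direct coordinator call until the cache is warm.
          cachePopulated = true;
        }
        catch (Exception e) {
          log.error(e, "Exception while polling published segments from coordinator");
        }
      },
      0,
      pollPeriodInMS,
      TimeUnit.MILLISECONDS
  );
}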

private void poll()
{
log.info("polling published segments from coordinator");
//get authorized published segments from coordinator
Contributor: I guess this comment is not correct? The authorization happens in SystemSchema.

Author: ah yeah, since I copied over this code from SystemSchema. Will remove.

final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
coordinatorDruidLeaderClient,
jsonMapper,
responseHandler
);

final DateTime ts = DateTimes.nowUtc();
while (metadataSegments.hasNext()) {
final DataSegment currentSegment = metadataSegments.next();
final DataSegment interned;
if (currentSegment.getSize() > 0) {
Contributor: Please add a method to DataSegmentInterner which has this check.

Author: added common method getInterner(segment)

Member: 👍 for pushing this down, and also using in ServerSelector constructor

interned = DataSegmentInterner.HISTORICAL_INTERNER.intern(currentSegment);
} else {
interned = DataSegmentInterner.REALTIME_INTERNER.intern(currentSegment);
}
publishedSegments.put(interned, ts);
}
// filter the segments from cache which may not be present in subsequent polling
publishedSegments.values().removeIf(v -> v != ts);
Contributor: Would you elaborate more on what this is doing? And what happens if someone reads a segment by calling getPublishedSegments which is supposed to be removed but hasn't been yet?

Author: Added more details in the comment. If that happens, the segment will be removed in the next poll, so publishedSegments will be eventually consistent.


if (segmentWatcherConfig.getWatchedDataSources() != null) {
log.debug(
"filtering datasources[%s] in published segments based on broker's watchedDataSources",
segmentWatcherConfig.getWatchedDataSources()
);
publishedSegments.keySet()
.removeIf(key -> !segmentWatcherConfig.getWatchedDataSources().contains(key.getDataSource()));
Contributor: This should be getting only the segments of watched dataSources in the first place rather than filtering later.

Author: Good point, thanks. Changed the /segments API in MetadataResource to take datasources, so we can pass watchedDataSources to it and filtering can happen before the broker gets the published segments.

Contributor: I think this is not needed now because getMetadataSegments() returns only filtered segments.

Author: yeah, not needed anymore, missed removing it.

}
}
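
The removeIf at the end of poll() is a stamp-and-sweep eviction: every segment seen in the current poll is stamped with this poll's DateTime, and any entry still carrying an older stamp must have disappeared from the coordinator's response, so it is dropped. Note that v != ts is a deliberate reference comparison, since all entries written in one poll share the same DateTime instance. A standalone illustration of the pattern (demo code, not from the PR):

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class StampAndSweepDemo
{
  private final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();

  // Each poll stamps the entries it saw, then sweeps entries with older stamps.
  public void poll(List<String> seen)
  {
    final Object stamp = new Object(); // one shared stamp per poll, like `ts` above
    for (String key : seen) {
      cache.put(key, stamp);
    }
    // Reference inequality identifies entries not refreshed by this poll.
    cache.values().removeIf(v -> v != stamp);
  }
}

As the author notes in the thread above, a reader iterating between the put and the sweep can briefly see a segment that is about to be evicted; the cache is eventually consistent, converging on the next poll.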

public Iterator<DataSegment> getPublishedSegments()
{
return publishedSegments.keySet().iterator();
}

// Note that coordinator must be up to get segments
private static JsonParserIterator<DataSegment> getMetadataSegments(
DruidLeaderClient coordinatorClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler
)
{
Request request;
try {
request = coordinatorClient.makeRequest(
HttpMethod.GET,
StringUtils.format("/druid/coordinator/v1/metadata/segments"),
false
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
ListenableFuture<InputStream> future = coordinatorClient.goAsync(
request,
responseHandler
);

final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
{
});
return new JsonParserIterator<>(
typeRef,
future,
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper,
responseHandler
);
}

}
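
Per the watchedDataSources thread above, filtering moved server-side: the coordinator's /druid/coordinator/v1/metadata/segments endpoint was changed to accept datasource names, so the broker requests only the datasources it watches. A hedged sketch of how the request URL might be assembled (the query-parameter name is an assumption; the diff shown here still uses the unfiltered path):

// Sketch only: append watched datasources as query parameters (parameter name assumed).
StringBuilder queryBuilder = new StringBuilder("/druid/coordinator/v1/metadata/segments");
if (segmentWatcherConfig.getWatchedDataSources() != null) {
  String separator = "?";
  for (String dataSource : segmentWatcherConfig.getWatchedDataSources()) {
    queryBuilder.append(separator).append("datasources=").append(StringUtils.urlEncode(dataSource));
    separator = "&";
  }
}
Request request = coordinatorClient.makeRequest(HttpMethod.GET, queryBuilder.toString(), false);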
5 changes: 4 additions & 1 deletion server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
@@ -20,6 +20,7 @@
package org.apache.druid.client.selector;

import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.client.DataSegmentInterner;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
@@ -50,7 +51,9 @@ public ServerSelector(
TierSelectorStrategy strategy
)
{
this.segment = new AtomicReference<>(segment);
this.segment = new AtomicReference<>(segment.getSize() > 0
? DataSegmentInterner.HISTORICAL_INTERNER.intern(segment)
: DataSegmentInterner.REALTIME_INTERNER.intern(segment));
this.strategy = strategy;
this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
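Interning in the ServerSelector constructor means every selector that refers to the same published segment now shares one canonical DataSegment instance; a broker holds one ServerSelector per segment, so on large clusters the duplicate-object savings can be substantial. A toy demonstration of the reference-deduplication behavior (plain Guava, not Druid code):

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

public class InternerDemo
{
  public static void main(String[] args)
  {
    Interner<String> interner = Interners.newWeakInterner();
    String a = interner.intern(new String("wikipedia_2019-01-01_2019-01-02_v1"));
    String b = interner.intern(new String("wikipedia_2019-01-01_2019-01-02_v1"));
    // Both equal values collapse to the same canonical instance.
    System.out.println(a == b); // true
  }
}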
sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -45,6 +45,7 @@
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.client.MetadataSegmentView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.IndexingService;
@@ -84,6 +85,7 @@
import java.util.Objects;
import java.util.Set;


Contributor: Please remove unnecessary change.

Author: oops, removed.

public class SystemSchema extends AbstractSchema
{
public static final String NAME = "sys";
@@ -149,6 +151,7 @@ public class SystemSchema extends AbstractSchema
@Inject
public SystemSchema(
final DruidSchema druidSchema,
final MetadataSegmentView metadataView,
final TimelineServerView serverView,
final AuthorizerMapper authorizerMapper,
final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
@@ -158,11 +161,10 @@ public SystemSchema(
{
Preconditions.checkNotNull(serverView, "serverView");
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
SegmentsTable segmentsTable = new SegmentsTable(
final SegmentsTable segmentsTable = new SegmentsTable(
druidSchema,
coordinatorDruidLeaderClient,
metadataView,
jsonMapper,
responseHandler,
authorizerMapper
);
this.tableMap = ImmutableMap.of(
@@ -182,23 +184,20 @@ public Map<String, Table> getTableMap()
static class SegmentsTable extends AbstractTable implements ScannableTable
{
private final DruidSchema druidSchema;
private final DruidLeaderClient druidLeaderClient;
private final ObjectMapper jsonMapper;
private final BytesAccumulatingResponseHandler responseHandler;
private final AuthorizerMapper authorizerMapper;
private final MetadataSegmentView metadataView;

public SegmentsTable(
DruidSchema druidSchemna,
DruidLeaderClient druidLeaderClient,
MetadataSegmentView metadataView,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler,
AuthorizerMapper authorizerMapper
)
{
this.druidSchema = druidSchemna;
this.druidLeaderClient = druidLeaderClient;
this.metadataView = metadataView;
this.jsonMapper = jsonMapper;
this.responseHandler = responseHandler;
this.authorizerMapper = authorizerMapper;
}

@@ -231,21 +230,14 @@ public Enumerable<Object[]> scan(DataContext root)
partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
}

//get published segments from coordinator
final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
druidLeaderClient,
jsonMapper,
responseHandler
);
//get published segments from metadata segment cache
final Iterator<DataSegment> pubSegments = metadataView.getPublishedSegments();

final Set<SegmentId> segmentsAlreadySeen = new HashSet<>();

final FluentIterable<Object[]> publishedSegments = FluentIterable
.from(() -> getAuthorizedPublishedSegments(
metadataSegments,
root
))
.transform((DataSegment val) -> {
.from(() -> pubSegments)
.transform(val -> {
try {
segmentsAlreadySeen.add(val.getId());
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getId());
@@ -340,27 +332,6 @@ private Iterator<Entry<DataSegment, SegmentMetadataHolder>> getAuthorizedAvailab
return authorizedSegments.iterator();
}

private CloseableIterator<DataSegment> getAuthorizedPublishedSegments(
Contributor: Is this authorization not needed anymore?

Author: I don't think it's needed, because MetadataResource#getDatabaseSegments does the authorization check as well.

Author: Sorry, this authorization check is still required, because the broker->coordinator call uses an escalated client, so MetadataResource is authorizing the escalated client. Will add this back.

JsonParserIterator<DataSegment> it,
DataContext root
)
{
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);

Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));

final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
() -> it,
raGenerator,
authorizerMapper
);

return wrap(authorizedSegments.iterator(), it);
}
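
Per the author's second reply above, this check was restored on top of the cached view, since the broker fetches the coordinator's segment list with an escalated client and the per-query identity must still be authorized. A hedged sketch of the restored method, adapted from the removed code shown above (the CloseableIterator wrapping is dropped because the cached iterator holds no resources; the merged form may differ):

private Iterator<DataSegment> getAuthorizedPublishedSegments(
    Iterator<DataSegment> it,
    DataContext root
)
{
  final AuthenticationResult authenticationResult =
      (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);

  final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
      AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));

  final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
      authenticationResult,
      () -> it,
      raGenerator,
      authorizerMapper
  );

  return authorizedSegments.iterator();
}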

private static class PartialSegmentData
{
private final long isAvailable;
@@ -404,44 +375,6 @@ public long getNumRows()
}
}

// Note that coordinator must be up to get segments
private static JsonParserIterator<DataSegment> getMetadataSegments(
DruidLeaderClient coordinatorClient,
ObjectMapper jsonMapper,
BytesAccumulatingResponseHandler responseHandler
)
{

Request request;
try {
request = coordinatorClient.makeRequest(
HttpMethod.GET,
StringUtils.format("/druid/coordinator/v1/metadata/segments"),
false
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
ListenableFuture<InputStream> future = coordinatorClient.goAsync(
request,
responseHandler
);

final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
{
});
return new JsonParserIterator<>(
typeRef,
future,
request.getUrl().toString(),
null,
request.getUrl().getHost(),
jsonMapper,
responseHandler
);
}

static class ServersTable extends AbstractTable implements ScannableTable
{
private final TimelineServerView serverView;
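
Finally, the commits "Make the segment cache in broker off by default" and "Add doc for new planner config" indicate the cache is opt-in via broker-side planner configuration. A hedged example of enabling it in the broker's runtime.properties (property names inferred from the commit messages; verify against the merged documentation):

# Hypothetical broker runtime.properties snippet; exact keys may differ.
druid.sql.planner.metadataSegmentCacheEnable=true
druid.sql.planner.metadataSegmentPollPeriod=60000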