Commit b9186f8

leventov authored and jihoonson committed
Reconcile terminology and method naming to 'used/unused segments'; Rename MetadataSegmentManager to MetadataSegmentsManager (#7306)
* Reconcile terminology and method naming to 'used/unused segments'; Don't use terms 'enable/disable data source'; Rename MetadataSegmentManager to MetadataSegments; Make REST API methods which mark segments as used/unused return a server error instead of an empty response in case of error
* Fix brace
* Import order
* Rename withKillDataSourceWhitelist to withSpecificDataSourcesToKill
* Fix tests
* Fix tests by adding proper methods without interval parameters to IndexerMetadataStorageCoordinator instead of hacking with Intervals.ETERNITY
* More aligned names of DruidCoordinatorHelpers, rename several CoordinatorDynamicConfig parameters
* Rename ClientCompactTaskQuery to ClientCompactionTaskQuery for consistency with CompactionTask; ClientCompactQueryTuningConfig to ClientCompactionTaskQueryTuningConfig
* More variable and method renames
* Rename MetadataSegments to SegmentsMetadata
* Javadoc update
* Simplify SegmentsMetadata.getUnusedSegmentIntervals(), more javadocs
* Update Javadoc of VersionedIntervalTimeline.iterateAllObjects()
* Reorder imports
* Rename SegmentsMetadata.tryMark... methods to mark..., make them return booleans and the numbers of segments changed, and relay exceptions to callers
* Complete merge
* Add CollectionUtils.newTreeSet(); Refactor DruidCoordinatorRuntimeParams creation in tests
* Remove MetadataSegmentManager
* Rename millisLagSinceCoordinatorBecomesLeaderBeforeCanMarkAsUnusedOvershadowedSegments to leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments
* Fix tests, refactor DruidCluster creation in tests into DruidClusterBuilder
* Fix inspections
* Fix SQLMetadataSegmentManagerEmptyTest and rename it to SqlSegmentsMetadataEmptyTest
* Rename SegmentsAndMetadata to SegmentsAndCommitMetadata to reduce the similarity with SegmentsMetadata; Rename some methods
* Rename DruidCoordinatorHelper to CoordinatorDuty, refactor DruidCoordinator
* Unused import
* Optimize imports
* Rename IndexerSQLMetadataStorageCoordinator.getDataSourceMetadata() to retrieveDataSourceMetadata()
* Unused import
* Update terminology in datasource-view.tsx
* Fix label in datasource-view.spec.tsx.snap
* Fix lint errors in datasource-view.tsx
* Doc improvements
* Another attempt to please TSLint
* Another attempt to please TSLint
* Style fixes
* Fix IndexerSQLMetadataStorageCoordinator.createUsedSegmentsSqlQueryForIntervals() (wrong merge)
* Try to fix docs build issue
* Javadoc and spelling fixes
* Rename SegmentsMetadata to SegmentsMetadataManager, address other comments
* Address more comments
1 parent c6c8b80 commit b9186f8

File tree

181 files changed: +2361 additions, -2063 deletions


.idea/inspectionProfiles/Druid.xml

Lines changed: 15 additions & 2 deletions

benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java

Lines changed: 3 additions & 3 deletions
@@ -24,9 +24,9 @@
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.server.coordinator.helper.CompactionSegmentIterator;
-import org.apache.druid.server.coordinator.helper.CompactionSegmentSearchPolicy;
-import org.apache.druid.server.coordinator.helper.NewestSegmentFirstPolicy;
+import org.apache.druid.server.coordinator.duty.CompactionSegmentIterator;
+import org.apache.druid.server.coordinator.duty.CompactionSegmentSearchPolicy;
+import org.apache.druid.server.coordinator.duty.NewestSegmentFirstPolicy;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;

core/src/main/java/org/apache/druid/java/util/metrics/AllocationMetricCollectors.java

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@
 import org.apache.druid.java.util.common.logger.Logger;
 
 import javax.annotation.Nullable;
-
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.Method;

core/src/main/java/org/apache/druid/metadata/MetadataStorageConnector.java

Lines changed: 6 additions & 1 deletion
@@ -19,6 +19,7 @@
 
 package org.apache.druid.metadata;
 
+import javax.annotation.Nullable;
 import java.util.List;
 
 /**
@@ -36,7 +37,11 @@ Void insertOrUpdate(
       byte[] value
   );
 
-  byte[] lookup(
+  /**
+   * Returns the value of the valueColumn when there is only one row matched to the given key.
+   * This method returns null if there is no such row and throws an error if there are more than one rows.
+   */
+  @Nullable byte[] lookup(
       String tableName,
       String keyColumn,
       String valueColumn,
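The Javadoc added above pins down the lookup() contract: null when no row matches the key, the value when exactly one row matches, and an error when several rows match. A stand-alone sketch of that contract against an in-memory row list; `LookupDemo` and its `lookup` helper are hypothetical names for illustration, not Druid code:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LookupDemo {
    // Sketch of the documented contract: null for no match, the value for
    // exactly one match, and an exception when the key matches several rows.
    static String lookup(List<Map.Entry<String, String>> rows, String key) {
        List<String> matches = rows.stream()
                .filter(e -> e.getKey().equals(key))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            return null;
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("more than one row matched key: " + key);
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> rows = List.of(Map.entry("a", "1"), Map.entry("b", "2"));
        System.out.println(lookup(rows, "a")); // 1
        System.out.println(lookup(rows, "c")); // null
    }
}
```

Callers that previously received an empty byte[] now have to null-check instead, which is what the @Nullable annotation signals to static analysis.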

core/src/main/java/org/apache/druid/metadata/MetadataStorageConnectorConfig.java

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.StringUtils;
+
 import java.util.Properties;
 
 /**

core/src/main/java/org/apache/druid/segment/SegmentUtils.java

Lines changed: 7 additions & 9 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment;
 
+import com.google.common.collect.Collections2;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
@@ -37,7 +38,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Utility methods useful for implementing deep storage extensions.
@@ -78,16 +78,14 @@ public static int getVersionFromDir(File inDir) throws IOException
   }
 
   /**
-   * Returns a String with identifiers of "segments" comma-separated. Useful for log messages. Not useful for anything
-   * else, because this doesn't take special effort to escape commas that occur in identifiers (not common, but could
-   * potentially occur in a datasource name).
+   * Returns an object whose toString() returns a String with identifiers of the given segments, comma-separated. Useful
+   * for log messages. Not useful for anything else, because this doesn't take special effort to escape commas that
+   * occur in identifiers (not common, but could potentially occur in a datasource name).
    */
-  public static String commaSeparateIdentifiers(final Collection<DataSegment> segments)
+  public static Object commaSeparatedIdentifiers(final Collection<DataSegment> segments)
   {
-    return segments
-        .stream()
-        .map(segment -> segment.getId().toString())
-        .collect(Collectors.joining(", "));
+    // Lazy, to avoid preliminary string creation if logging level is turned off
+    return Collections2.transform(segments, DataSegment::getId);
   }
 
   private SegmentUtils()
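The change above swaps an eager string join for a lazy view (Guava's Collections2.transform) whose toString() does the work only when a logger actually formats the message. The same idiom can be sketched with the JDK alone; `LazyJoinDemo` and `lazyJoin` are illustrative names, not part of the patch:

```java
import java.util.List;
import java.util.stream.Collectors;

public class LazyJoinDemo {
    // Returns an object that joins the identifiers only when toString() is
    // invoked, e.g. by a logger whose level is actually enabled. If the log
    // line is filtered out, no joined string is ever built.
    static Object lazyJoin(List<String> identifiers) {
        return new Object() {
            @Override
            public String toString() {
                return identifiers.stream().collect(Collectors.joining(", "));
            }
        };
    }

    public static void main(String[] args) {
        Object ids = lazyJoin(List.of("segment_2020-01", "segment_2020-02"));
        System.out.println(ids); // the join happens here, not in lazyJoin()
    }
}
```

This is also why the return type changed from String to Object: callers are expected to pass the value straight to a logger's format argument rather than concatenate it themselves.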

core/src/main/java/org/apache/druid/timeline/VersionedIntervalTimeline.java

Lines changed: 75 additions & 64 deletions
@@ -38,7 +38,6 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -56,25 +55,38 @@
 /**
  * VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
  *
- * It associates a jodatime Interval and a generically-typed version with the object that is being stored.
+ * It associates an {@link Interval} and a generically-typed version with the object that is being stored.
  *
  * In the event of overlapping timeline entries, timeline intervals may be chunked. The underlying data associated
  * with a timeline entry remains unchanged when chunking occurs.
  *
- * After loading objects via the add() method, the lookup(Interval) method can be used to get the list of the most
- * recent objects (according to the version) that match the given interval. The intent is that objects represent
- * a certain time period and when you do a lookup(), you are asking for all of the objects that you need to look
- * at in order to get a correct answer about that time period.
+ * After loading objects via the {@link #add} method, the {@link #lookup(Interval)} method can be used to get the list
+ * of the most recent objects (according to the version) that match the given interval. The intent is that objects
+ * represent a certain time period and when you do a {@link #lookup(Interval)}, you are asking for all of the objects
+ * that you need to look at in order to get a correct answer about that time period.
  *
- * The {@link #findFullyOvershadowed} method returns a list of objects that will never be returned by a call to lookup()
- * because they are overshadowed by some other object. This can be used in conjunction with the add() and remove()
- * methods to achieve "atomic" updates. First add new items, then check if those items caused anything to be
- * overshadowed, if so, remove the overshadowed elements and you have effectively updated your data set without any user
- * impact.
+ * The {@link #findFullyOvershadowed} method returns a list of objects that will never be returned by a call to {@link
+ * #lookup} because they are overshadowed by some other object. This can be used in conjunction with the {@link #add}
+ * and {@link #remove} methods to achieve "atomic" updates. First add new items, then check if those items caused
+ * anything to be overshadowed, if so, remove the overshadowed elements and you have effectively updated your data set
+ * without any user impact.
  */
 public class VersionedIntervalTimeline<VersionType, ObjectType extends Overshadowable<ObjectType>>
     implements TimelineLookup<VersionType, ObjectType>
 {
+  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
+  {
+    return forSegments(segments.iterator());
+  }
+
+  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline =
+        new VersionedIntervalTimeline<>(Comparator.naturalOrder());
+    addSegments(timeline, segments);
+    return timeline;
+  }
+
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
   // Below timelines stores only *visible* timelineEntries
@@ -99,19 +111,6 @@ public VersionedIntervalTimeline(Comparator<? super VersionType> versionComparat
     this.versionComparator = versionComparator;
   }
 
-  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
-  {
-    return forSegments(segments.iterator());
-  }
-
-  public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
-  {
-    final VersionedIntervalTimeline<String, DataSegment> timeline =
-        new VersionedIntervalTimeline<>(Comparator.naturalOrder());
-    addSegments(timeline, segments);
-    return timeline;
-  }
-
   public static void addSegments(
       VersionedIntervalTimeline<String, DataSegment> timeline,
       Iterator<DataSegment> segments
@@ -151,6 +150,11 @@ public Collection<ObjectType> iterateAllObjects()
     );
   }
 
+  public int getNumObjects()
+  {
+    return numObjects.get();
+  }
+
   /**
    * Computes a set with all objects falling within the specified interval which are at least partially "visible" in
    * this interval (that is, are not fully overshadowed within this interval).
@@ -371,62 +375,69 @@ public Set<TimelineObjectHolder<VersionType, ObjectType>> findFullyOvershadowed(
     lock.readLock().lock();
     try {
       // 1. Put all timelineEntries and remove all visible entries to find out only non-visible timelineEntries.
-      final Map<Interval, Map<VersionType, TimelineEntry>> overShadowed = new HashMap<>();
-      for (Map.Entry<Interval, TreeMap<VersionType, TimelineEntry>> versionEntry : allTimelineEntries.entrySet()) {
-        @SuppressWarnings("unchecked")
-        Map<VersionType, TimelineEntry> versionCopy = (TreeMap) versionEntry.getValue().clone();
-        overShadowed.put(versionEntry.getKey(), versionCopy);
-      }
-
-      for (Entry<Interval, TimelineEntry> entry : completePartitionsTimeline.entrySet()) {
-        Map<VersionType, TimelineEntry> versionEntry = overShadowed.get(entry.getValue().getTrueInterval());
-        if (versionEntry != null) {
-          versionEntry.remove(entry.getValue().getVersion());
-          if (versionEntry.isEmpty()) {
-            overShadowed.remove(entry.getValue().getTrueInterval());
-          }
-        }
-      }
-
-      for (Entry<Interval, TimelineEntry> entry : incompletePartitionsTimeline.entrySet()) {
-        Map<VersionType, TimelineEntry> versionEntry = overShadowed.get(entry.getValue().getTrueInterval());
-        if (versionEntry != null) {
-          versionEntry.remove(entry.getValue().getVersion());
-          if (versionEntry.isEmpty()) {
-            overShadowed.remove(entry.getValue().getTrueInterval());
-          }
-        }
-      }
-
-      final Set<TimelineObjectHolder<VersionType, ObjectType>> retVal = new HashSet<>();
-      for (Entry<Interval, Map<VersionType, TimelineEntry>> versionEntry : overShadowed.entrySet()) {
-        for (Entry<VersionType, TimelineEntry> entry : versionEntry.getValue().entrySet()) {
-          final TimelineEntry timelineEntry = entry.getValue();
-          retVal.add(timelineEntryToObjectHolder(timelineEntry));
-        }
-      }
+      final Map<Interval, Map<VersionType, TimelineEntry>> overshadowedPartitionsTimeline =
+          computeOvershadowedPartitionsTimeline();
+
+      final Set<TimelineObjectHolder<VersionType, ObjectType>> overshadowedObjects = overshadowedPartitionsTimeline
+          .values()
+          .stream()
+          .flatMap(
+              (Map<VersionType, TimelineEntry> entry) -> entry.values().stream().map(this::timelineEntryToObjectHolder)
+          )
+          .collect(Collectors.toSet());
 
-      // 2. Visible timelineEntries can also have overshadowed segments. Add them to the result too.
+      // 2. Visible timelineEntries can also have overshadowed objects. Add them to the result too.
       for (TimelineEntry entry : incompletePartitionsTimeline.values()) {
-        final List<PartitionChunk<ObjectType>> entryOvershadowed = entry.partitionHolder.getOvershadowed();
-        if (!entryOvershadowed.isEmpty()) {
-          retVal.add(
+        final List<PartitionChunk<ObjectType>> overshadowedEntries = entry.partitionHolder.getOvershadowed();
+        if (!overshadowedEntries.isEmpty()) {
+          overshadowedObjects.add(
             new TimelineObjectHolder<>(
                 entry.trueInterval,
                 entry.version,
-                new PartitionHolder<>(entryOvershadowed)
+                new PartitionHolder<>(overshadowedEntries)
             )
           );
         }
       }
 
-      return retVal;
+      return overshadowedObjects;
     }
     finally {
      lock.readLock().unlock();
    }
  }
 
+  private Map<Interval, Map<VersionType, TimelineEntry>> computeOvershadowedPartitionsTimeline()
+  {
+    final Map<Interval, Map<VersionType, TimelineEntry>> overshadowedPartitionsTimeline = new HashMap<>();
+    allTimelineEntries.forEach((Interval interval, TreeMap<VersionType, TimelineEntry> versionEntry) -> {
+      @SuppressWarnings("unchecked")
+      Map<VersionType, TimelineEntry> versionEntryCopy = (TreeMap) versionEntry.clone();
+      overshadowedPartitionsTimeline.put(interval, versionEntryCopy);
+    });
+
+    for (TimelineEntry entry : completePartitionsTimeline.values()) {
+      overshadowedPartitionsTimeline.computeIfPresent(
+          entry.getTrueInterval(),
+          (Interval interval, Map<VersionType, TimelineEntry> versionEntry) -> {
+            versionEntry.remove(entry.getVersion());
+            return versionEntry.isEmpty() ? null : versionEntry;
+          }
+      );
+    }
+
+    for (TimelineEntry entry : incompletePartitionsTimeline.values()) {
+      overshadowedPartitionsTimeline.computeIfPresent(
+          entry.getTrueInterval(),
+          (Interval interval, Map<VersionType, TimelineEntry> versionEntry) -> {
+            versionEntry.remove(entry.getVersion());
+            return versionEntry.isEmpty() ? null : versionEntry;
+          }
+      );
+    }
+    return overshadowedPartitionsTimeline;
+  }
+
   public boolean isOvershadowed(Interval interval, VersionType version, ObjectType object)
   {
     lock.readLock().lock();
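The extracted computeOvershadowedPartitionsTimeline() above replaces the hand-rolled get/remove/isEmpty dance with Map.computeIfPresent: the remapping function removes one version, and returning null makes the map drop the interval key entirely once its inner map empties. A minimal stand-alone sketch of that idiom, with hypothetical names (`ComputeIfPresentDemo`, `removeVersion`) and strings standing in for intervals, versions, and entries:

```java
import java.util.HashMap;
import java.util.Map;

public class ComputeIfPresentDemo {
    // Remove one version under a key. Returning null from the remapping
    // function makes computeIfPresent() delete the outer entry entirely,
    // so empty inner maps never linger in the timeline.
    static <K> void removeVersion(Map<K, Map<String, String>> timeline, K key, String version) {
        timeline.computeIfPresent(key, (k, versions) -> {
            versions.remove(version);
            return versions.isEmpty() ? null : versions;
        });
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> timeline = new HashMap<>();
        timeline.put("2020-01", new HashMap<>(Map.of("v1", "segA", "v2", "segB")));

        removeVersion(timeline, "2020-01", "v1");
        System.out.println(timeline.containsKey("2020-01")); // true: v2 remains

        removeVersion(timeline, "2020-01", "v2");
        System.out.println(timeline.containsKey("2020-01")); // false: key dropped
    }
}
```

Compared with the removed code, this folds the null check, the remove, and the empty-map cleanup into one atomic map operation, which is why the refactor could delete two near-identical loops' worth of boilerplate.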

0 commit comments