Merged
Changes from 9 commits
7 changes: 6 additions & 1 deletion  .idea/inspectionProfiles/Druid.xml
Some generated files are not rendered by default.
@@ -261,9 +261,7 @@ public PutObjectResult putObject(String bucketName, String key)
   @Override
   public PutObjectResult putObject(String bucketName, String key, File file)
   {
-    if (!storage.containsKey(bucketName)) {
-      storage.put(bucketName, new HashSet<>());
-    }
+    storage.putIfAbsent(bucketName, new HashSet<>());
     storage.get(bucketName).add(key);
     return new PutObjectResult();
   }
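This first hunk is the simplest replacement: Map.putIfAbsent is a drop-in for the containsKey/put idiom. One caveat worth noting is that putIfAbsent evaluates its value argument eagerly, so the new HashSet<>() is allocated even when the bucket already exists, whereas computeIfAbsent (used in the later hunks) defers the allocation to a lambda that only runs on a miss. A minimal self-contained sketch contrasting the two idioms; this is illustrative code, not Druid code:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class PutIfAbsentDemo
    {
      public static void main(String[] args)
      {
        Map<String, Set<String>> storage = new HashMap<>();

        // putIfAbsent: the new HashSet<>() argument is evaluated eagerly,
        // i.e. allocated even when the key is already present.
        storage.putIfAbsent("bucket", new HashSet<>());
        storage.get("bucket").add("key1");

        // computeIfAbsent: the factory lambda only runs when the key is absent,
        // and the mapped value is returned, so the follow-up get() can be folded in.
        storage.computeIfAbsent("bucket", k -> new HashSet<>()).add("key2");

        System.out.println(storage); // e.g. {bucket=[key1, key2]}
      }
    }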
@@ -307,9 +307,7 @@ protected void innerMap(
             .getSegmentGranularity()
             .bucket(DateTimes.utc(inputRow.getTimestampFromEpoch()));

-        if (!hyperLogLogs.containsKey(interval)) {
-          hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
-        }
+        hyperLogLogs.computeIfAbsent(interval, intv -> HyperLogLogCollector.makeLatestCollector());
       } else {
         final Optional<Interval> maybeInterval = config.getGranularitySpec()
             .bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()));
@@ -50,9 +50,8 @@ public Set<DataSegment> findUsedSegments(Set<SegmentIdWithShardSpec> identifiers)
     // Group by dataSource
     final Map<String, Set<SegmentIdWithShardSpec>> identifiersByDataSource = new TreeMap<>();
     for (SegmentIdWithShardSpec identifier : identifiers) {
-      if (!identifiersByDataSource.containsKey(identifier.getDataSource())) {
-        identifiersByDataSource.put(identifier.getDataSource(), new HashSet<>());
-      }
+      identifiersByDataSource.computeIfAbsent(identifier.getDataSource(), k -> new HashSet<>());
+
       identifiersByDataSource.get(identifier.getDataSource()).add(identifier);
     }
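Because computeIfAbsent returns the existing or newly created value, the separate get() lookup above could in principle be folded into the same call. A hypothetical further simplification, not part of this PR:

    identifiersByDataSource
        .computeIfAbsent(identifier.getDataSource(), k -> new HashSet<>())
        .add(identifier);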
@@ -768,9 +768,7 @@ private Map<Interval, Optional<HyperLogLogCollector>> collectIntervalsAndShardSpecs(
         }

         if (determineNumPartitions) {
-          if (!hllCollectors.containsKey(interval)) {
-            hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
-          }
+          hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));

           List<Object> groupKey = Rows.toGroupKey(
               queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
@@ -781,9 +779,7 @@
         } else {
           // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
           // for the interval and don't instantiate a HLL collector
-          if (!hllCollectors.containsKey(interval)) {
-            hllCollectors.put(interval, Optional.absent());
-          }
+          hllCollectors.putIfAbsent(interval, Optional.absent());
         }
         determinePartitionsMeters.incrementProcessed();
       }
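A small asymmetry in this hunk is worth calling out: the first branch uses computeIfAbsent because its default value, a fresh HyperLogLogCollector, is expensive to build, while the second branch can use plain putIfAbsent because Guava's Optional.absent() returns a shared singleton, so eager evaluation costs nothing. In stand-in form (map, key, expensiveDefault and CHEAP_SINGLETON are hypothetical names, not Druid code):

    map.putIfAbsent(key, expensiveDefault());          // argument always evaluated, even when key is present
    map.computeIfAbsent(key, k -> expensiveDefault()); // evaluated only when key is absent
    map.putIfAbsent(key, CHEAP_SINGLETON);             // eager, but effectively free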
@@ -204,87 +204,87 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory)
         segmentIds
     );

-    try {
-      final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
-
-      // Download all segments locally.
-      // Note: this requires enough local storage space to fit all of the segments, even though
-      // IngestSegmentFirehose iterates over the segments in series. We may want to change this
-      // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
-      final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
-      Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
-      for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
-        for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
-          final DataSegment segment = chunk.getObject();
-          if (!segmentFileMap.containsKey(segment)) {
-            segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
-          }
-        }
-      }
+    final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
+
+    // Download all segments locally.
+    // Note: this requires enough local storage space to fit all of the segments, even though
+    // IngestSegmentFirehose iterates over the segments in series. We may want to change this
+    // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
+    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+    Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
+    for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
+      for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+        final DataSegment segment = chunk.getObject();
+
+        segmentFileMap.computeIfAbsent(segment, k -> {
+          try {
+            return segmentLoader.getSegmentFiles(segment);
+          }
+          catch (SegmentLoadingException e) {
+            throw new RuntimeException(e);
+          }
+        });
+      }
+    }

Comment (Contributor Author): segmentLoader.getSegmentFiles(segment) throws SegmentLoadingException, which needs to be caught inside the lambda and rethrown as RuntimeException. This makes the outer try/catch redundant: keeping it would be a compilation error, since SegmentLoadingException is already caught and can no longer reach the outer block.

-      final List<String> dims;
-      if (dimensions != null) {
-        dims = dimensions;
-      } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
-        dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
-      } else {
-        dims = getUniqueDimensions(
-            timeLineSegments,
-            inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
-        );
-      }
-
-      final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
+    final List<String> dims;
+    if (dimensions != null) {
+      dims = dimensions;
+    } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
+      dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
+    } else {
+      dims = getUniqueDimensions(
+          timeLineSegments,
+          inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
+      );
+    }
+
+    final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;

-      final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
-          Iterables.concat(
-              Iterables.transform(
-                  timeLineSegments,
-                  new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>()
-                  {
-                    @Override
-                    public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
-                    {
-                      return
-                          Iterables.transform(
-                              holder.getObject(),
-                              new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>()
-                              {
-                                @Override
-                                public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
-                                {
-                                  final DataSegment segment = input.getObject();
-                                  try {
-                                    return new WindowedStorageAdapter(
-                                        new QueryableIndexStorageAdapter(
-                                            indexIO.loadIndex(
-                                                Preconditions.checkNotNull(
-                                                    segmentFileMap.get(segment),
-                                                    "File for segment %s", segment.getId()
-                                                )
-                                            )
-                                        ),
-                                        holder.getInterval()
-                                    );
-                                  }
-                                  catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                  }
-                                }
-                              }
-                          );
-                    }
-                  }
-              )
-          )
-      );
+    final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
+        Iterables.concat(
+            Iterables.transform(
+                timeLineSegments,
+                new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>() {
+                  @Override
+                  public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
+                  {
+                    return Iterables.transform(
+                        holder.getObject(),
+                        new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>() {
+                          @Override
+                          public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
+                          {
+                            final DataSegment segment = input.getObject();
+                            try {
+                              return new WindowedStorageAdapter(
+                                  new QueryableIndexStorageAdapter(
+                                      indexIO.loadIndex(
+                                          Preconditions.checkNotNull(
+                                              segmentFileMap.get(segment),
+                                              "File for segment %s", segment.getId()
+                                          )
+                                      )
+                                  ),
+                                  holder.getInterval()
+                              );
+                            }
+                            catch (IOException e) {
+                              throw new RuntimeException(e);
+                            }
+                          }
+                        }
+                    );
+                  }
+                }
+            )
+        )
+    );

-      final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
-      return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
-    }
-    catch (SegmentLoadingException e) {
-      throw new RuntimeException(e);
-    }
+    final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
+    return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
   }

   private long jitter(long input)

Comment (Contributor Author): Outer try/catch block removed.
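For reference, the pattern used above, wrapping a checked exception inside a lambda so it fits java.util.function.Function, looks like this in isolation. All names here are stand-ins, not Druid code:

    import java.util.HashMap;
    import java.util.Map;

    public class ComputeIfAbsentChecked
    {
      // Stand-in for SegmentLoadingException.
      static class LoadingException extends Exception {}

      // Stand-in for segmentLoader.getSegmentFiles(): declares a checked exception.
      static String load(String key) throws LoadingException
      {
        return "contents-of-" + key;
      }

      public static void main(String[] args)
      {
        Map<String, String> cache = new HashMap<>();

        // computeIfAbsent takes a Function, whose apply() declares no checked
        // exceptions, so the lambda must catch LoadingException itself and
        // rethrow it unchecked. An enclosing catch (LoadingException e) would
        // then be rejected at compile time as unreachable, which is why the PR
        // also removes the outer try/catch.
        String value = cache.computeIfAbsent("segment-1", k -> {
          try {
            return load(k);
          }
          catch (LoadingException e) {
            throw new RuntimeException(e);
          }
        });

        System.out.println(value); // contents-of-segment-1
      }
    }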
@@ -508,13 +508,14 @@ static List<String> getUniqueMetrics(List<TimelineObjectHolder<String, DataSegment>> timelineSegments)
     // segments to olders.

     // timelineSegments are sorted in order of interval
-    int index = 0;
+    int[] index = {0};
     for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
       for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
         for (String metric : chunk.getObject().getMetrics()) {
-          if (!uniqueMetrics.containsKey(metric)) {
-            uniqueMetrics.put(metric, index++);
-          }
+          uniqueMetrics.computeIfAbsent(metric, k -> {
+            return index[0]++;
+          }
+          );
         }
       }
     }

Comment (Contributor Author): With index as a plain int, the compiler complains that variables used in a lambda should be final or effectively final. The fix here is a one-element integer array. Let me know if this is right.
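The one-element array is a standard workaround for lambda capture rules, and it preserves the old behavior here: the counter is only incremented when the key is absent, exactly as the removed if-block did. java.util.concurrent.atomic.AtomicInteger is a common alternative that states the intent more directly. A minimal sketch of both, independent of the Druid code:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    public class LambdaCaptureDemo
    {
      public static void main(String[] args)
      {
        Map<String, Integer> ids = new HashMap<>();

        // int counter = 0; ids.computeIfAbsent("a", k -> counter++);
        // does not compile: captured locals must be final or effectively final.

        // Workaround 1: a one-element array. The array reference is effectively
        // final even though the element it holds is mutated.
        int[] index = {0};
        ids.computeIfAbsent("a", k -> index[0]++);
        ids.computeIfAbsent("a", k -> index[0]++); // key present: lambda not run, index stays 1

        // Workaround 2: AtomicInteger, which reads more explicitly and is also
        // safe if the map were ever accessed concurrently.
        AtomicInteger counter = new AtomicInteger(1);
        ids.computeIfAbsent("b", k -> counter.getAndIncrement());

        System.out.println(ids);      // {a=0, b=1} (HashMap iteration order not guaranteed)
        System.out.println(index[0]); // 1
      }
    }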