From 50b80a002e4ebbb24516f25490850a12e26f90ee Mon Sep 17 00:00:00 2001 From: s0t016r Date: Sun, 26 May 2019 17:10:21 +0530 Subject: [PATCH 1/8] https://github.com/apache/incubator-druid/issues/7316 Use Map.putIfAbsent() instead of containsKey() + put() --- .idea/inspectionProfiles/Druid.xml | 10 ++++++---- .../druid/storage/s3/S3DataSegmentMoverTest.java | 4 +--- .../druid/indexer/DetermineHashedPartitionsJob.java | 4 +--- .../appenderator/ActionBasedUsedSegmentChecker.java | 4 +--- .../apache/druid/indexing/common/task/IndexTask.java | 8 ++------ .../firehose/IngestSegmentFirehoseFactory.java | 8 ++------ .../druid/indexing/overlord/ForkingTaskRunner.java | 4 +--- .../druid/metadata/SQLMetadataSupervisorManager.java | 4 +--- .../apache/druid/server/http/DataSourcesResource.java | 4 +--- .../druid/client/CachingClusteredClientTest.java | 4 +--- .../appenderator/StreamAppenderatorDriverTest.java | 4 +--- .../main/java/org/apache/druid/sql/SqlLifecycle.java | 4 +--- .../util/SpecificSegmentsQuerySegmentWalker.java | 4 +--- 13 files changed, 20 insertions(+), 46 deletions(-) diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index 9770c11edbfc..3b1bf13c3abd 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -88,9 +88,6 @@ - - @@ -161,7 +158,7 @@ - + @@ -306,6 +303,11 @@ + + + + + diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java index ec30aa93508f..34496d965d64 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java @@ -261,9 +261,7 @@ public PutObjectResult putObject(String bucketName, String key) @Override public PutObjectResult putObject(String bucketName, String key, File file) { - if (!storage.containsKey(bucketName)) { - storage.put(bucketName, new HashSet<>()); - } + storage.putIfAbsent(bucketName, new HashSet<>()); storage.get(bucketName).add(key); return new PutObjectResult(); } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java index c83bc085a2f6..e6648e32d261 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java @@ -307,9 +307,7 @@ protected void innerMap( .getSegmentGranularity() .bucket(DateTimes.utc(inputRow.getTimestampFromEpoch())); - if (!hyperLogLogs.containsKey(interval)) { - hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector()); - } + hyperLogLogs.putIfAbsent(interval, HyperLogLogCollector.makeLatestCollector()); } else { final Optional maybeInterval = config.getGranularitySpec() .bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch())); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java index a1eb90f88e76..f4fec949bfde 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java @@ -50,9 +50,7 @@ public Set findUsedSegments(Set identifiers // Group by dataSource final Map> identifiersByDataSource = new TreeMap<>(); for (SegmentIdWithShardSpec identifier : identifiers) { - if (!identifiersByDataSource.containsKey(identifier.getDataSource())) { - identifiersByDataSource.put(identifier.getDataSource(), new HashSet<>()); - } + identifiersByDataSource.putIfAbsent(identifier.getDataSource(), new HashSet<>()); identifiersByDataSource.get(identifier.getDataSource()).add(identifier); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 20cee242c229..47ff9c086aa4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -768,9 +768,7 @@ private Map> collectIntervalsAndShardSp } if (determineNumPartitions) { - if (!hllCollectors.containsKey(interval)) { - hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector())); - } + hllCollectors.putIfAbsent(interval, Optional.of(HyperLogLogCollector.makeLatestCollector())); List groupKey = Rows.toGroupKey( queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(), @@ -781,9 +779,7 @@ private Map> collectIntervalsAndShardSp } else { // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent() // for the interval and don't instantiate a HLL collector - if (!hllCollectors.containsKey(interval)) { - hllCollectors.put(interval, Optional.absent()); - } + hllCollectors.putIfAbsent(interval, Optional.absent()); } determinePartitionsMeters.incrementProcessed(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 4d1e0bf4e8bc..b0e283fc2a5f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -216,9 +216,7 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) for (TimelineObjectHolder holder : timeLineSegments) { for (PartitionChunk chunk : holder.getObject()) { final DataSegment segment = chunk.getObject(); - if (!segmentFileMap.containsKey(segment)) { - segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment)); - } + segmentFileMap.putIfAbsent(segment, segmentLoader.getSegmentFiles(segment)); } } @@ -512,9 +510,7 @@ static List getUniqueMetrics(List timelineHolder : Lists.reverse(timelineSegments)) { for (PartitionChunk chunk : timelineHolder.getObject()) { for (String metric : chunk.getObject().getMetrics()) { - if (!uniqueMetrics.containsKey(metric)) { - uniqueMetrics.put(metric, index++); - } + uniqueMetrics.putIfAbsent(metric, index++); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 05f2f52c3897..d8c096d69e5f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -214,8 +214,7 @@ public void unregisterListener(String listenerId) public ListenableFuture run(final Task task) { synchronized (tasks) { - if (!tasks.containsKey(task.getId())) { - tasks.put( + tasks.putIfAbsent( task.getId(), new ForkingTaskRunnerWorkItem( task, @@ -516,7 +515,6 @@ public TaskStatus call() ) ) ); - } saveRunningTasks(); return tasks.get(task.getId()).getResult(); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java index 83fcc0505c5a..a5354b1990bf 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java @@ -166,9 +166,7 @@ public Map> fold( { try { String specId = pair.lhs; - if (!retVal.containsKey(specId)) { - retVal.put(specId, new ArrayList<>()); - } + retVal.putIfAbsent(specId, new ArrayList<>()); retVal.get(specId).add(pair.rhs); return retVal; diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 5904d52583bb..7ad4a6a80114 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -549,9 +549,7 @@ private Map> getSimpleDatasource(String dataSourceNa continue; } - if (!tierDistinctSegments.containsKey(tier)) { - tierDistinctSegments.put(tier, new HashSet<>()); - } + tierDistinctSegments.putIfAbsent(tier, new HashSet<>()); long dataSourceSegmentSize = 0; for (DataSegment dataSegment : druidDataSource.getSegments()) { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 59367c657164..a9508010942c 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2200,9 +2200,7 @@ private List> populateTimeline( serverExpectationList.add(serverExpectations); for (int j = 0; j < numChunks; ++j) { DruidServer lastServer = servers[random.nextInt(servers.length)]; - if (!serverExpectations.containsKey(lastServer)) { - serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class))); - } + serverExpectations.putIfAbsent(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class))); DataSegment mockSegment = makeMock(mocks, DataSegment.class); ServerExpectation expectation = new ServerExpectation<>( diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 9c3e9ccbe4ca..c7475d9f722e 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -428,9 +428,7 @@ public SegmentIdWithShardSpec allocate( synchronized (counters) { DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp()); final long timestampTruncated = dateTimeTruncated.getMillis(); - if 
(!counters.containsKey(timestampTruncated)) { - counters.put(timestampTruncated, new AtomicInteger()); - } + counters.putIfAbsent(timestampTruncated, new AtomicInteger()); final int partitionNum = counters.get(timestampTruncated).getAndIncrement(); return new SegmentIdWithShardSpec( dataSource, diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java index a9c78c48f6f8..db8f4645b8da 100644 --- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java +++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java @@ -122,9 +122,7 @@ private Map contextWithSqlId(Map queryContext) if (queryContext != null) { newContext.putAll(queryContext); } - if (!newContext.containsKey(PlannerContext.CTX_SQL_QUERY_ID)) { - newContext.put(PlannerContext.CTX_SQL_QUERY_ID, UUID.randomUUID().toString()); - } + newContext.putIfAbsent(PlannerContext.CTX_SQL_QUERY_ID, UUID.randomUUID().toString()); return newContext; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index af0e89454328..27394d9845a4 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -78,9 +78,7 @@ public SpecificSegmentsQuerySegmentWalker add( ) { final Segment segment = new QueryableIndexSegment(index, descriptor.getId()); - if (!timelines.containsKey(descriptor.getDataSource())) { - timelines.put(descriptor.getDataSource(), new VersionedIntervalTimeline<>(Ordering.natural())); - } + timelines.putIfAbsent(descriptor.getDataSource(), new VersionedIntervalTimeline<>(Ordering.natural())); final VersionedIntervalTimeline timeline = timelines.get(descriptor.getDataSource()); timeline.add(descriptor.getInterval(), descriptor.getVersion(), descriptor.getShardSpec().createChunk(segment)); From b8e44735f9f50dffd21e5c5eebdbcf3ac07d8959 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Sun, 26 May 2019 18:15:21 +0530 Subject: [PATCH 2/8] fixing indentation --- .../indexing/overlord/ForkingTaskRunner.java | 544 +++++++++--------- 1 file changed, 272 insertions(+), 272 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index d8c096d69e5f..ec8db450abcf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -214,307 +214,307 @@ public void unregisterListener(String listenerId) public ListenableFuture run(final Task task) { synchronized (tasks) { - tasks.putIfAbsent( - task.getId(), - new ForkingTaskRunnerWorkItem( - task, - exec.submit( - new Callable() - { - @Override - public TaskStatus call() - { - final String attemptUUID = UUID.randomUUID().toString(); - final File taskDir = taskConfig.getTaskDir(task.getId()); - final File attemptDir = new File(taskDir, attemptUUID); - - final ProcessHolder processHolder; - final String childHost = node.getHost(); - int childPort = -1; - int tlsChildPort = -1; - - if (node.isEnablePlaintextPort()) { - childPort = portFinder.findUnusedPort(); + tasks.putIfAbsent( + task.getId(), + new ForkingTaskRunnerWorkItem( + task, + exec.submit( + new Callable() + 
{ + @Override + public TaskStatus call() + { + final String attemptUUID = UUID.randomUUID().toString(); + final File taskDir = taskConfig.getTaskDir(task.getId()); + final File attemptDir = new File(taskDir, attemptUUID); + + final ProcessHolder processHolder; + final String childHost = node.getHost(); + int childPort = -1; + int tlsChildPort = -1; + + if (node.isEnablePlaintextPort()) { + childPort = portFinder.findUnusedPort(); + } + + if (node.isEnableTlsPort()) { + tlsChildPort = portFinder.findUnusedPort(); + } + + final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort); + + try { + final Closer closer = Closer.create(); + try { + if (!attemptDir.mkdirs()) { + throw new IOE("Could not create directories: %s", attemptDir); + } + + final File taskFile = new File(taskDir, "task.json"); + final File statusFile = new File(attemptDir, "status.json"); + final File logFile = new File(taskDir, "log"); + final File reportsFile = new File(attemptDir, "report.json"); + + // time to adjust process holders + synchronized (tasks) { + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId()); + + if (taskWorkItem.shutdown) { + throw new IllegalStateException("Task has been shut down!"); } - if (node.isEnableTlsPort()) { - tlsChildPort = portFinder.findUnusedPort(); + if (taskWorkItem == null) { + log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit(); + throw new ISE("TaskInfo disappeared for task[%s]!", task.getId()); } - final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort); + if (taskWorkItem.processHolder != null) { + log.makeAlert("WTF?! TaskInfo already has a processHolder") + .addData("task", task.getId()) + .emit(); + throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId()); + } - try { - final Closer closer = Closer.create(); - try { - if (!attemptDir.mkdirs()) { - throw new IOE("Could not create directories: %s", attemptDir); - } + final List command = new ArrayList<>(); + final String taskClasspath; + if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { + taskClasspath = Joiner.on(File.pathSeparator).join( + task.getClasspathPrefix(), + config.getClasspath() + ); + } else { + taskClasspath = config.getClasspath(); + } - final File taskFile = new File(taskDir, "task.json"); - final File statusFile = new File(attemptDir, "status.json"); - final File logFile = new File(taskDir, "log"); - final File reportsFile = new File(attemptDir, "report.json"); - - // time to adjust process holders - synchronized (tasks) { - final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId()); - - if (taskWorkItem.shutdown) { - throw new IllegalStateException("Task has been shut down!"); - } - - if (taskWorkItem == null) { - log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit(); - throw new ISE("TaskInfo disappeared for task[%s]!", task.getId()); - } - - if (taskWorkItem.processHolder != null) { - log.makeAlert("WTF?! 
TaskInfo already has a processHolder") - .addData("task", task.getId()) - .emit(); - throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId()); - } - - final List command = new ArrayList<>(); - final String taskClasspath; - if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { - taskClasspath = Joiner.on(File.pathSeparator).join( - task.getClasspathPrefix(), - config.getClasspath() - ); - } else { - taskClasspath = config.getClasspath(); - } - - command.add(config.getJavaCommand()); - command.add("-cp"); - command.add(taskClasspath); - - Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts())); - Iterables.addAll(command, config.getJavaOptsArray()); - - // Override task specific javaOpts - Object taskJavaOpts = task.getContextValue( - ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY - ); - if (taskJavaOpts != null) { - Iterables.addAll( - command, - new QuotableWhiteSpaceSplitter((String) taskJavaOpts) - ); - } - - for (String propName : props.stringPropertyNames()) { - for (String allowedPrefix : config.getAllowedPrefixes()) { - // See https://github.com/apache/incubator-druid/issues/1841 - if (propName.startsWith(allowedPrefix) - && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) - && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) - ) { - command.add( - StringUtils.format( - "-D%s=%s", - propName, - props.getProperty(propName) - ) - ); - } - } - } - - // Override child JVM specific properties - for (String propName : props.stringPropertyNames()) { - if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { - command.add( - StringUtils.format( - "-D%s=%s", - propName.substring(CHILD_PROPERTY_PREFIX.length()), - props.getProperty(propName) - ) - ); - } - } - - // Override task specific properties - final Map context = task.getContext(); - if (context != null) { - for (String propName : context.keySet()) { - if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { - command.add( - StringUtils.format( - "-D%s=%s", - propName.substring(CHILD_PROPERTY_PREFIX.length()), - task.getContextValue(propName) - ) - ); - } - } - } - - // Add dataSource, taskId and taskType for metrics or logging - command.add( - StringUtils.format( - "-D%s%s=%s", - MonitorsConfig.METRIC_DIMENSION_PREFIX, - DruidMetrics.DATASOURCE, - task.getDataSource() - ) - ); + command.add(config.getJavaCommand()); + command.add("-cp"); + command.add(taskClasspath); + + Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts())); + Iterables.addAll(command, config.getJavaOptsArray()); + + // Override task specific javaOpts + Object taskJavaOpts = task.getContextValue( + ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY + ); + if (taskJavaOpts != null) { + Iterables.addAll( + command, + new QuotableWhiteSpaceSplitter((String) taskJavaOpts) + ); + } + + for (String propName : props.stringPropertyNames()) { + for (String allowedPrefix : config.getAllowedPrefixes()) { + // See https://github.com/apache/incubator-druid/issues/1841 + if (propName.startsWith(allowedPrefix) + && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) + && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) + ) { command.add( StringUtils.format( - "-D%s%s=%s", - MonitorsConfig.METRIC_DIMENSION_PREFIX, - DruidMetrics.TASK_ID, - task.getId() - ) + "-D%s=%s", + propName, + props.getProperty(propName) + ) ); + } + } + } + + // Override child JVM specific properties + for (String propName : 
props.stringPropertyNames()) { + if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { + command.add( + StringUtils.format( + "-D%s=%s", + propName.substring(CHILD_PROPERTY_PREFIX.length()), + props.getProperty(propName) + ) + ); + } + } + + // Override task specific properties + final Map context = task.getContext(); + if (context != null) { + for (String propName : context.keySet()) { + if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { command.add( StringUtils.format( - "-D%s%s=%s", - MonitorsConfig.METRIC_DIMENSION_PREFIX, - DruidMetrics.TASK_TYPE, - task.getType() - ) + "-D%s=%s", + propName.substring(CHILD_PROPERTY_PREFIX.length()), + task.getContextValue(propName) + ) ); + } + } + } - command.add(StringUtils.format("-Ddruid.host=%s", childHost)); - command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort)); - command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort)); - /** - * These are not enabled per default to allow the user to either set or not set them - * Users are highly suggested to be set in druid.indexer.runner.javaOpts - * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int) - * for more information - command.add("-XX:+UseThreadPriorities"); - command.add("-XX:ThreadPriorityPolicy=42"); - */ - - command.add("org.apache.druid.cli.Main"); - command.add("internal"); - command.add("peon"); - command.add(taskFile.toString()); - command.add(statusFile.toString()); - command.add(reportsFile.toString()); - String nodeType = task.getNodeType(); - if (nodeType != null) { - command.add("--nodeType"); - command.add(nodeType); - } - - if (!taskFile.exists()) { - jsonMapper.writeValue(taskFile, task); - } - - log.info("Running command: %s", Joiner.on(" ").join(command)); - taskWorkItem.processHolder = new ProcessHolder( - new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), - logFile, - taskLocation.getHost(), - taskLocation.getPort(), - taskLocation.getTlsPort() - ); + // Add dataSource, taskId and taskType for metrics or logging + command.add( + StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.DATASOURCE, + task.getDataSource() + ) + ); + command.add( + StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.TASK_ID, + task.getId() + ) + ); + command.add( + StringUtils.format( + "-D%s%s=%s", + MonitorsConfig.METRIC_DIMENSION_PREFIX, + DruidMetrics.TASK_TYPE, + task.getType() + ) + ); + + command.add(StringUtils.format("-Ddruid.host=%s", childHost)); + command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort)); + command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort)); + /** + * These are not enabled per default to allow the user to either set or not set them + * Users are highly suggested to be set in druid.indexer.runner.javaOpts + * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int) + * for more information + command.add("-XX:+UseThreadPriorities"); + command.add("-XX:ThreadPriorityPolicy=42"); + */ + + command.add("org.apache.druid.cli.Main"); + command.add("internal"); + command.add("peon"); + command.add(taskFile.toString()); + command.add(statusFile.toString()); + command.add(reportsFile.toString()); + String nodeType = task.getNodeType(); + if (nodeType != null) { + command.add("--nodeType"); + command.add(nodeType); + } - processHolder = taskWorkItem.processHolder; - processHolder.registerWithCloser(closer); - } + if (!taskFile.exists()) { + 
jsonMapper.writeValue(taskFile, task); + } - TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); - TaskRunnerUtils.notifyStatusChanged( - listeners, - task.getId(), - TaskStatus.running(task.getId()) - ); + log.info("Running command: %s", Joiner.on(" ").join(command)); + taskWorkItem.processHolder = new ProcessHolder( + new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(), + logFile, + taskLocation.getHost(), + taskLocation.getPort(), + taskLocation.getTlsPort() + ); + + processHolder = taskWorkItem.processHolder; + processHolder.registerWithCloser(closer); + } - log.info("Logging task %s output to: %s", task.getId(), logFile); - boolean runFailed = true; + TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); + TaskRunnerUtils.notifyStatusChanged( + listeners, + task.getId(), + TaskStatus.running(task.getId()) + ); - final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND); + log.info("Logging task %s output to: %s", task.getId(), logFile); + boolean runFailed = true; - // This will block for a while. So we append the thread information with more details - final String priorThreadName = Thread.currentThread().getName(); - Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId())); + final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND); - try (final OutputStream toLogfile = logSink.openStream()) { - ByteStreams.copy(processHolder.process.getInputStream(), toLogfile); - final int statusCode = processHolder.process.waitFor(); - log.info("Process exited with status[%d] for task: %s", statusCode, task.getId()); - if (statusCode == 0) { - runFailed = false; - } - } - finally { - Thread.currentThread().setName(priorThreadName); - // Upload task logs - taskLogPusher.pushTaskLog(task.getId(), logFile); - if (reportsFile.exists()) { - taskLogPusher.pushTaskReports(task.getId(), reportsFile); - } - } + // This will block for a while. 
So we append the thread information with more details + final String priorThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId())); - TaskStatus status; - if (!runFailed) { - // Process exited successfully - status = jsonMapper.readValue(statusFile, TaskStatus.class); - } else { - // Process exited unsuccessfully - status = TaskStatus.failure(task.getId()); - } + try (final OutputStream toLogfile = logSink.openStream()) { + ByteStreams.copy(processHolder.process.getInputStream(), toLogfile); + final int statusCode = processHolder.process.waitFor(); + log.info("Process exited with status[%d] for task: %s", statusCode, task.getId()); + if (statusCode == 0) { + runFailed = false; + } + } + finally { + Thread.currentThread().setName(priorThreadName); + // Upload task logs + taskLogPusher.pushTaskLog(task.getId(), logFile); + if (reportsFile.exists()) { + taskLogPusher.pushTaskReports(task.getId(), reportsFile); + } + } - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); - return status; - } - catch (Throwable t) { - throw closer.rethrow(t); - } - finally { - closer.close(); - } + TaskStatus status; + if (!runFailed) { + // Process exited successfully + status = jsonMapper.readValue(statusFile, TaskStatus.class); + } else { + // Process exited unsuccessfully + status = TaskStatus.failure(task.getId()); + } + + TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); + return status; + } + catch (Throwable t) { + throw closer.rethrow(t); + } + finally { + closer.close(); + } + } + catch (Throwable t) { + log.info(t, "Exception caught during execution"); + throw new RuntimeException(t); + } + finally { + try { + synchronized (tasks) { + final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); + if (taskWorkItem != null && taskWorkItem.processHolder != null) { + taskWorkItem.processHolder.process.destroy(); } - catch (Throwable t) { - log.info(t, "Exception caught during execution"); - throw new RuntimeException(t); + if (!stopping) { + saveRunningTasks(); } - finally { - try { - synchronized (tasks) { - final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); - if (taskWorkItem != null && taskWorkItem.processHolder != null) { - taskWorkItem.processHolder.process.destroy(); - } - if (!stopping) { - saveRunningTasks(); - } - } + } - if (node.isEnablePlaintextPort()) { - portFinder.markPortUnused(childPort); - } - if (node.isEnableTlsPort()) { - portFinder.markPortUnused(tlsChildPort); - } + if (node.isEnablePlaintextPort()) { + portFinder.markPortUnused(childPort); + } + if (node.isEnableTlsPort()) { + portFinder.markPortUnused(tlsChildPort); + } - try { - if (!stopping && taskDir.exists()) { - log.info("Removing task directory: %s", taskDir); - FileUtils.deleteDirectory(taskDir); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to delete task directory") - .addData("taskDir", taskDir.toString()) - .addData("task", task.getId()) - .emit(); - } - } - catch (Exception e) { - log.error(e, "Suppressing exception caught while cleaning up task"); - } + try { + if (!stopping && taskDir.exists()) { + log.info("Removing task directory: %s", taskDir); + FileUtils.deleteDirectory(taskDir); } } + catch (Exception e) { + log.makeAlert(e, "Failed to delete task directory") + .addData("taskDir", taskDir.toString()) + .addData("task", task.getId()) + .emit(); + } + } + catch (Exception e) { + log.error(e, "Suppressing exception caught while cleaning up 
task"); } - ) + } + } + } ) - ); + ) + ); saveRunningTasks(); return tasks.get(task.getId()).getResult(); } From b54c36ec9249e27795a6a908083bd1858eda12a7 Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Mon, 27 May 2019 23:52:40 +0530 Subject: [PATCH 3/8] Using map.computeIfAbsent() instead of map.putIfAbsent() where appropriate --- .idea/inspectionProfiles/Druid.xml | 5 +- .../indexer/DetermineHashedPartitionsJob.java | 2 +- .../ActionBasedUsedSegmentChecker.java | 3 +- .../druid/indexing/common/task/IndexTask.java | 2 +- .../IngestSegmentFirehoseFactory.java | 144 +++++++++--------- .../indexing/overlord/ForkingTaskRunner.java | 61 ++++---- .../server/http/DataSourcesResource.java | 2 +- .../client/CachingClusteredClientTest.java | 2 +- .../org/apache/druid/sql/SqlLifecycle.java | 2 +- .../SpecificSegmentsQuerySegmentWalker.java | 2 +- 10 files changed, 108 insertions(+), 117 deletions(-) diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index 3b1bf13c3abd..b2d006463dc7 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -88,6 +88,7 @@ + @@ -158,7 +159,7 @@ - + @@ -402,4 +403,4 @@ - \ No newline at end of file + diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java index e6648e32d261..17f51723fba0 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java @@ -307,7 +307,7 @@ protected void innerMap( .getSegmentGranularity() .bucket(DateTimes.utc(inputRow.getTimestampFromEpoch())); - hyperLogLogs.putIfAbsent(interval, HyperLogLogCollector.makeLatestCollector()); + hyperLogLogs.computeIfAbsent(interval, intv -> HyperLogLogCollector.makeLatestCollector()); } else { final Optional maybeInterval = config.getGranularitySpec() .bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch())); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java index f4fec949bfde..96ce6aeae6d5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java @@ -50,7 +50,8 @@ public Set findUsedSegments(Set identifiers // Group by dataSource final Map> identifiersByDataSource = new TreeMap<>(); for (SegmentIdWithShardSpec identifier : identifiers) { - identifiersByDataSource.putIfAbsent(identifier.getDataSource(), new HashSet<>()); + identifiersByDataSource.computeIfAbsent(identifier.getDataSource(), k -> new HashSet<>()); + identifiersByDataSource.get(identifier.getDataSource()).add(identifier); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 47ff9c086aa4..c04a2ac8a545 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -768,7 +768,7 @@ private Map> collectIntervalsAndShardSp } if (determineNumPartitions) { - hllCollectors.putIfAbsent(interval, 
Optional.of(HyperLogLogCollector.makeLatestCollector())); + hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector())); List groupKey = Rows.toGroupKey( queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index b0e283fc2a5f..5309b7fc257e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -204,85 +204,83 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) segmentIds ); - try { - final List> timeLineSegments = getTimeline(); - - // Download all segments locally. - // Note: this requires enough local storage space to fit all of the segments, even though - // IngestSegmentFirehose iterates over the segments in series. We may want to change this - // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory. - final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); - Map segmentFileMap = Maps.newLinkedHashMap(); - for (TimelineObjectHolder holder : timeLineSegments) { - for (PartitionChunk chunk : holder.getObject()) { - final DataSegment segment = chunk.getObject(); - segmentFileMap.putIfAbsent(segment, segmentLoader.getSegmentFiles(segment)); - } - } + final List> timeLineSegments = getTimeline(); + + // Download all segments locally. + // Note: this requires enough local storage space to fit all of the segments, even though + // IngestSegmentFirehose iterates over the segments in series. We may want to change this + // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory. + final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); + Map segmentFileMap = Maps.newLinkedHashMap(); + for (TimelineObjectHolder holder : timeLineSegments) { + for (PartitionChunk chunk : holder.getObject()) { + final DataSegment segment = chunk.getObject(); + + segmentFileMap.computeIfAbsent(segment, k -> { + try { + return segmentLoader.getSegmentFiles(segment); + } catch (SegmentLoadingException e) { + throw new RuntimeException(e); + } + }); - final List dims; - if (dimensions != null) { - dims = dimensions; - } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) { - dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); - } else { - dims = getUniqueDimensions( - timeLineSegments, - inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() - ); } + } - final List metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics; + final List dims; + if (dimensions != null) { + dims = dimensions; + } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) { + dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames(); + } else { + dims = getUniqueDimensions( + timeLineSegments, + inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions() + ); + } - final List adapters = Lists.newArrayList( - Iterables.concat( - Iterables.transform( - timeLineSegments, - new Function, Iterable>() - { + final List metricsList = metrics == null ? 
getUniqueMetrics(timeLineSegments) : metrics; + + final List adapters = Lists.newArrayList( + Iterables.concat( + Iterables.transform( + timeLineSegments, + new Function, Iterable>() { + @Override + public Iterable apply(final TimelineObjectHolder holder) { + return + Iterables.transform( + holder.getObject(), + new Function, WindowedStorageAdapter>() { @Override - public Iterable apply(final TimelineObjectHolder holder) - { - return - Iterables.transform( - holder.getObject(), - new Function, WindowedStorageAdapter>() - { - @Override - public WindowedStorageAdapter apply(final PartitionChunk input) - { - final DataSegment segment = input.getObject(); - try { - return new WindowedStorageAdapter( - new QueryableIndexStorageAdapter( - indexIO.loadIndex( - Preconditions.checkNotNull( - segmentFileMap.get(segment), - "File for segment %s", segment.getId() - ) - ) - ), - holder.getInterval() - ); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - } - ); + public WindowedStorageAdapter apply(final PartitionChunk input) { + final DataSegment segment = input.getObject(); + try { + return new WindowedStorageAdapter( + new QueryableIndexStorageAdapter( + indexIO.loadIndex( + Preconditions.checkNotNull( + segmentFileMap.get(segment), + "File for segment %s", segment.getId() + ) + ) + ), + holder.getInterval() + ); + } catch (IOException e) { + throw new RuntimeException(e); + } } } - ) - ) - ); + ); + } + } + ) + ) + ); - final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser); - return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter); - } - catch (SegmentLoadingException e) { - throw new RuntimeException(e); - } + final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser); + return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter); } private long jitter(long input) @@ -506,11 +504,11 @@ static List getUniqueMetrics(List timelineHolder : Lists.reverse(timelineSegments)) { for (PartitionChunk chunk : timelineHolder.getObject()) { for (String metric : chunk.getObject().getMetrics()) { - uniqueMetrics.putIfAbsent(metric, index++); + uniqueMetrics.computeIfAbsent(metric, k -> { return index[0]++; }); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index ec8db450abcf..5c6b6c2bed8e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -214,16 +214,14 @@ public void unregisterListener(String listenerId) public ListenableFuture run(final Task task) { synchronized (tasks) { - tasks.putIfAbsent( - task.getId(), + tasks.computeIfAbsent( + task.getId(), k -> new ForkingTaskRunnerWorkItem( task, exec.submit( - new Callable() - { + new Callable() { @Override - public TaskStatus call() - { + public TaskStatus call() { final String attemptUUID = UUID.randomUUID().toString(); final File taskDir = taskConfig.getTaskDir(task.getId()); final File attemptDir = new File(taskDir, attemptUUID); @@ -295,12 +293,12 @@ public TaskStatus call() // Override task specific javaOpts Object taskJavaOpts = task.getContextValue( - ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY + ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY ); if (taskJavaOpts != null) { Iterables.addAll( - command, - new 
QuotableWhiteSpaceSplitter((String) taskJavaOpts) + command, + new QuotableWhiteSpaceSplitter((String) taskJavaOpts) ); } @@ -308,11 +306,11 @@ public TaskStatus call() for (String allowedPrefix : config.getAllowedPrefixes()) { // See https://github.com/apache/incubator-druid/issues/1841 if (propName.startsWith(allowedPrefix) - && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) - && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) + && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) + && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) ) { command.add( - StringUtils.format( + StringUtils.format( "-D%s=%s", propName, props.getProperty(propName) @@ -326,7 +324,7 @@ public TaskStatus call() for (String propName : props.stringPropertyNames()) { if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { command.add( - StringUtils.format( + StringUtils.format( "-D%s=%s", propName.substring(CHILD_PROPERTY_PREFIX.length()), props.getProperty(propName) @@ -341,7 +339,7 @@ public TaskStatus call() for (String propName : context.keySet()) { if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { command.add( - StringUtils.format( + StringUtils.format( "-D%s=%s", propName.substring(CHILD_PROPERTY_PREFIX.length()), task.getContextValue(propName) @@ -353,7 +351,7 @@ public TaskStatus call() // Add dataSource, taskId and taskType for metrics or logging command.add( - StringUtils.format( + StringUtils.format( "-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, DruidMetrics.DATASOURCE, @@ -361,7 +359,7 @@ public TaskStatus call() ) ); command.add( - StringUtils.format( + StringUtils.format( "-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, DruidMetrics.TASK_ID, @@ -369,7 +367,7 @@ public TaskStatus call() ) ); command.add( - StringUtils.format( + StringUtils.format( "-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, DruidMetrics.TASK_TYPE, @@ -420,9 +418,9 @@ public TaskStatus call() TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); TaskRunnerUtils.notifyStatusChanged( - listeners, - task.getId(), - TaskStatus.running(task.getId()) + listeners, + task.getId(), + TaskStatus.running(task.getId()) ); log.info("Logging task %s output to: %s", task.getId(), logFile); @@ -441,8 +439,7 @@ public TaskStatus call() if (statusCode == 0) { runFailed = false; } - } - finally { + } finally { Thread.currentThread().setName(priorThreadName); // Upload task logs taskLogPusher.pushTaskLog(task.getId(), logFile); @@ -462,19 +459,15 @@ public TaskStatus call() TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); return status; - } - catch (Throwable t) { + } catch (Throwable t) { throw closer.rethrow(t); - } - finally { + } finally { closer.close(); } - } - catch (Throwable t) { + } catch (Throwable t) { log.info(t, "Exception caught during execution"); throw new RuntimeException(t); - } - finally { + } finally { try { synchronized (tasks) { final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); @@ -498,15 +491,13 @@ public TaskStatus call() log.info("Removing task directory: %s", taskDir); FileUtils.deleteDirectory(taskDir); } - } - catch (Exception e) { + } catch (Exception e) { log.makeAlert(e, "Failed to delete task directory") .addData("taskDir", taskDir.toString()) .addData("task", task.getId()) .emit(); } - } - catch (Exception e) { + } catch (Exception e) { log.error(e, "Suppressing exception caught while cleaning up task"); } } @@ -514,7 +505,7 @@ public TaskStatus call() } ) ) - ); 
+ ); saveRunningTasks(); return tasks.get(task.getId()).getResult(); } diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 7ad4a6a80114..84f8ac0c0d53 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -549,7 +549,7 @@ private Map> getSimpleDatasource(String dataSourceNa continue; } - tierDistinctSegments.putIfAbsent(tier, new HashSet<>()); + tierDistinctSegments.computeIfAbsent(tier, k -> new HashSet<>()); long dataSourceSegmentSize = 0; for (DataSegment dataSegment : druidDataSource.getSegments()) { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index a9508010942c..ecc6b4fe6707 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2200,7 +2200,7 @@ private List> populateTimeline( serverExpectationList.add(serverExpectations); for (int j = 0; j < numChunks; ++j) { DruidServer lastServer = servers[random.nextInt(servers.length)]; - serverExpectations.putIfAbsent(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class))); + serverExpectations.computeIfAbsent(lastServer, server -> new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class))); DataSegment mockSegment = makeMock(mocks, DataSegment.class); ServerExpectation expectation = new ServerExpectation<>( diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java index db8f4645b8da..80cfd0434a47 100644 --- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java +++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java @@ -122,7 +122,7 @@ private Map contextWithSqlId(Map queryContext) if (queryContext != null) { newContext.putAll(queryContext); } - newContext.putIfAbsent(PlannerContext.CTX_SQL_QUERY_ID, UUID.randomUUID().toString()); + newContext.computeIfAbsent(PlannerContext.CTX_SQL_QUERY_ID, k -> UUID.randomUUID().toString()); return newContext; } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 27394d9845a4..4d1206b18b74 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -78,7 +78,7 @@ public SpecificSegmentsQuerySegmentWalker add( ) { final Segment segment = new QueryableIndexSegment(index, descriptor.getId()); - timelines.putIfAbsent(descriptor.getDataSource(), new VersionedIntervalTimeline<>(Ordering.natural())); + timelines.computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural())); final VersionedIntervalTimeline timeline = timelines.get(descriptor.getDataSource()); timeline.add(descriptor.getInterval(), descriptor.getVersion(), descriptor.getShardSpec().createChunk(segment)); From 0d90c3b4693bcbb00cc47e9d645ce49a5bb9178c Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 28 May 2019 00:18:24 +0530 Subject: [PATCH 4/8] fixing checkstyle --- .../IngestSegmentFirehoseFactory.java | 
19 ++++--- .../indexing/overlord/ForkingTaskRunner.java | 54 +++++++++++-------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 5309b7fc257e..2caf91dca994 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -219,7 +219,8 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) segmentFileMap.computeIfAbsent(segment, k -> { try { return segmentLoader.getSegmentFiles(segment); - } catch (SegmentLoadingException e) { + } + catch (SegmentLoadingException e) { throw new RuntimeException(e); } }); @@ -242,18 +243,20 @@ public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) final List metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics; final List adapters = Lists.newArrayList( - Iterables.concat( + Iterables.concat( Iterables.transform( timeLineSegments, new Function, Iterable>() { @Override - public Iterable apply(final TimelineObjectHolder holder) { + public Iterable apply(final TimelineObjectHolder holder) + { return Iterables.transform( holder.getObject(), new Function, WindowedStorageAdapter>() { @Override - public WindowedStorageAdapter apply(final PartitionChunk input) { + public WindowedStorageAdapter apply(final PartitionChunk input) + { final DataSegment segment = input.getObject(); try { return new WindowedStorageAdapter( @@ -267,7 +270,8 @@ public WindowedStorageAdapter apply(final PartitionChunk input) { ), holder.getInterval() ); - } catch (IOException e) { + } + catch (IOException e) { throw new RuntimeException(e); } } @@ -508,7 +512,10 @@ static List getUniqueMetrics(List timelineHolder : Lists.reverse(timelineSegments)) { for (PartitionChunk chunk : timelineHolder.getObject()) { for (String metric : chunk.getObject().getMetrics()) { - uniqueMetrics.computeIfAbsent(metric, k -> { return index[0]++; }); + uniqueMetrics.computeIfAbsent(metric, k -> { + return index[0]++; + } + ); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 5c6b6c2bed8e..5f20f2342073 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -221,7 +221,8 @@ public ListenableFuture run(final Task task) exec.submit( new Callable() { @Override - public TaskStatus call() { + public TaskStatus call() + { final String attemptUUID = UUID.randomUUID().toString(); final File taskDir = taskConfig.getTaskDir(task.getId()); final File attemptDir = new File(taskDir, attemptUUID); @@ -293,12 +294,12 @@ public TaskStatus call() { // Override task specific javaOpts Object taskJavaOpts = task.getContextValue( - ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY + ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY ); if (taskJavaOpts != null) { Iterables.addAll( - command, - new QuotableWhiteSpaceSplitter((String) taskJavaOpts) + command, + new QuotableWhiteSpaceSplitter((String) taskJavaOpts) ); } @@ -306,11 +307,11 @@ public TaskStatus call() { for (String allowedPrefix : 
config.getAllowedPrefixes()) { // See https://github.com/apache/incubator-druid/issues/1841 if (propName.startsWith(allowedPrefix) - && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) - && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) + && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName) + && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName) ) { command.add( - StringUtils.format( + StringUtils.format( "-D%s=%s", propName, props.getProperty(propName) @@ -324,7 +325,7 @@ public TaskStatus call() { for (String propName : props.stringPropertyNames()) { if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { command.add( - StringUtils.format( + StringUtils.format( "-D%s=%s", propName.substring(CHILD_PROPERTY_PREFIX.length()), props.getProperty(propName) @@ -339,7 +340,7 @@ public TaskStatus call() { for (String propName : context.keySet()) { if (propName.startsWith(CHILD_PROPERTY_PREFIX)) { command.add( - StringUtils.format( + StringUtils.format( "-D%s=%s", propName.substring(CHILD_PROPERTY_PREFIX.length()), task.getContextValue(propName) @@ -351,7 +352,7 @@ public TaskStatus call() { // Add dataSource, taskId and taskType for metrics or logging command.add( - StringUtils.format( + StringUtils.format( "-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, DruidMetrics.DATASOURCE, @@ -359,7 +360,7 @@ public TaskStatus call() { ) ); command.add( - StringUtils.format( + StringUtils.format( "-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, DruidMetrics.TASK_ID, @@ -367,7 +368,7 @@ public TaskStatus call() { ) ); command.add( - StringUtils.format( + StringUtils.format( "-D%s%s=%s", MonitorsConfig.METRIC_DIMENSION_PREFIX, DruidMetrics.TASK_TYPE, @@ -418,9 +419,9 @@ public TaskStatus call() { TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation); TaskRunnerUtils.notifyStatusChanged( - listeners, - task.getId(), - TaskStatus.running(task.getId()) + listeners, + task.getId(), + TaskStatus.running(task.getId()) ); log.info("Logging task %s output to: %s", task.getId(), logFile); @@ -439,7 +440,8 @@ public TaskStatus call() { if (statusCode == 0) { runFailed = false; } - } finally { + } + finally { Thread.currentThread().setName(priorThreadName); // Upload task logs taskLogPusher.pushTaskLog(task.getId(), logFile); @@ -459,15 +461,19 @@ public TaskStatus call() { TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); return status; - } catch (Throwable t) { + } + catch (Throwable t) { throw closer.rethrow(t); - } finally { + } + finally { closer.close(); } - } catch (Throwable t) { + } + catch (Throwable t) { log.info(t, "Exception caught during execution"); throw new RuntimeException(t); - } finally { + } + finally { try { synchronized (tasks) { final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId()); @@ -491,13 +497,15 @@ public TaskStatus call() { log.info("Removing task directory: %s", taskDir); FileUtils.deleteDirectory(taskDir); } - } catch (Exception e) { + } + catch (Exception e) { log.makeAlert(e, "Failed to delete task directory") .addData("taskDir", taskDir.toString()) .addData("task", task.getId()) .emit(); } - } catch (Exception e) { + } + catch (Exception e) { log.error(e, "Suppressing exception caught while cleaning up task"); } } @@ -505,7 +513,7 @@ public TaskStatus call() { } ) ) - ); + ); saveRunningTasks(); return tasks.get(task.getId()).getResult(); } From 2eb5c81ee87ff60fe14ebb58e06ca3e628ff27ac Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 
28 May 2019 00:30:54 +0530 Subject: [PATCH 5/8] Changing the recommendation text --- .idea/inspectionProfiles/Druid.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index b2d006463dc7..428f30e3df98 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -304,7 +304,7 @@ - + From 71a6ce011079a060127b0cdbc351140074cc673f Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 28 May 2019 00:34:10 +0530 Subject: [PATCH 6/8] Reverting auto changes made by IDE --- .idea/inspectionProfiles/Druid.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml index 428f30e3df98..ed890c86ec70 100644 --- a/.idea/inspectionProfiles/Druid.xml +++ b/.idea/inspectionProfiles/Druid.xml @@ -89,6 +89,8 @@ + From 0cee976c8538b78a28ad65918a047c7100b9deae Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 28 May 2019 16:13:30 +0530 Subject: [PATCH 7/8] Implementing recommendation: A ConcurrentHashMap on which computeIfAbsent() is called should be assigned into variables of ConcurrentHashMap type, not ConcurrentMap --- .../org/apache/druid/indexing/overlord/ForkingTaskRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 5f20f2342073..2a3cf13dc6d1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -108,7 +108,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); /** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. */ - private final ConcurrentMap tasks = new ConcurrentHashMap<>(); + private final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); private volatile boolean stopping = false; From e3ebf0d0849c1d0ef76137a20e0849dffcbd9baa Mon Sep 17 00:00:00 2001 From: Sashidhar Thallam Date: Tue, 28 May 2019 19:04:20 +0530 Subject: [PATCH 8/8] Removing unused import --- .../org/apache/druid/indexing/overlord/ForkingTaskRunner.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 2a3cf13dc6d1..116747b9d7bc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -84,7 +84,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit;
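
A note on the semantic differences the series works through. The sketch below is illustrative only -- the map, keys, and counter are stand-ins, not code from the patch. putIfAbsent() evaluates its value argument eagerly, so the patch 1 form still allocates a default value (or advances a counter) on every call even when the key is present; the computeIfAbsent() form from patch 3 runs the mapping function only on a miss and returns the mapped value, which also removes the follow-up get().

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PutIfAbsentDemo
{
  public static void main(String[] args)
  {
    final Map<String, Set<String>> storage = new HashMap<>();

    // Pre-patch shape: two lookups, and the check-then-act is not atomic.
    if (!storage.containsKey("bucket")) {
      storage.put("bucket", new HashSet<>());
    }

    // Patch 1 shape: a single call, but the default value is still
    // constructed eagerly -- a HashSet is allocated even on a hit.
    storage.putIfAbsent("bucket", new HashSet<>());

    // Patch 3 shape: the mapping function runs only on a miss, and the
    // mapped value is returned, so the follow-up get() disappears.
    storage.computeIfAbsent("bucket", k -> new HashSet<>()).add("key");

    // Eager evaluation also matters for side effects: with
    // putIfAbsent(metric, index++) the counter advances on every call,
    // leaving gaps for repeated metrics. The one-element array mirrors
    // the getUniqueMetrics() change, working around the "effectively
    // final" capture rule for lambdas.
    final Map<String, Integer> uniqueMetrics = new HashMap<>();
    final int[] index = {0};
    for (String metric : new String[]{"count", "sum", "count"}) {
      uniqueMetrics.computeIfAbsent(metric, k -> index[0]++);
    }
    System.out.println(uniqueMetrics); // {count=0, sum=1}, order may vary
  }
}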
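
Patch 7 implements the inspection text quoted in its subject line. The reasoning, as I understand it: ConcurrentHashMap.computeIfAbsent() performs the whole check-and-insert atomically and invokes the mapping function at most once per key, whereas the ConcurrentMap interface only carries the weaker default contract, which (per its own Javadoc) may retry and call the mapping function more than once. Narrowing the declared type of the field documents that callers rely on the stronger guarantee. A minimal sketch, with TaskWorkItem as a hypothetical stand-in for ForkingTaskRunnerWorkItem:

import java.util.concurrent.ConcurrentHashMap;

public class DeclaredTypeDemo
{
  // Declared as ConcurrentHashMap, not ConcurrentMap, so readers know
  // the atomic, invoke-at-most-once computeIfAbsent() contract of the
  // concrete class is being relied on.
  private final ConcurrentHashMap<String, TaskWorkItem> tasks = new ConcurrentHashMap<>();

  TaskWorkItem register(String taskId)
  {
    // Runs the factory at most once per key, even under contention.
    return tasks.computeIfAbsent(taskId, TaskWorkItem::new);
  }

  // Illustrative placeholder only.
  static class TaskWorkItem
  {
    final String id;

    TaskWorkItem(String id)
    {
      this.id = id;
    }
  }
}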
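
One consequence of the computeIfAbsent() rewrite in IngestSegmentFirehoseFactory.connect() is visible in patch 3: a mapping function cannot throw checked exceptions, so the checked SegmentLoadingException is wrapped in a RuntimeException inside the lambda and the outer try/catch is dropped. A minimal sketch of the same shape, assuming a hypothetical fetch() in place of segmentLoader.getSegmentFiles():

import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class CheckedExceptionDemo
{
  // Hypothetical stand-in for SegmentLoader.getSegmentFiles().
  static File fetch(String segmentId) throws IOException
  {
    return new File("/tmp", segmentId);
  }

  public static void main(String[] args)
  {
    final Map<String, File> segmentFiles = new LinkedHashMap<>();

    // The lambda passed to computeIfAbsent() cannot declare checked
    // exceptions, so the checked exception is rethrown unchecked inside
    // it -- the same pattern the patch uses for SegmentLoadingException.
    segmentFiles.computeIfAbsent("segment-1", id -> {
      try {
        return fetch(id);
      }
      catch (IOException e) {
        throw new RuntimeException(e);
      }
    });

    System.out.println(segmentFiles);
  }
}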