Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.TableNotFoundException;
Expand All @@ -27,6 +36,7 @@
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.log4j.Logger;

import datawave.core.common.logging.ThreadConfigurableLogger;
Expand All @@ -35,6 +45,7 @@
import datawave.microservice.query.Query;
import datawave.query.CloseableIterable;
import datawave.query.config.ShardQueryConfiguration;
import datawave.query.exceptions.DatawaveAsyncOperationException;
import datawave.query.exceptions.DatawaveFatalQueryException;
import datawave.query.exceptions.DatawaveQueryException;
import datawave.query.index.lookup.UidIntersector;
Expand Down Expand Up @@ -67,6 +78,10 @@ public class DatePartitionedQueryPlanner extends QueryPlanner implements Cloneab
private DefaultQueryPlanner queryPlanner;
private String initialPlan;
private String plannedScript;
/**
* The max number of concurrent planning threads
*/
private int maxConcurrentPlanningThreads = 10;

// handles boilerplate operations that surround a visitor's execution (e.g., timers, logging, validating)
private final TimedVisitorManager visitorManager = new TimedVisitorManager();
Expand Down Expand Up @@ -256,6 +271,14 @@ public UidIntersector getUidIntersector() {
return this.queryPlanner.getUidIntersector();
}

public void setMaxConcurrentPlanningThreads(int maxConcurrentPlanningThreads) {
this.maxConcurrentPlanningThreads = maxConcurrentPlanningThreads;
}

public int getMaxConcurrentPlanningThreads() {
return this.maxConcurrentPlanningThreads;
}

/**
* Not supported for {@link DatePartitionedQueryPlanner} and will result in an {@link UnsupportedOperationException}.
*
Expand Down Expand Up @@ -291,6 +314,17 @@ public CloseableIterable<QueryData> process(GenericQueryConfiguration genericCon
throws DatawaveQueryException {
visitorManager.setDebugEnabled(log.isDebugEnabled());

// start up an executor first to give the threads a chance to start
final AtomicInteger threadCounter = new AtomicInteger(1);
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "DatePartitionedQueryPlanner " + threadCounter.getAndIncrement());
}
};
ExecutorService executor = new ThreadPoolExecutor(1, getMaxConcurrentPlanningThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
threadFactory);

// Validate the config type.
if (!ShardQueryConfiguration.class.isAssignableFrom(genericConfig.getClass())) {
throw new ClassCastException("Config must be an instance of " + ShardQueryConfiguration.class.getSimpleName());
Expand Down Expand Up @@ -332,29 +366,48 @@ public CloseableIterable<QueryData> process(GenericQueryConfiguration genericCon
SortedMap<Pair<Date,Date>,Set<String>> dateRanges = getSubQueryDateRanges(planningConfig);

DatePartitionedQueryIterable results = new DatePartitionedQueryIterable();
List<Exception> exceptions = new ArrayList<>();
List<Throwable> exceptions = new ArrayList<>();
List<Triple<Map.Entry<Pair<Date,Date>,Set<String>>,ShardQueryConfiguration,Future<CloseableIterable<QueryData>>>> futures = new ArrayList<>();

// TODO: Lets attempt to process these pieces concurrently
for (Map.Entry<Pair<Date,Date>,Set<String>> dateRange : dateRanges.entrySet()) {
// Get the configuration with an updated query (pushed down unindexed fields)
final ShardQueryConfiguration configCopy = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue());
futures.add(Triple.of(dateRange, configCopy, executor.submit(new Callable<CloseableIterable<QueryData>>() {
@Override
public CloseableIterable<QueryData> call() {
try {
// Create a copy of the original default query planner, and process the query with the new date range.
DefaultQueryPlanner subPlan = DatePartitionedQueryPlanner.this.queryPlanner.clone();

// Get the range stream for the new date range and query
return subPlan.reprocess(configCopy, configCopy.getQuery(), scannerFactory);
} catch (Exception e) {
throw new DatawaveAsyncOperationException(e);
}
}
})));
}

for (Triple<Map.Entry<Pair<Date,Date>,Set<String>>,ShardQueryConfiguration,Future<CloseableIterable<QueryData>>> future : futures) {
Map.Entry<Pair<Date,Date>,Set<String>> dateRange = future.getLeft();
String subBeginDate = dateFormat.format(dateRange.getKey().getLeft());
String subEndDate = dateFormat.format(dateRange.getKey().getRight());

// Get the configuration with an updated query (pushed down unindexed fields)
ShardQueryConfiguration configCopy = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue());
ShardQueryConfiguration configCopy = future.getMiddle();

try {
// Create a copy of the original default query planner, and process the query with the new date range.
DefaultQueryPlanner subPlan = this.queryPlanner.clone();

// Get the range stream for the new date range and query
results.addIterable(subPlan.reprocess(configCopy, configCopy.getQuery(), scannerFactory));

results.addIterable(future.getRight().get());
if (log.isDebugEnabled()) {
log.debug("Query string for config of sub-plan against date range (" + subBeginDate + "-" + subEndDate + ") with unindexed fields "
+ dateRange.getValue() + ": " + configCopy.getQueryString());
}
} catch (DatawaveQueryException e) {
} catch (ExecutionException e) {
log.warn("Exception occurred when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")", e);
if (e.getCause() instanceof DatawaveAsyncOperationException) {
exceptions.add(e.getCause().getCause());
} else {
exceptions.add(e.getCause());
}
} catch (InterruptedException e) {
exceptions.add(e);
} finally {
// append the new timers for logging at the end
Expand All @@ -377,7 +430,7 @@ public CloseableIterable<QueryData> process(GenericQueryConfiguration genericCon
throw (DatawaveQueryException) (exceptions.get(0));
} else {
DatawaveFatalQueryException e = new DatawaveFatalQueryException("Query failed creation");
for (Exception reason : exceptions) {
for (Throwable reason : exceptions) {
e.addSuppressed(reason);
}
throw e;
Expand Down
Loading