Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.amazonaws.glue.catalog.util.ExpressionHelper;
import com.amazonaws.glue.catalog.util.MetastoreClientUtils;
import com.amazonaws.glue.catalog.util.PartitionKey;
import com.amazonaws.glue.catalog.util.ThreadExecutorFactory;
import com.amazonaws.glue.shims.AwsGlueHiveShims;
import com.amazonaws.glue.shims.ShimsLoader;
import com.amazonaws.services.glue.AWSGlue;
Expand Down Expand Up @@ -55,8 +56,8 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
Expand Down Expand Up @@ -120,6 +121,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;

import static com.amazonaws.glue.catalog.converters.ConverterUtils.stringToCatalogTable;
Expand Down Expand Up @@ -159,12 +161,11 @@ public class GlueMetastoreClientDelegate {

private static final int NUM_EXECUTOR_THREADS = 5;
static final String GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT = "glue-metastore-delegate-%d";
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT)
.setDaemon(true).build();
private static final ExecutorService GLUE_METASTORE_DELEGATE_THREAD_POOL = Executors.newFixedThreadPool(
NUM_EXECUTOR_THREADS,
new ThreadFactoryBuilder()
.setNameFormat(GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT)
.setDaemon(true).build()
);
NUM_EXECUTOR_THREADS, threadFactory);

/**
* Maximum number of Glue Segments. A segment defines a non-overlapping region of a table's partitions,
Expand All @@ -179,6 +180,7 @@ public class GlueMetastoreClientDelegate {
*/
public static final int MAX_NUM_PARTITION_SEGMENTS = 10;

private final ExecutorService executorService;
private final AWSGlue glueClient;
private final HiveConf conf;
private final Warehouse wh;
Expand All @@ -200,6 +202,13 @@ public GlueMetastoreClientDelegate(HiveConf conf, AWSGlue glueClient, Warehouse
this.conf = conf;
this.glueClient = glueClient;
this.wh = wh;
ExecutorService customExecutor = ThreadExecutorFactory.getCustomThreadPool(conf, NUM_EXECUTOR_THREADS, threadFactory);
if (customExecutor != null ) {
this.executorService = customExecutor;
} else {
this.executorService = GLUE_METASTORE_DELEGATE_THREAD_POOL;
}

// TODO - Maybe validate that catalogId conforms to the AWS account ID format too.
catalogId = MetastoreClientUtils.getCatalogId(conf);
}
Expand Down Expand Up @@ -740,7 +749,7 @@ private List<Partition> batchCreatePartitions(
int j = Math.min(i + BATCH_CREATE_PARTITIONS_MAX_REQUEST_SIZE, catalogPartitions.size());
final List<Partition> partitionsOnePage = catalogPartitions.subList(i, j);

batchCreatePartitionsFutures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<BatchCreatePartitionsHelper>() {
batchCreatePartitionsFutures.add(this.executorService.submit(new Callable<BatchCreatePartitionsHelper>() {
@Override
public BatchCreatePartitionsHelper call() throws Exception {
return new BatchCreatePartitionsHelper(glueClient, dbName, tableName, catalogId, partitionsOnePage, ifNotExists)
Expand Down Expand Up @@ -868,7 +877,7 @@ public List<org.apache.hadoop.hive.metastore.api.Partition> getPartitionsByNames
.withTableName(tableName)
.withPartitionsToGet(batch)
.withCatalogId(catalogId);
batchGetPartitionFutures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<BatchGetPartitionResult>() {
batchGetPartitionFutures.add(this.executorService.submit(new Callable<BatchGetPartitionResult>() {
@Override
public BatchGetPartitionResult call() throws Exception {
return glueClient.batchGetPartition(request);
Expand Down Expand Up @@ -974,7 +983,7 @@ private List<Partition> getPartitionsParallel(
// We could convert this into a parallelStream after upgrading to JDK 8 compiler base.
List<Future<List<Partition>>> futures = Lists.newArrayList();
for (final Segment segment : segments) {
futures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<List<Partition>>() {
futures.add(this.executorService.submit(new Callable<List<Partition>>() {
@Override
public List<Partition> call() throws Exception {
return getCatalogPartitions(databaseName, tableName, expression, max, segment);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.amazonaws.glue.catalog.util;

import com.amazonaws.glue.catalog.metastore.AWSGlueClientFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.Logger;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.ExecutorService;

import java.util.concurrent.ThreadFactory;

/**
 * Creates an optional, user-configured {@link ExecutorService} by reflection.
 *
 * <p>The executor class is named by the {@link #CUSTOM_EXECUTOR_CONF} configuration key. Every
 * lookup or instantiation failure is logged at WARN level and reported to the caller as
 * {@code null}; callers are expected to substitute their own default executor in that case.
 */
public class ThreadExecutorFactory {
  private static final Logger logger = Logger.getLogger(ThreadExecutorFactory.class);

  /** Configuration key holding the fully-qualified class name of the custom executor. */
  public static final String CUSTOM_EXECUTOR_CONF = "hive.metastore.custom.executor.class";

  /**
   * Builds the custom executor named in {@code conf}, if one is configured.
   *
   * @param conf configuration that is read for {@link #CUSTOM_EXECUTOR_CONF}
   * @param numThreads first constructor argument handed to the custom executor
   * @param threadFactory second constructor argument handed to the custom executor
   * @return the custom executor, or {@code null} when the key is unset or empty, when the class
   *     cannot be instantiated, or when the instantiated object is not an {@link ExecutorService}
   */
  public static ExecutorService getCustomThreadPool(HiveConf conf, int numThreads, ThreadFactory threadFactory) {
    String customExecutorClass = conf.get(CUSTOM_EXECUTOR_CONF);
    if (customExecutorClass == null || customExecutorClass.isEmpty()) {
      return null;
    }

    Object instance = getInstanceByReflection(customExecutorClass, numThreads, threadFactory);
    if (instance == null) {
      // The failure has already been logged by getInstanceByReflection.
      return null;
    }
    if (!(instance instanceof ExecutorService)) {
      // Treat a wrong type like every other misconfiguration (warn and return null) instead of
      // letting a ClassCastException escape to the caller.
      logger.warn("Custom executor class " + customExecutorClass
          + " does not implement " + ExecutorService.class.getName() + "; ignoring it");
      return null;
    }
    return (ExecutorService) instance;
  }

  /**
   * Instantiates {@code className} through a public two-argument constructor.
   *
   * <p>The constructor may declare its first parameter as either {@code Integer} or {@code int};
   * the second parameter must be a {@link ThreadFactory}.
   *
   * @param className fully-qualified name of the class to instantiate
   * @param arg1 value passed as the first constructor argument
   * @param arg2 value passed as the second constructor argument
   * @return the new instance, or {@code null} if the class or a matching constructor cannot be
   *     found, or if the constructor cannot be invoked or throws (the failure is logged)
   */
  public static Object getInstanceByReflection(String className, int arg1, ThreadFactory arg2) {
    try {
      Class<?> classDefinition = Class.forName(className);
      Constructor<?> cons = findConstructor(classDefinition);
      // arg1 is boxed by the varargs call; reflection unboxes it again for an int parameter.
      return cons.newInstance(arg1, arg2);
    } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
        | IllegalAccessException | InvocationTargetException e) {
      logger.warn("Exception in initializing custom executor ", e);
    }
    return null;
  }

  /**
   * Finds the public constructor used to build the custom executor.
   *
   * <p>Reflection performs no boxing conversion when matching parameter types, so the boxed
   * {@code Integer} signature (the original contract) is tried first and the primitive
   * {@code int} signature is tried second.
   */
  private static Constructor<?> findConstructor(Class<?> type) throws NoSuchMethodException {
    try {
      return type.getConstructor(Integer.class, ThreadFactory.class);
    } catch (NoSuchMethodException boxedSignatureMissing) {
      return type.getConstructor(int.class, ThreadFactory.class);
    }
  }
}