diff --git a/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/DefaultExecutorServiceFactory.java b/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/DefaultExecutorServiceFactory.java new file mode 100644 index 00000000..c1643323 --- /dev/null +++ b/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/DefaultExecutorServiceFactory.java @@ -0,0 +1,22 @@ +package com.amazonaws.glue.catalog.metastore; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class DefaultExecutorServiceFactory implements ExecutorServiceFactory { + private static final int NUM_EXECUTOR_THREADS = 5; + + private static final ExecutorService GLUE_METASTORE_DELEGATE_THREAD_POOL = Executors.newFixedThreadPool( + NUM_EXECUTOR_THREADS, new ThreadFactoryBuilder() + .setNameFormat(GlueMetastoreClientDelegate.GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT) + .setDaemon(true).build() + ); + + @Override + public ExecutorService getExecutorService(HiveConf conf) { + return GLUE_METASTORE_DELEGATE_THREAD_POOL; + } +} diff --git a/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/ExecutorServiceFactory.java b/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/ExecutorServiceFactory.java new file mode 100644 index 00000000..6168a01f --- /dev/null +++ b/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/ExecutorServiceFactory.java @@ -0,0 +1,12 @@ +package com.amazonaws.glue.catalog.metastore; + +import org.apache.hadoop.hive.conf.HiveConf; + +import java.util.concurrent.ExecutorService; + +/* + * Interface for creating an ExecutorService + */ +public interface ExecutorServiceFactory { + public ExecutorService getExecutorService(HiveConf conf); +} diff --git a/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegate.java b/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegate.java index 31ffcab9..b3728644 100644 --- a/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegate.java +++ b/aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegate.java @@ -55,7 +55,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; @@ -105,6 +104,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -118,7 +118,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.regex.Pattern; @@ -157,14 +156,9 @@ public class GlueMetastoreClientDelegate { */ public static final int GET_PARTITIONS_MAX_SIZE = 1000; - private static final int NUM_EXECUTOR_THREADS = 5; + public static final String CUSTOM_EXECUTOR_FACTORY_CONF = "hive.metastore.executorservice.factory.class"; + static final String GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT = "glue-metastore-delegate-%d"; - private static final ExecutorService GLUE_METASTORE_DELEGATE_THREAD_POOL = Executors.newFixedThreadPool( - NUM_EXECUTOR_THREADS, - new ThreadFactoryBuilder() - .setNameFormat(GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT) - .setDaemon(true).build() - ); /** * Maximum number of Glue Segments. A segment defines a non-overlapping region of a table's partitions, @@ -179,6 +173,7 @@ public class GlueMetastoreClientDelegate { */ public static final int MAX_NUM_PARTITION_SEGMENTS = 10; + private final ExecutorService executorService; private final AWSGlue glueClient; private final HiveConf conf; private final Warehouse wh; @@ -189,6 +184,16 @@ public class GlueMetastoreClientDelegate { public static final String CATALOG_ID_CONF = "hive.metastore.glue.catalogid"; public static final String NUM_PARTITION_SEGMENTS_CONF = "aws.glue.partition.num.segments"; + protected ExecutorService getExecutorService() { + Class executorFactoryClass = this.conf + .getClass(CUSTOM_EXECUTOR_FACTORY_CONF, + DefaultExecutorServiceFactory.class).asSubclass( + ExecutorServiceFactory.class); + ExecutorServiceFactory factory = ReflectionUtils.newInstance( + executorFactoryClass, conf); + return factory.getExecutorService(conf); + } + public GlueMetastoreClientDelegate(HiveConf conf, AWSGlue glueClient, Warehouse wh) throws MetaException { checkNotNull(conf, "Hive Config cannot be null"); checkNotNull(glueClient, "glueClient cannot be null"); @@ -200,6 +205,8 @@ public GlueMetastoreClientDelegate(HiveConf conf, AWSGlue glueClient, Warehouse this.conf = conf; this.glueClient = glueClient; this.wh = wh; + this.executorService = getExecutorService(); + // TODO - May be validate catalogId confirms to AWS AccountId too. catalogId = MetastoreClientUtils.getCatalogId(conf); } @@ -740,7 +747,7 @@ private List batchCreatePartitions( int j = Math.min(i + BATCH_CREATE_PARTITIONS_MAX_REQUEST_SIZE, catalogPartitions.size()); final List partitionsOnePage = catalogPartitions.subList(i, j); - batchCreatePartitionsFutures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable() { + batchCreatePartitionsFutures.add(this.executorService.submit(new Callable() { @Override public BatchCreatePartitionsHelper call() throws Exception { return new BatchCreatePartitionsHelper(glueClient, dbName, tableName, catalogId, partitionsOnePage, ifNotExists) @@ -868,7 +875,7 @@ public List getPartitionsByNames .withTableName(tableName) .withPartitionsToGet(batch) .withCatalogId(catalogId); - batchGetPartitionFutures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable() { + batchGetPartitionFutures.add(this.executorService.submit(new Callable() { @Override public BatchGetPartitionResult call() throws Exception { return glueClient.batchGetPartition(request); @@ -974,7 +981,7 @@ private List getPartitionsParallel( // We could convert this into a parallelStream after upgrading to JDK 8 compiler base. List>> futures = Lists.newArrayList(); for (final Segment segment : segments) { - futures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable>() { + futures.add(this.executorService.submit(new Callable>() { @Override public List call() throws Exception { return getCatalogPartitions(databaseName, tableName, expression, max, segment); diff --git a/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegateTest.java b/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegateTest.java index 11bdc454..8e9c1e82 100644 --- a/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegateTest.java +++ b/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegateTest.java @@ -3,6 +3,7 @@ import com.amazonaws.glue.catalog.converters.CatalogToHiveConverter; import com.amazonaws.glue.catalog.converters.GlueInputConverter; import com.amazonaws.glue.catalog.util.TestObjects; +import com.amazonaws.glue.catalog.util.TestExecutorServiceFactory; import com.amazonaws.services.glue.AWSGlue; import com.amazonaws.services.glue.model.AlreadyExistsException; import com.amazonaws.services.glue.model.BatchCreatePartitionRequest; @@ -153,6 +154,21 @@ private void setupMockWarehouseForPath(Path path, boolean isDir, boolean mkDir) when(wh.mkdirs(path, true)).thenReturn(mkDir); } + // ===================== Thread Executor ===================== + + @Test + public void testExecutorService() throws Exception { + Object defaultExecutorService = new DefaultExecutorServiceFactory().getExecutorService(conf); + assertEquals("Default executor service should be used", metastoreClientDelegate.getExecutorService(), defaultExecutorService); + HiveConf customConf = new HiveConf(); + customConf.set(GlueMetastoreClientDelegate.CATALOG_ID_CONF, CATALOG_ID); + customConf.setClass(GlueMetastoreClientDelegate.CUSTOM_EXECUTOR_FACTORY_CONF, TestExecutorServiceFactory.class, ExecutorServiceFactory.class); + GlueMetastoreClientDelegate customDelegate = new GlueMetastoreClientDelegate(customConf, mock(AWSGlue.class), mock(Warehouse.class)); + Object customExecutorService = new TestExecutorServiceFactory().getExecutorService(customConf); + + assertEquals("Custom executor service should be used", customDelegate.getExecutorService(), customExecutorService); + } + // ===================== Database ===================== @Test diff --git a/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/util/TestExecutorService.java b/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/util/TestExecutorService.java new file mode 100644 index 00000000..d6f8f28b --- /dev/null +++ b/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/util/TestExecutorService.java @@ -0,0 +1,11 @@ +package com.amazonaws.glue.catalog.util; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +public class TestExecutorService extends ScheduledThreadPoolExecutor { + + public TestExecutorService(int corePoolSize, ThreadFactory factory) { + super(corePoolSize, factory); + } +} \ No newline at end of file diff --git a/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/util/TestExecutorServiceFactory.java b/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/util/TestExecutorServiceFactory.java new file mode 100644 index 00000000..c55aba8d --- /dev/null +++ b/aws-glue-datacatalog-client-common/src/test/java/com/amazonaws/glue/catalog/util/TestExecutorServiceFactory.java @@ -0,0 +1,16 @@ +package com.amazonaws.glue.catalog.util; + +import com.amazonaws.glue.catalog.metastore.ExecutorServiceFactory; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.util.concurrent.ExecutorService; + +public class TestExecutorServiceFactory implements ExecutorServiceFactory { + private static ExecutorService execService = new TestExecutorService(1, new ThreadFactoryBuilder().build()); + + @Override + public ExecutorService getExecutorService(HiveConf conf) { + return execService; + } +}