Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.amazonaws.glue.catalog.metastore;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.conf.HiveConf;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
private static final int NUM_EXECUTOR_THREADS = 5;

private static final ExecutorService GLUE_METASTORE_DELEGATE_THREAD_POOL = Executors.newFixedThreadPool(
NUM_EXECUTOR_THREADS, new ThreadFactoryBuilder()
.setNameFormat(GlueMetastoreClientDelegate.GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT)
.setDaemon(true).build()
);

@Override
public ExecutorService getExecutorService(HiveConf conf) {
return GLUE_METASTORE_DELEGATE_THREAD_POOL;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.amazonaws.glue.catalog.metastore;

import org.apache.hadoop.hive.conf.HiveConf;

import java.util.concurrent.ExecutorService;

/*
* Interface for creating an ExecutorService
*/
public interface ExecutorServiceFactory {
public ExecutorService getExecutorService(HiveConf conf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -105,6 +104,7 @@
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;

Expand All @@ -118,7 +118,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -157,14 +156,9 @@ public class GlueMetastoreClientDelegate {
*/
public static final int GET_PARTITIONS_MAX_SIZE = 1000;

private static final int NUM_EXECUTOR_THREADS = 5;
public static final String CUSTOM_EXECUTOR_FACTORY_CONF = "hive.metastore.executorservice.factory.class";

static final String GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT = "glue-metastore-delegate-%d";
private static final ExecutorService GLUE_METASTORE_DELEGATE_THREAD_POOL = Executors.newFixedThreadPool(
NUM_EXECUTOR_THREADS,
new ThreadFactoryBuilder()
.setNameFormat(GLUE_METASTORE_DELEGATE_THREADPOOL_NAME_FORMAT)
.setDaemon(true).build()
);

/**
* Maximum number of Glue Segments. A segment defines a non-overlapping region of a table's partitions,
Expand All @@ -179,6 +173,7 @@ public class GlueMetastoreClientDelegate {
*/
public static final int MAX_NUM_PARTITION_SEGMENTS = 10;

private final ExecutorService executorService;
private final AWSGlue glueClient;
private final HiveConf conf;
private final Warehouse wh;
Expand All @@ -189,6 +184,16 @@ public class GlueMetastoreClientDelegate {
public static final String CATALOG_ID_CONF = "hive.metastore.glue.catalogid";
public static final String NUM_PARTITION_SEGMENTS_CONF = "aws.glue.partition.num.segments";

protected ExecutorService getExecutorService() {
Class<? extends ExecutorServiceFactory> executorFactoryClass = this.conf
.getClass(CUSTOM_EXECUTOR_FACTORY_CONF,
DefaultExecutorServiceFactory.class).asSubclass(
ExecutorServiceFactory.class);
ExecutorServiceFactory factory = ReflectionUtils.newInstance(
executorFactoryClass, conf);
return factory.getExecutorService(conf);
}

public GlueMetastoreClientDelegate(HiveConf conf, AWSGlue glueClient, Warehouse wh) throws MetaException {
checkNotNull(conf, "Hive Config cannot be null");
checkNotNull(glueClient, "glueClient cannot be null");
Expand All @@ -200,6 +205,8 @@ public GlueMetastoreClientDelegate(HiveConf conf, AWSGlue glueClient, Warehouse
this.conf = conf;
this.glueClient = glueClient;
this.wh = wh;
this.executorService = getExecutorService();

// TODO - May be validate catalogId confirms to AWS AccountId too.
catalogId = MetastoreClientUtils.getCatalogId(conf);
}
Expand Down Expand Up @@ -740,7 +747,7 @@ private List<Partition> batchCreatePartitions(
int j = Math.min(i + BATCH_CREATE_PARTITIONS_MAX_REQUEST_SIZE, catalogPartitions.size());
final List<Partition> partitionsOnePage = catalogPartitions.subList(i, j);

batchCreatePartitionsFutures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<BatchCreatePartitionsHelper>() {
batchCreatePartitionsFutures.add(this.executorService.submit(new Callable<BatchCreatePartitionsHelper>() {
@Override
public BatchCreatePartitionsHelper call() throws Exception {
return new BatchCreatePartitionsHelper(glueClient, dbName, tableName, catalogId, partitionsOnePage, ifNotExists)
Expand Down Expand Up @@ -868,7 +875,7 @@ public List<org.apache.hadoop.hive.metastore.api.Partition> getPartitionsByNames
.withTableName(tableName)
.withPartitionsToGet(batch)
.withCatalogId(catalogId);
batchGetPartitionFutures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<BatchGetPartitionResult>() {
batchGetPartitionFutures.add(this.executorService.submit(new Callable<BatchGetPartitionResult>() {
@Override
public BatchGetPartitionResult call() throws Exception {
return glueClient.batchGetPartition(request);
Expand Down Expand Up @@ -974,7 +981,7 @@ private List<Partition> getPartitionsParallel(
// We could convert this into a parallelStream after upgrading to JDK 8 compiler base.
List<Future<List<Partition>>> futures = Lists.newArrayList();
for (final Segment segment : segments) {
futures.add(GLUE_METASTORE_DELEGATE_THREAD_POOL.submit(new Callable<List<Partition>>() {
futures.add(this.executorService.submit(new Callable<List<Partition>>() {
@Override
public List<Partition> call() throws Exception {
return getCatalogPartitions(databaseName, tableName, expression, max, segment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.amazonaws.glue.catalog.converters.CatalogToHiveConverter;
import com.amazonaws.glue.catalog.converters.GlueInputConverter;
import com.amazonaws.glue.catalog.util.TestObjects;
import com.amazonaws.glue.catalog.util.TestExecutorServiceFactory;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.model.AlreadyExistsException;
import com.amazonaws.services.glue.model.BatchCreatePartitionRequest;
Expand Down Expand Up @@ -153,6 +154,21 @@ private void setupMockWarehouseForPath(Path path, boolean isDir, boolean mkDir)
when(wh.mkdirs(path, true)).thenReturn(mkDir);
}

// ===================== Thread Executor =====================

@Test
public void testExecutorService() throws Exception {
Object defaultExecutorService = new DefaultExecutorServiceFactory().getExecutorService(conf);
assertEquals("Default executor service should be used", metastoreClientDelegate.getExecutorService(), defaultExecutorService);
HiveConf customConf = new HiveConf();
customConf.set(GlueMetastoreClientDelegate.CATALOG_ID_CONF, CATALOG_ID);
customConf.setClass(GlueMetastoreClientDelegate.CUSTOM_EXECUTOR_FACTORY_CONF, TestExecutorServiceFactory.class, ExecutorServiceFactory.class);
GlueMetastoreClientDelegate customDelegate = new GlueMetastoreClientDelegate(customConf, mock(AWSGlue.class), mock(Warehouse.class));
Object customExecutorService = new TestExecutorServiceFactory().getExecutorService(customConf);

assertEquals("Custom executor service should be used", customDelegate.getExecutorService(), customExecutorService);
}

// ===================== Database =====================

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.amazonaws.glue.catalog.util;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class TestExecutorService extends ScheduledThreadPoolExecutor {

public TestExecutorService(int corePoolSize, ThreadFactory factory) {
super(corePoolSize, factory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.amazonaws.glue.catalog.util;

import com.amazonaws.glue.catalog.metastore.ExecutorServiceFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.conf.HiveConf;

import java.util.concurrent.ExecutorService;

public class TestExecutorServiceFactory implements ExecutorServiceFactory {
private static ExecutorService execService = new TestExecutorService(1, new ThreadFactoryBuilder().build());

@Override
public ExecutorService getExecutorService(HiveConf conf) {
return execService;
}
}