Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.util;

import org.jetbrains.annotations.NotNull;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
* A thread factory for creation of threads
*/
public class CustomizedThreadFactory implements ThreadFactory {

private static final AtomicLong POOL_NUM = new AtomicLong(1);
private final AtomicLong threadNum = new AtomicLong(1);

private final String threadName;
private final boolean daemon;

public CustomizedThreadFactory() {
this("pool-" + POOL_NUM.getAndIncrement(), false);
}

public CustomizedThreadFactory(String threadNamePrefix) {
this(threadNamePrefix, false);
}

public CustomizedThreadFactory(String threadNamePrefix, boolean daemon) {
this.threadName = threadNamePrefix + "-thread-";
this.daemon = daemon;
}

@Override
public Thread newThread(@NotNull Runnable r) {
Thread runThread = new Thread(r);
runThread.setDaemon(daemon);
runThread.setName(threadName + threadNum.getAndIncrement());
return runThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.common.util.queue;

import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
Expand Down Expand Up @@ -48,8 +49,10 @@ public class BoundedInMemoryExecutor<I, O, E> {

private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);

// Executor service used for launching writer thread.
private final ExecutorService executorService;
// Executor service used for launching write thread.
private final ExecutorService producerExecutorService;
// Executor service used for launching read thread.
private final ExecutorService consumerExecutorService;
// Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
private final BoundedInMemoryQueue<I, O> queue;
// Producers
Expand All @@ -60,28 +63,30 @@ public class BoundedInMemoryExecutor<I, O, E> {
private final Runnable preExecuteRunnable;

public BoundedInMemoryExecutor(final long bufferLimitInBytes, final Iterator<I> inputItr,
BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> transformFunction, Runnable preExecuteRunnable) {
BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> transformFunction, Runnable preExecuteRunnable) {
this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable);
}

public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer,
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop());
}

public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer,
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) {
this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable);
}

public BoundedInMemoryExecutor(final long bufferLimitInBytes, List<BoundedInMemoryQueueProducer<I>> producers,
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
final SizeEstimator<O> sizeEstimator, Runnable preExecuteRunnable) {
Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
final SizeEstimator<O> sizeEstimator, Runnable preExecuteRunnable) {
this.producers = producers;
this.consumer = consumer;
this.preExecuteRunnable = preExecuteRunnable;
// Ensure single thread for each producer thread and one for consumer
this.executorService = Executors.newFixedThreadPool(producers.size() + 1);
// Ensure fixed thread for each producer thread
this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("producer"));
// Ensure single thread for consumer
this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("consumer"));
this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator);
}

Expand All @@ -92,7 +97,7 @@ public ExecutorCompletionService<Boolean> startProducers() {
// Latch to control when and which producer thread will close the queue
final CountDownLatch latch = new CountDownLatch(producers.size());
final ExecutorCompletionService<Boolean> completionService =
new ExecutorCompletionService<Boolean>(executorService);
new ExecutorCompletionService<Boolean>(producerExecutorService);
producers.stream().map(producer -> {
return completionService.submit(() -> {
try {
Expand Down Expand Up @@ -122,7 +127,7 @@ public ExecutorCompletionService<Boolean> startProducers() {
*/
private Future<E> startConsumer() {
return consumer.map(consumer -> {
return executorService.submit(() -> {
return consumerExecutorService.submit(() -> {
LOG.info("starting consumer thread");
preExecuteRunnable.run();
try {
Expand All @@ -143,7 +148,7 @@ private Future<E> startConsumer() {
*/
public E execute() {
try {
ExecutorCompletionService<Boolean> producerService = startProducers();
startProducers();
Future<E> future = startConsumer();
// Wait for consumer to be done
return future.get();
Expand All @@ -161,7 +166,8 @@ public boolean isRemaining() {
}

public void shutdownNow() {
executorService.shutdownNow();
producerExecutorService.shutdownNow();
consumerExecutorService.shutdownNow();
}

public BoundedInMemoryQueue<I, O> getQueue() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.util;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.LockSupport;

public class TestCustomizedThreadFactory {

@Test
public void testThreadPrefix() throws ExecutionException, InterruptedException {
int nThreads = 100;
String threadNamePrefix = "consumer";
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory(threadNamePrefix));
for (int i = 0; i < nThreads; i++) {
Future<Boolean> resultFuture = executorService.submit(() -> {
LockSupport.parkNanos(10000000L);
String name = Thread.currentThread().getName();
return name.startsWith(threadNamePrefix);
});
Boolean result = resultFuture.get();
Assertions.assertTrue(result);
}
}

@Test
public void testDefaultThreadPrefix() throws ExecutionException, InterruptedException {
int nThreads = 100;
String defaultThreadNamePrefix = "pool-1";
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory());
for (int i = 0; i < nThreads; i++) {
Future<Boolean> resultFuture = executorService.submit(() -> {
LockSupport.parkNanos(10000000L);
String name = Thread.currentThread().getName();
return name.startsWith(defaultThreadNamePrefix);
});
Boolean result = resultFuture.get();
Assertions.assertTrue(result);
}
}

@Test
public void testDaemonThread() throws ExecutionException, InterruptedException {
int nThreads = 100;
String threadNamePrefix = "consumer";
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory(threadNamePrefix, true));
for (int i = 0; i < nThreads; i++) {
Future<Boolean> resultFuture = executorService.submit(() -> {
LockSupport.parkNanos(10000000L);
String name = Thread.currentThread().getName();
boolean daemon = Thread.currentThread().isDaemon();
return name.startsWith(threadNamePrefix) && daemon;
});
Boolean result = resultFuture.get();
Assertions.assertTrue(result);
}
}
}