-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-3566]add thread factory in BoundedInMemoryExecutor #4926
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
b0d5367
04e07e3
0a55c4a
fe90d2d
bfd1ab9
f9b4b7b
972b38a
e8f3098
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hudi.common.util; | ||
|
|
||
| import org.jetbrains.annotations.NotNull; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| /** | ||
| * A factory to named thread | ||
| */ | ||
| public class CustomizedThreadFactory implements ThreadFactory { | ||
|
|
||
| private static final AtomicLong POOL_NUM = new AtomicLong(1); | ||
| private final AtomicLong threadNum = new AtomicLong(1); | ||
|
|
||
| private final String threadName; | ||
| private final boolean daemon; | ||
|
|
||
| public CustomizedThreadFactory() { | ||
| this("pool-" + POOL_NUM.getAndIncrement(), false); | ||
| } | ||
|
|
||
| public CustomizedThreadFactory(String threadNamePrefix) { | ||
| this(threadNamePrefix, false); | ||
| } | ||
|
|
||
| public CustomizedThreadFactory(String threadNamePrefix, boolean daemon) { | ||
| this.threadName = threadNamePrefix + "-thread-"; | ||
| this.daemon = daemon; | ||
| } | ||
|
|
||
| @Override | ||
| public Thread newThread(@NotNull Runnable r) { | ||
| Thread runThread = new Thread(r); | ||
| runThread.setDaemon(daemon); | ||
| runThread.setName(threadName + threadNum.getAndIncrement()); | ||
| return runThread; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
|
|
||
| package org.apache.hudi.common.util.queue; | ||
|
|
||
| import org.apache.hudi.common.util.CustomizedThreadFactory; | ||
| import org.apache.hudi.common.util.DefaultSizeEstimator; | ||
| import org.apache.hudi.common.util.Functions; | ||
| import org.apache.hudi.common.util.Option; | ||
|
|
@@ -48,8 +49,10 @@ public class BoundedInMemoryExecutor<I, O, E> { | |
|
|
||
| private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class); | ||
|
|
||
| // Executor service used for launching writer thread. | ||
| private final ExecutorService executorService; | ||
| // Executor service used for launching read thread. | ||
|
||
| private final ExecutorService producerExecutorService; | ||
| // Executor service used for launching write thread. | ||
| private final ExecutorService consumerExecutorService; | ||
| // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES. | ||
| private final BoundedInMemoryQueue<I, O> queue; | ||
| // Producers | ||
|
|
@@ -60,28 +63,30 @@ public class BoundedInMemoryExecutor<I, O, E> { | |
| private final Runnable preExecuteRunnable; | ||
|
|
||
| public BoundedInMemoryExecutor(final long bufferLimitInBytes, final Iterator<I> inputItr, | ||
| BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> transformFunction, Runnable preExecuteRunnable) { | ||
| BoundedInMemoryQueueConsumer<O, E> consumer, Function<I, O> transformFunction, Runnable preExecuteRunnable) { | ||
| this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable); | ||
| } | ||
|
|
||
| public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer, | ||
| Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) { | ||
| Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) { | ||
| this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop()); | ||
| } | ||
|
|
||
| public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer, | ||
| Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) { | ||
| Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, Runnable preExecuteRunnable) { | ||
| this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable); | ||
| } | ||
|
|
||
| public BoundedInMemoryExecutor(final long bufferLimitInBytes, List<BoundedInMemoryQueueProducer<I>> producers, | ||
| Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, | ||
| final SizeEstimator<O> sizeEstimator, Runnable preExecuteRunnable) { | ||
| Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction, | ||
| final SizeEstimator<O> sizeEstimator, Runnable preExecuteRunnable) { | ||
| this.producers = producers; | ||
| this.consumer = consumer; | ||
| this.preExecuteRunnable = preExecuteRunnable; | ||
| // Ensure single thread for each producer thread and one for consumer | ||
| this.executorService = Executors.newFixedThreadPool(producers.size() + 1); | ||
| // Ensure fixed thread for each producer thread | ||
| this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("producer")); | ||
| // Ensure single thread for consumer | ||
| this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("consumer")); | ||
| this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator); | ||
| } | ||
|
|
||
|
|
@@ -92,7 +97,7 @@ public ExecutorCompletionService<Boolean> startProducers() { | |
| // Latch to control when and which producer thread will close the queue | ||
| final CountDownLatch latch = new CountDownLatch(producers.size()); | ||
| final ExecutorCompletionService<Boolean> completionService = | ||
| new ExecutorCompletionService<Boolean>(executorService); | ||
| new ExecutorCompletionService<Boolean>(producerExecutorService); | ||
| producers.stream().map(producer -> { | ||
| return completionService.submit(() -> { | ||
| try { | ||
|
|
@@ -122,7 +127,7 @@ public ExecutorCompletionService<Boolean> startProducers() { | |
| */ | ||
| private Future<E> startConsumer() { | ||
| return consumer.map(consumer -> { | ||
| return executorService.submit(() -> { | ||
| return consumerExecutorService.submit(() -> { | ||
| LOG.info("starting consumer thread"); | ||
| preExecuteRunnable.run(); | ||
| try { | ||
|
|
@@ -143,7 +148,7 @@ private Future<E> startConsumer() { | |
| */ | ||
| public E execute() { | ||
| try { | ||
| ExecutorCompletionService<Boolean> producerService = startProducers(); | ||
| startProducers(); | ||
| Future<E> future = startConsumer(); | ||
| // Wait for consumer to be done | ||
| return future.get(); | ||
|
|
@@ -161,7 +166,8 @@ public boolean isRemaining() { | |
| } | ||
|
|
||
| public void shutdownNow() { | ||
| executorService.shutdownNow(); | ||
| producerExecutorService.shutdownNow(); | ||
| consumerExecutorService.shutdownNow(); | ||
| } | ||
|
|
||
| public BoundedInMemoryQueue<I, O> getQueue() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.hudi.common.util; | ||
|
|
||
| import org.junit.jupiter.api.Assertions; | ||
| import org.junit.jupiter.api.Test; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.locks.LockSupport; | ||
|
|
||
| class TestCustomizedThreadFactory { | ||
|
|
||
| @Test | ||
| public void threadPrefixTest() throws ExecutionException, InterruptedException { | ||
| int nThreads = 100; | ||
| String threadNamePrefix = "consumer"; | ||
| ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory(threadNamePrefix)); | ||
| for (int i = 0; i < nThreads; i++) { | ||
| Future<Boolean> resultFuture = executorService.submit(() -> { | ||
| LockSupport.parkNanos(10000000L); | ||
| String name = Thread.currentThread().getName(); | ||
| return name.startsWith(threadNamePrefix); | ||
| }); | ||
| Boolean result = resultFuture.get(); | ||
| Assertions.assertTrue(result); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void defaultThreadPrefixTest() throws ExecutionException, InterruptedException { | ||
|
||
| int nThreads = 100; | ||
| String defaultThreadNamePrefix = "pool-1"; | ||
| ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory()); | ||
| for (int i = 0; i < nThreads; i++) { | ||
| Future<Boolean> resultFuture = executorService.submit(() -> { | ||
| LockSupport.parkNanos(10000000L); | ||
| String name = Thread.currentThread().getName(); | ||
| return name.startsWith(defaultThreadNamePrefix); | ||
| }); | ||
| Boolean result = resultFuture.get(); | ||
| Assertions.assertTrue(result); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| public void daemonThreadTest() throws ExecutionException, InterruptedException { | ||
|
||
| int nThreads = 100; | ||
| String threadNamePrefix = "consumer"; | ||
| ExecutorService executorService = Executors.newFixedThreadPool(nThreads, new CustomizedThreadFactory(threadNamePrefix, true)); | ||
| for (int i = 0; i < nThreads; i++) { | ||
| Future<Boolean> resultFuture = executorService.submit(() -> { | ||
| LockSupport.parkNanos(10000000L); | ||
| String name = Thread.currentThread().getName(); | ||
| boolean daemon = Thread.currentThread().isDaemon(); | ||
| return name.startsWith(threadNamePrefix) && daemon; | ||
| }); | ||
| Boolean result = resultFuture.get(); | ||
| Assertions.assertTrue(result); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, I am a little confused about the description.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about "A thread factory for custom creation of threads" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@scxwhite A thread factory for creation of threads should be ok, and would you please create a jira ticket and change the PR title accordingly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leesf done.