diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index c9557fb65c77..d92b58ba5989 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -30,8 +30,10 @@ import java.util.logging.Level; import java.util.logging.Logger; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + interface CancellableRunnable extends Runnable { - public void cancel(Throwable e); + void cancel(Throwable e); } /** @@ -42,13 +44,12 @@ interface CancellableRunnable extends Runnable { final class SequentialExecutorService { private static final Logger logger = Logger.getLogger(SequentialExecutorService.class.getName()); - private final SequentialExecutor manageableSequentialExecutor; - private final SequentialExecutor autoSequentialExecutor; + private final CallbackExecutor callbackExecutor; + private final AutoExecutor autoExecutor; SequentialExecutorService(Executor executor) { - this.manageableSequentialExecutor = - SequentialExecutor.newManageableSequentialExecutor(executor); - this.autoSequentialExecutor = SequentialExecutor.newAutoSequentialExecutor(executor); + this.callbackExecutor = new CallbackExecutor(executor); + this.autoExecutor = new AutoExecutor(executor); } /** @@ -56,94 +57,27 @@ final class SequentialExecutorService { * with the same key that have not been executed will be cancelled. */ ApiFuture submit(final String key, final Callable callable) { - final SettableApiFuture future = SettableApiFuture.create(); - manageableSequentialExecutor.execute( - key, - new CancellableRunnable() { - private boolean cancelled = false; - - @Override - public void run() { - if (cancelled) { - return; - } - try { - ApiFuture callResult = callable.call(); - ApiFutures.addCallback( - callResult, - new ApiFutureCallback() { - @Override - public void onSuccess(T msg) { - future.set(msg); - manageableSequentialExecutor.resume(key); - } - - @Override - public void onFailure(Throwable e) { - future.setException(e); - manageableSequentialExecutor.cancelQueuedTasks( - key, - new CancellationException( - "Execution cancelled because executing previous runnable failed.")); - } - }); - } catch (Exception e) { - future.setException(e); - } - } - - @Override - public void cancel(Throwable e) { - this.cancelled = true; - future.setException(e); - } - }); - return future; + return callbackExecutor.submit(key, callable); } /** Runs synchronous {@code Runnable} tasks sequentially. */ - void submit(final String key, final Runnable runnable) { - autoSequentialExecutor.execute(key, runnable); + void submit( String key, Runnable runnable) { + autoExecutor.execute(key, runnable); } /** - * Internal implemenation of SequentialExecutorService. Takes a serial stream of string keys and + * Internal implementation of SequentialExecutorService. Takes a serial stream of string keys and * {@code Runnable} tasks, and runs the tasks with the same key sequentially. Tasks with the same * key will be run only when its predecessor has been completed while tasks with different keys * can be run in parallel. */ - static class SequentialExecutor { + static abstract class SequentialExecutor { // Maps keys to tasks. - private final Map> tasksByKey; - private final Executor executor; - - enum TaskCompleteAction { - EXECUTE_NEXT_TASK, - WAIT_UNTIL_RESUME, - } - - private TaskCompleteAction taskCompleteAction; + protected final Map> tasksByKey; + protected final Executor executor; - /** - * Creates a AutoSequentialExecutor which executes the next queued task automatically when the - * previous task has completed. - */ - static SequentialExecutor newAutoSequentialExecutor(Executor executor) { - return new SequentialExecutor(executor, TaskCompleteAction.EXECUTE_NEXT_TASK); - } - - /** - * Creates a ManageableSequentialExecutor which allows users to decide when to execute the next - * queued task. The first queued task is executed immediately, but the following tasks will be - * executed only when {@link #resume(String)} is called explicitly. - */ - static SequentialExecutor newManageableSequentialExecutor(Executor executor) { - return new SequentialExecutor(executor, TaskCompleteAction.WAIT_UNTIL_RESUME); - } - - private SequentialExecutor(Executor executor, TaskCompleteAction taskCompleteAction) { + private SequentialExecutor(Executor executor) { this.executor = executor; - this.taskCompleteAction = taskCompleteAction; this.tasksByKey = new HashMap<>(); } @@ -162,25 +96,11 @@ void execute(final String key, Runnable task) { tasksByKey.put(key, newTasks); } - final Deque finalTasks = newTasks; - executor.execute( - new Runnable() { - @Override - public void run() { - switch (taskCompleteAction) { - case EXECUTE_NEXT_TASK: - invokeCallbackAndExecuteNext(key, finalTasks); - break; - case WAIT_UNTIL_RESUME: - invokeCallback(finalTasks); - break; - default: - // Nothing to do. - } - } - }); + execute(key, newTasks); } + protected abstract void execute(String key, Deque finalTasks); + /** Cancels every task in the queue assoicated with {@code key}. */ void cancelQueuedTasks(final String key, Throwable e) { // TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked, @@ -203,59 +123,127 @@ void cancelQueuedTasks(final String key, Throwable e) { } } - /** Executes the next queued task associated with {@code key}. */ - void resume(final String key) { - if (taskCompleteAction.equals(TaskCompleteAction.EXECUTE_NEXT_TASK)) { - // resume() is no-op since tasks are executed automatically. - return; + protected void invokeCallback(final Deque tasks) { + // TODO(kimkyung-goog): Check if there is a race when task list becomes empty. + Runnable task = tasks.poll(); + if (task != null) { + task.run(); } - Deque tasks; + } + + protected void invokeCallbackAndExecuteNext(final String key, final Deque tasks) { + invokeCallback(tasks); synchronized (tasksByKey) { - tasks = tasksByKey.get(key); - if (tasks == null) { - return; - } if (tasks.isEmpty()) { + // Note that there can be a race if a task is added to `tasks` at this point. However, + // tasks.add() is called only inside the block synchronized by `tasksByKey` object + // in the execute() function. Therefore, we are safe to remove `tasks` here. This is not + // optimal, but correct. tasksByKey.remove(key); return; } } - final Deque finalTasks = tasks; - // Run the next task. executor.execute( new Runnable() { @Override public void run() { - invokeCallback(finalTasks); + invokeCallbackAndExecuteNext(key, tasks); } }); } + } - private void invokeCallback(final Deque tasks) { - // TODO(kimkyung-goog): Check if there is a race when task list becomes empty. - Runnable task = tasks.poll(); - if (task != null) { - task.run(); - } + private static class AutoExecutor extends SequentialExecutor { + AutoExecutor(Executor executor) { + super(executor); } - private void invokeCallbackAndExecuteNext(final String key, final Deque tasks) { - invokeCallback(tasks); + protected void execute(final String key, final Deque finalTasks) { + executor.execute(new Runnable() { + @Override public void run() { + invokeCallbackAndExecuteNext(key, finalTasks); + } + }); + } + } + + private static class CallbackExecutor extends SequentialExecutor { + CallbackExecutor(Executor executor) { + super(executor); + } + + ApiFuture submit(final String key, final Callable callable) { + final SettableApiFuture future = SettableApiFuture.create(); + execute( + key, + new CancellableRunnable() { + private boolean cancelled = false; + + @Override + public void run() { + if (cancelled) { + return; + } + try { + ApiFuture callResult = callable.call(); + ApiFutures.addCallback(callResult, new ApiFutureCallback() { + @Override + public void onSuccess(T msg) { + future.set(msg); + resume(key); + } + + @Override + public void onFailure(Throwable e) { + future.setException(e); + cancelQueuedTasks( + key, + new CancellationException( + "Execution cancelled because executing previous runnable failed.")); + } + }, directExecutor()); + } catch (Exception e) { + future.setException(e); + } + } + + @Override + public void cancel(Throwable e) { + this.cancelled = true; + future.setException(e); + } + }); + return future; + } + + protected void execute(final String key, final Deque finalTasks) { + executor.execute(new Runnable() { + @Override public void run() { + invokeCallback(finalTasks); + } + }); + } + + /** Executes the next queued task associated with {@code key}. */ + void resume(final String key) { + Deque tasks; synchronized (tasksByKey) { + tasks = tasksByKey.get(key); + if (tasks == null) { + return; + } if (tasks.isEmpty()) { - // Note that there can be a race if a task is added to `tasks` at this point. However, - // tasks.add() is called only inside the block synchronized by `tasksByKey` object - // in the execute() function. Therefore, we are safe to remove `tasks` here. This is not - // optimal, but correct. tasksByKey.remove(key); return; } } + final Deque finalTasks = tasks; + // Run the next task. executor.execute( new Runnable() { @Override public void run() { - invokeCallbackAndExecuteNext(key, tasks); + invokeCallback(finalTasks); } }); }