diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index d1255547068d..5ba218e8f108 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -63,9 +63,8 @@ private SequentialExecutor(Executor executor) { } protected void execute(final String key, Runnable task) { - Deque newTasks; synchronized (tasksByKey) { - newTasks = tasksByKey.get(key); + Deque newTasks = tasksByKey.get(key); // If this key is already being handled, add it to the queue and return. if (newTasks != null) { newTasks.add(task); @@ -77,28 +76,32 @@ protected void execute(final String key, Runnable task) { tasksByKey.put(key, newTasks); } - callNextTaskAsync(key, newTasks); + callNextTaskAsync(key); } - protected void callNextTaskAsync(final String key, final Deque tasks) { + protected void callNextTaskAsync(final String key) { executor.execute( new Runnable() { @Override public void run() { - // TODO(kimkyung-goog): Check if there is a race when task list becomes empty. - Runnable task = tasks.poll(); - if (task != null) { - task.run(); - postTaskExecution(key, tasks); + Deque tasks; + synchronized (tasksByKey) { + tasks = tasksByKey.get(key); + if (tasks != null && tasks.isEmpty()) { + tasksByKey.remove(key); + tasks = null; + } + } + if (tasks != null) { + // TODO(kimkyung-goog): Check if there is a race when task list becomes empty. + Runnable task = tasks.poll(); + if (task != null) { + task.run(); + } } } }); } - - protected void postTaskExecution(String key, Deque tasks) { - // Do nothing in this class, but provide an opportunity for a subclass to do something - // interesting. - } } @BetaApi @@ -108,25 +111,13 @@ static class AutoExecutor extends SequentialExecutor { } /** Runs synchronous {@code Runnable} tasks sequentially. */ - void submit(String key, Runnable task) { - super.execute(key, task); - } - - @Override - /** Once a task is done, automatically run the next task in the queue. */ - protected void postTaskExecution(final String key, final Deque tasks) { - synchronized (tasksByKey) { - if (tasks.isEmpty()) { - // Note that there can be a race if a task is added to `tasks` at this point. However, - // tasks.add() is called only inside the block synchronized by `tasksByKey` object - // in the execute() function. Therefore, we are safe to remove `tasks` here. This is not - // optimal, but correct. - tasksByKey.remove(key); - return; + void submit(final String key, final Runnable task) { + super.execute(key, new Runnable() { + @Override public void run() { + task.run(); + callNextTaskAsync(key); } - } - - callNextTaskAsync(key, tasks); + }); } } @@ -202,7 +193,7 @@ public void run() { @Override public void onSuccess(T msg) { future.set(msg); - resume(key); + callNextTaskAsync(key); } // Step 5.2: on failure @@ -230,22 +221,6 @@ public void cancel(Throwable e) { return future; } - /** Executes the next queued task associated with {@code key}. */ - private void resume(String key) { - Deque tasks; - synchronized (tasksByKey) { - tasks = tasksByKey.get(key); - if (tasks == null) { - return; - } - if (tasks.isEmpty()) { - tasksByKey.remove(key); - return; - } - } - callNextTaskAsync(key, tasks); - } - /** Cancels every task in the queue assoicated with {@code key}. */ private void cancelQueuedTasks(final String key, Throwable e) { // TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked, diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java index 7b35f1ada745..3788bd3c04f8 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java @@ -205,8 +205,8 @@ public void run() { @Test public void SequentialExecutorRunsTasksAutomatically() throws Exception { - int numKeys = 100; - int numTasks = 100; + int numKeys = 50; + int numTasks = 50; SequentialExecutorService.AutoExecutor sequentialExecutor = new SequentialExecutorService.AutoExecutor(executorProvider.getExecutor()); CountDownLatch remainingTasksCount = new CountDownLatch(numKeys * numTasks); @@ -223,7 +223,7 @@ public void SequentialExecutorRunsTasksAutomatically() throws Exception { for (int taskId = 0; taskId < numTasks; taskId++) { SleepingSyncTask task = new SleepingSyncTask( - taskId, 10, startedTasksSequence, completedTasksSequence, remainingTasksCount); + taskId, 5, startedTasksSequence, completedTasksSequence, remainingTasksCount); sequentialExecutor.submit(key, task); } }