diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index bbc3b2e74d67..06eca64b2b00 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -115,6 +115,7 @@ private static class AutoExecutor extends SequentialExecutor { super(executor); } + @Override protected void execute(final String key, final Deque tasks) { executor.execute( new Runnable() { @@ -146,8 +147,40 @@ private static class CallbackExecutor extends SequentialExecutor { super(executor); } + /** + * This method does the following in a chain: + * + *
    + *
  1. Creates an `ApiFuture` that can be used for tracking progress. + *
  2. Creates a `CancellableRunnable` out of the `Callable` + *
  3. Adds the `CancellableRunnable` to the task queue + *
  4. Once the task is ready to be run, it will execute the `Callable` + *
  5. When the `Callable` completes one of two things happens: + *
      + *
    1. On success: + *
        + *
      1. Complete the `ApiFuture` by setting the return value. + *
      2. Call the next task. + *
      + *
    2. On Failure: + *
        + *
      1. Fail the `ApiFuture` by setting the exception. + *
      2. Cancel all tasks in the queue. + *
      + *
    + *
+ * + * @param key The key for the task queue + * @param callable The thing to run + * @param The return type for the `Callable`'s `ApiFuture`. + * @return an `ApiFuture` for tracking. + */ ApiFuture submit(final String key, final Callable> callable) { + // Step 1: create a future for the user final SettableApiFuture future = SettableApiFuture.create(); + + // Step 2: create the CancellableRunnable + // Step 3: add the task to queue via `execute` execute( key, new CancellableRunnable() { @@ -155,20 +188,25 @@ ApiFuture submit(final String key, final Callable> callable) @Override public void run() { + // the task was cancelled if (cancelled) { return; } + try { + // Step 4: call the `Callable` ApiFuture callResult = callable.call(); ApiFutures.addCallback( callResult, new ApiFutureCallback() { + // Step 5.1: on success @Override public void onSuccess(T msg) { future.set(msg); resume(key); } + // Step 5.2: on failure @Override public void onFailure(Throwable e) { future.setException(e); @@ -193,6 +231,7 @@ public void cancel(Throwable e) { return future; } + @Override protected void execute(final String key, final Deque tasks) { executor.execute( new Runnable() {