diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index 68ea9f1a3..e356ac494 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -46,7 +46,7 @@ * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns * the scheduling of the two previously mentioned components as well as informing LeaseRenewer when LeaseTaker takes new * leases. - * + * */ public class LeaseCoordinator { @@ -80,7 +80,7 @@ public class LeaseCoordinator { protected final IMetricsFactory metricsFactory; private ScheduledExecutorService leaseCoordinatorThreadPool; - private ExecutorService leaseRenewalThreadpool; + private final ExecutorService leaseRenewalThreadpool; private volatile boolean running = false; /** @@ -206,7 +206,7 @@ public void start() throws DependencyException, InvalidStateException, Provision /** * Runs a single iteration of the lease taker - used by integration tests. - * + * * @throws InvalidStateException * @throws DependencyException */ @@ -235,7 +235,7 @@ protected void runTaker() throws DependencyException, InvalidStateException { /** * Runs a single iteration of the lease renewer - used by integration tests. - * + * * @throws InvalidStateException * @throws DependencyException */ @@ -263,7 +263,7 @@ public Collection getAssignments() { /** * @param leaseKey lease key to fetch currently held lease for - * + * * @return deep copy of currently held Lease for given key, or null if we don't hold the lease for that key */ public T getCurrentlyHeldLease(String leaseKey) { @@ -290,7 +290,6 @@ public void stop() { leaseTaker.getWorkerIdentifier())); } else { leaseCoordinatorThreadPool.shutdownNow(); - leaseRenewalThreadpool.shutdownNow(); LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop", leaseTaker.getWorkerIdentifier(), STOP_WAIT_TIME_MILLIS)); @@ -302,6 +301,7 @@ public void stop() { LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool."); } + leaseRenewalThreadpool.shutdownNow(); synchronized (shutdownLock) { leaseRenewer.clearCurrentlyHeldLeases(); running = false; @@ -317,12 +317,12 @@ public boolean isRunning() { /** * Updates application-specific lease values in DynamoDB. - * + * * @param lease lease object containing updated values * @param concurrencyToken obtained by calling Lease.getConcurrencyToken for a currently held lease - * + * * @return true if update succeeded, false otherwise - * + * * @throws InvalidStateException if lease table does not exist * @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity * @throws DependencyException if DynamoDB update fails in an unexpected way