Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
* the scheduling of the two previously mentioned components as well as informing LeaseRenewer when LeaseTaker takes new
* leases.
*
*
*/
public class LeaseCoordinator<T extends Lease> {

Expand Down Expand Up @@ -80,7 +80,7 @@ public class LeaseCoordinator<T extends Lease> {
protected final IMetricsFactory metricsFactory;

private ScheduledExecutorService leaseCoordinatorThreadPool;
private ExecutorService leaseRenewalThreadpool;
private final ExecutorService leaseRenewalThreadpool;
private volatile boolean running = false;

/**
Expand Down Expand Up @@ -206,7 +206,7 @@ public void start() throws DependencyException, InvalidStateException, Provision

/**
* Runs a single iteration of the lease taker - used by integration tests.
*
*
* @throws InvalidStateException
* @throws DependencyException
*/
Expand Down Expand Up @@ -235,7 +235,7 @@ protected void runTaker() throws DependencyException, InvalidStateException {

/**
* Runs a single iteration of the lease renewer - used by integration tests.
*
*
* @throws InvalidStateException
* @throws DependencyException
*/
Expand Down Expand Up @@ -263,7 +263,7 @@ public Collection<T> getAssignments() {

/**
* @param leaseKey lease key to fetch currently held lease for
*
*
* @return deep copy of currently held Lease for given key, or null if we don't hold the lease for that key
*/
public T getCurrentlyHeldLease(String leaseKey) {
Expand All @@ -290,7 +290,6 @@ public void stop() {
leaseTaker.getWorkerIdentifier()));
} else {
leaseCoordinatorThreadPool.shutdownNow();
leaseRenewalThreadpool.shutdownNow();
LOG.info(String.format("Worker %s stopped lease-tracking threads %dms after stop",
leaseTaker.getWorkerIdentifier(),
STOP_WAIT_TIME_MILLIS));
Expand All @@ -302,6 +301,7 @@ public void stop() {
LOG.debug("Threadpool was null, no need to shutdown/terminate threadpool.");
}

leaseRenewalThreadpool.shutdownNow();
synchronized (shutdownLock) {
leaseRenewer.clearCurrentlyHeldLeases();
running = false;
Expand All @@ -317,12 +317,12 @@ public boolean isRunning() {

/**
* Updates application-specific lease values in DynamoDB.
*
*
* @param lease lease object containing updated values
* @param concurrencyToken obtained by calling Lease.getConcurrencyToken for a currently held lease
*
*
* @return true if update succeeded, false otherwise
*
*
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB update fails due to lack of capacity
* @throws DependencyException if DynamoDB update fails in an unexpected way
Expand Down