Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,16 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
import org.apache.hadoop.security.AccessControlException;
Expand All @@ -50,6 +59,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to expand this or to use the full name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've expanded this



@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract
Expand Down Expand Up @@ -96,6 +108,10 @@ private String formatTokenId(TokenIdent id) {
* Access to currentKey is protected by this object lock
*/
private DelegationKey currentKey;
/**
* Metrics to track token management operations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a trailing .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

*/
protected DelegationTokenSecretManagerMetrics metrics;

private long keyUpdateInterval;
private long tokenMaxLifetime;
Expand Down Expand Up @@ -134,6 +150,7 @@ public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
this.tokenRenewInterval = delegationTokenRenewInterval;
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
this.storeTokenTrackingId = false;
this.metrics = DelegationTokenSecretManagerMetrics.create();
}

/** should be called before this object is used */
Expand Down Expand Up @@ -429,11 +446,18 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
long start = Time.monotonicNow();
boolean success = false;
try {
storeToken(identifier, tokenInfo);
success = true;
} catch (IOException ioe) {
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
ioe);
} finally {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with try finally, is that if the code throws an exception after an exception in the body, the original exception is lost, which makes it very hard to debug.

Please add the positive call into the body and the failure count in the ioexception handler. It will mean that we don't count the runtime exceptions, but I think that is ok in this context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've move the call to track metrics in the body and the exception handler. If the code in the exception handler throws another exception we would still lose the original exception though.

if (metrics != null) {
metrics.addStoreToken(success, Time.monotonicNow() - start);
}
}
return password;
}
Expand Down Expand Up @@ -555,7 +579,16 @@ public synchronized long renewToken(Token<TokenIdent> token,
throw new InvalidToken("Renewal request for unknown token "
+ formatTokenId(id));
}
updateToken(id, info);
long start = Time.monotonicNow();
boolean success = false;
try {
updateToken(id, info);
success = true;
} finally {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as above, emitting metrics when there is a failure.

if (metrics != null) {
metrics.addUpdateToken(success, Time.monotonicNow() - start);
}
}
return renewTime;
}

Expand Down Expand Up @@ -591,8 +624,17 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
if (info == null) {
throw new InvalidToken("Token not found " + formatTokenId(id));
}
removeTokenForOwnerStats(id);
removeStoredToken(id);
long start = Time.monotonicNow();
boolean success = false;
try {
removeTokenForOwnerStats(id);
removeStoredToken(id);
success = true;
} finally {
if (metrics != null) {
metrics.addRemoveToken(success, Time.monotonicNow() - start);
}
}
return id;
}

Expand Down Expand Up @@ -825,4 +867,78 @@ protected void syncTokenOwnerStats() {
addTokenForOwnerStats(id);
}
}

/**
* DelegationTokenSecretManagerMetrics tracks token management operations
* and publishes them through the metrics interfaces.
*/
@Metrics(about="Delegation token secret manager metrics", context="token")
static class DelegationTokenSecretManagerMetrics implements IOStatisticsSource {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this a DurationTrackerFactory and you can supply duration trackers to IOStatisticsBinding; note also there's a PairedDurationTrackerFactory which you could wire up the iostats store. maybe this would be the time/place to add a DurationTrackerFactory for updating hadoop metrics

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the DurationTrackerFactory changes on a separate PR? We're might not need to track failure durations here

private static final Logger LOG = LoggerFactory.getLogger(DelegationTokenSecretManagerMetrics.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is giving check style issues from Yetus.


final static String STORE_TOKEN_STAT = "storeToken";
final static String UPDATE_TOKEN_STAT = "updateToken";
final static String REMOVE_TOKEN_STAT = "removeToken";
final static String TOKEN_FAILURE_STAT = "tokenFailure";

final MetricsRegistry registry;
final IOStatisticsStore ioStatistics;

@Metric("Rate of storage of delegation tokens and latency (milliseconds)")
MutableRate storeToken;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is giving warnings in checkstyle but I believe is needed right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made this private and added accessor methods

@Metric("Rate of update of delegation tokens and latency (milliseconds)")
MutableRate updateToken;
@Metric("Rate of removal of delegation tokens and latency (milliseconds)")
MutableRate removeToken;
@Metric("Counter of delegation tokens operation failures")
MutableCounterLong tokenFailure;

static DelegationTokenSecretManagerMetrics create() {
return DefaultMetricsSystem.instance().register(new DelegationTokenSecretManagerMetrics());
}

public DelegationTokenSecretManagerMetrics() {
ioStatistics = iostatisticsStore()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this specific case, I would skip the static import and just do:
IOStatisticsBinding.iostatisticsStore()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed static import

.withDurationTracking(STORE_TOKEN_STAT, UPDATE_TOKEN_STAT, REMOVE_TOKEN_STAT)
.withCounters(TOKEN_FAILURE_STAT)
.build();
registry = new MetricsRegistry("DelegationTokenSecretManagerMetrics");
LOG.debug("Initialized {}", registry);
}

public void addStoreToken(boolean success, long value) {
if (success) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than have three methods add the success with separate code paths for true/false. It would be nicer to have a single addFailure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added method addTokenFailure so we can call it separately

storeToken.add(value);
ioStatistics.addTimedOperation(STORE_TOKEN_STAT, value);
} else {
tokenFailure.incr();
ioStatistics.incrementCounter(TOKEN_FAILURE_STAT);
}
}

public void addUpdateToken(boolean success, long value) {
if (success) {
updateToken.add(value);
ioStatistics.addTimedOperation(UPDATE_TOKEN_STAT, value);
} else {
tokenFailure.incr();
ioStatistics.incrementCounter(TOKEN_FAILURE_STAT);
}
}

public void addRemoveToken(boolean success, long value) {
if (success) {
removeToken.add(value);
ioStatistics.addTimedOperation(REMOVE_TOKEN_STAT, value);
} else {
tokenFailure.incr();
ioStatistics.incrementCounter(TOKEN_FAILURE_STAT);
}
}

@Override
public IOStatistics getIOStatistics() {
return ioStatistics;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,42 @@ public DelegationKey getKey(TestDelegationTokenIdentifier id) {
return allKeys.get(id.getMasterKeyId());
}
}

public static class TestFailureDelegationTokenSecretManager extends TestDelegationTokenSecretManager {
private boolean throwError = false;

public TestFailureDelegationTokenSecretManager() {
super(24*60*60*1000, 10*1000, 1*1000, 3600000);
}

public void setThrowError(boolean throwError) {
this.throwError = throwError;
}

@Override
protected void storeNewToken(TestDelegationTokenIdentifier ident, long renewDate) throws IOException {
if (throwError) {
throw new IOException("Test exception");
}
super.storeNewToken(ident, renewDate);
}

@Override
protected void removeStoredToken(TestDelegationTokenIdentifier ident) throws IOException {
if (throwError) {
throw new IOException("Test exception");
}
super.removeStoredToken(ident);
}

@Override
protected void updateStoredToken(TestDelegationTokenIdentifier ident, long renewDate) throws IOException {
if (throwError) {
throw new IOException("Test exception");
}
super.updateStoredToken(ident, renewDate);
}
}

public static class TokenSelector extends
AbstractDelegationTokenSelector<TestDelegationTokenIdentifier>{
Expand Down Expand Up @@ -579,4 +615,65 @@ public void testEmptyToken() throws IOException {
assertEquals(token1, token2);
assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString());
}

@Test
public void testDelegationTokenSecretManagerMetrics() throws Exception {
TestDelegationTokenSecretManager dtSecretManager =
new TestDelegationTokenSecretManager(24*60*60*1000,
10*1000,1*1000,3600000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spaces after the commas

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extend the 3600000 too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

try {
dtSecretManager.startThreads();

Assert.assertEquals(0, dtSecretManager.metrics.storeToken.lastStat().numSamples());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is invoked enough it is worth factoring into its own assertion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to group the assertions and the invocation

final Token<TestDelegationTokenIdentifier> token =
generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker");
Assert.assertEquals(1, dtSecretManager.metrics.storeToken.lastStat().numSamples());

Assert.assertEquals(0, dtSecretManager.metrics.updateToken.lastStat().numSamples());
dtSecretManager.renewToken(token, "JobTracker");
Assert.assertEquals(1, dtSecretManager.metrics.updateToken.lastStat().numSamples());

Assert.assertEquals(0, dtSecretManager.metrics.removeToken.lastStat().numSamples());
dtSecretManager.cancelToken(token, "JobTracker");
Assert.assertEquals(1, dtSecretManager.metrics.removeToken.lastStat().numSamples());
} finally {
dtSecretManager.stopThreads();
}
}

@Test
public void testDelegationTokenSecretManagerMetricsFailures() throws Exception {
TestFailureDelegationTokenSecretManager dtSecretManager = new TestFailureDelegationTokenSecretManager();

try {
dtSecretManager.startThreads();

final Token<TestDelegationTokenIdentifier> token =
generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker");

dtSecretManager.setThrowError(true);

Assert.assertEquals(0, dtSecretManager.metrics.tokenFailure.value());
generateDelegationToken(dtSecretManager, "SomeUser", "JobTracker");
Assert.assertEquals(1, dtSecretManager.metrics.tokenFailure.value());

try {
dtSecretManager.renewToken(token, "JobTracker");
Assert.fail("Expected exception");
} catch (Exception ex) {
// Expected exception
}
Assert.assertEquals(2, dtSecretManager.metrics.tokenFailure.value());

try {
dtSecretManager.cancelToken(token, "JobTracker");
Assert.fail("Expected exception");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use LamdaTestUtils#intercept

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very helpful. Thanks!

} catch (Exception ex) {
// Expected exception
}
Assert.assertEquals(3, dtSecretManager.metrics.tokenFailure.value());
} finally {
dtSecretManager.stopThreads();
}
}
}