Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

Expand Down Expand Up @@ -285,7 +284,8 @@ public void startThreads() throws IOException {
// by calling the incrSharedCount
currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
LOG.info("Fetched initial range of seq num, from {} to {} ", currentSeqNum + 1, currentMaxSeqNum);
LOG.info("Fetched initial range of seq num, from {} to {} ",
currentSeqNum + 1, currentMaxSeqNum);
} catch (Exception e) {
throw new IOException("Could not start Sequence Counter", e);
}
Expand All @@ -305,14 +305,18 @@ public void startThreads() throws IOException {
}
try {
keyCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_MASTER_KEY_ROOT).build();
CuratorCacheListener keyCacheListener = CuratorCacheListener.builder().forCreatesAndChanges((oldNode, node) -> {
try {
processKeyAddOrUpdate(node.getData());
} catch (IOException e) {
LOG.error("Error while processing Curator keyCacheListener " + "NODE_CREATED / NODE_CHANGED event");
throw new UncheckedIOException(e);
}
}).forDeletes(childData -> processKeyRemoved(childData.getPath())).build();
CuratorCacheListener keyCacheListener = CuratorCacheListener.builder()
.forCreatesAndChanges((oldNode, node) -> {
try {
processKeyAddOrUpdate(node.getData());
} catch (IOException e) {
LOG.error("Error while processing Curator keyCacheListener "
+ "NODE_CREATED / NODE_CHANGED event");
throw new UncheckedIOException(e);
}
})
.forDeletes(childData -> processKeyRemoved(childData.getPath()))
.build();
keyCache.listenable().addListener(keyCacheListener);
keyCache.start();
loadFromZKCache(false);
Expand All @@ -323,21 +327,26 @@ public void startThreads() throws IOException {
LOG.info("TokenCache is enabled");
try {
tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT).build();
CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder().forCreatesAndChanges((oldNode, node) -> {
CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder()
.forCreatesAndChanges((oldNode, node) -> {
try {
processTokenAddOrUpdate(node.getData());
} catch (IOException e) {
LOG.error("Error while processing Curator tokenCacheListener " + "NODE_CREATED / NODE_CHANGED event");
LOG.error("Error while processing Curator tokenCacheListener "
+ "NODE_CREATED / NODE_CHANGED event");
throw new UncheckedIOException(e);
}
}).forDeletes(childData -> {
})
.forDeletes(childData -> {
try {
processTokenRemoved(childData);
} catch (IOException e) {
LOG.error("Error while processing Curator tokenCacheListener " + "NODE_DELETED event");
LOG.error("Error while processing Curator tokenCacheListener "
+ "NODE_DELETED event");
throw new UncheckedIOException(e);
}
}).build();
})
.build();
tokenCache.listenable().addListener(tokenCacheListener);
tokenCache.start();
loadFromZKCache(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ protected void stopServer() throws Exception {


protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
return fac.getZooKeeperServer();
return fac.getZooKeeperServer();
}

protected void tearDownAll() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -193,7 +191,7 @@ private void rebuildTokenCache(boolean initial) throws IOException {
// 2) remove tokens in local cache but not in zk anymore
for (AbstractDelegationTokenIdentifier ident : currentTokens.keySet()) {
if (!localTokenCache.contains(ident)) {
currentTokens.remove(ident);
currentTokens.remove(ident);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public void testMultiNodeTokenRemovalShortSyncWithoutWatch()

// This is very unlikely to happen in real case, but worth putting
// the case out
@SuppressWarnings("unchecked")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trunk does not have suppress here.

Copy link
Contributor Author

@melissayou melissayou Oct 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it was added by me to fix failed javac problem reported in earlier builds. There're two similar places(line135&201) and either will fail javac.

So two ways:

  1. Keep it in sync with trunk and let javac fail for this PR.
  2. (What I did here) fix it by adding suppress warning here as I'm pretty confident those types are correct. This test file is inconsistent - you can see the untouched method testMultiNodeOperationWithoutWatch() has the exact same problem and has suppress warning. So I think it's applicable to the other two methods as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. Either way is fine. Will defer to @omalley for which approach to take on here.

@Test
public void testMultiNodeTokenRemovalLongSyncWithoutWatch()
throws Exception {
Expand Down