@@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@@ -114,6 +115,17 @@ public long getMostRecentCheckpointTxId() throws IOException {
return rpcServer.invokeAtAvailableNs(method, long.class);
}

@Override
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
throws IOException {
rpcServer.checkOperation(OperationCategory.READ);

RemoteMethod method =
new RemoteMethod(NamenodeProtocol.class, "getMostRecentNameNodeFileTxId",
new Class<?>[] {NNStorage.NameNodeFile.class}, nnf);
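// invokeAtAvailableNs tries the default namespace first and then any other
// available namespace, so the txid returned is that NameNode's view.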
return rpcServer.invokeAtAvailableNs(method, long.class);
}

@Override
public CheckpointSignature rollEditLog() throws IOException {
rpcServer.checkOperation(OperationCategory.WRITE, false);
@@ -145,6 +145,7 @@
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -1640,6 +1641,12 @@ public long getMostRecentCheckpointTxId() throws IOException {
return nnProto.getMostRecentCheckpointTxId();
}

@Override // NamenodeProtocol
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf)
throws IOException {
return nnProto.getMostRecentNameNodeFileTxId(nnf);
}

@Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException {
return nnProto.rollEditLog();
@@ -35,6 +35,8 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
@@ -51,6 +53,7 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointResponseProto;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -141,6 +144,20 @@ public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
return GetMostRecentCheckpointTxIdResponseProto.newBuilder().setTxId(txid).build();
}

@Override
public GetMostRecentNameNodeFileTxIdResponseProto getMostRecentNameNodeFileTxId(
RpcController unused, GetMostRecentNameNodeFileTxIdRequestProto request)
throws ServiceException {
long txid;
try {
txid = impl.getMostRecentNameNodeFileTxId(
NNStorage.NameNodeFile.valueOf(request.getNameNodeFile()));
} catch (IOException e) {
throw new ServiceException(e);
}
return GetMostRecentNameNodeFileTxIdResponseProto.newBuilder().setTxId(txid).build();
}

@Override
public RollEditLogResponseProto rollEditLog(RpcController unused,
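A note on the translator above: the NameNodeFile enum crosses the wire as a plain string (nnf.toString() on the client, NameNodeFile.valueOf(...) on the server). The round-trip works only because both sides share the exact constant names, valueOf throws IllegalArgumentException for anything else, and it relies on NameNodeFile not overriding toString(). A minimal, self-contained sketch of that contract, using a stand-in enum rather than the real NNStorage.NameNodeFile:

    // Demo of the string round-trip the PB translators rely on.
    enum NameNodeFileDemo { IMAGE, IMAGE_ROLLBACK }

    public class EnumWireDemo {
      public static void main(String[] args) {
        String onTheWire = NameNodeFileDemo.IMAGE_ROLLBACK.toString(); // "IMAGE_ROLLBACK"
        // valueOf() parses only the exact constant name; anything else throws
        // IllegalArgumentException, so client and server must stay in sync.
        System.out.println(NameNodeFileDemo.valueOf(onTheWire)); // IMAGE_ROLLBACK
      }
    }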
@@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentNameNodeFileTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
@@ -46,6 +47,7 @@
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.StartCheckpointRequestProto;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -134,6 +136,17 @@ public long getMostRecentCheckpointTxId() throws IOException {
GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId());
}

@Override
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException {
try {
return rpcProxy.getMostRecentNameNodeFileTxId(NULL_CONTROLLER,
GetMostRecentNameNodeFileTxIdRequestProto.newBuilder()
.setNameNodeFile(nnf.toString()).build()).getTxId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}

@Override
public CheckpointSignature rollEditLog() throws IOException {
return PBHelper.convert(ipc(() -> rpcProxy.rollEditLog(NULL_CONTROLLER,
@@ -1562,4 +1562,25 @@ public void updateLastAppliedTxIdFromWritten() {
public long getMostRecentCheckpointTxId() {
return storage.getMostRecentCheckpointTxId();
}

/**
 * @return the latest txid for the given NameNodeFile type, or
 * {@link HdfsServerConstants#INVALID_TXID} if there is no FSImage file of
 * the type requested.
 */
public long getMostRecentNameNodeFileTxId(NameNodeFile nnf) throws IOException {
final FSImageStorageInspector inspector =
new FSImageTransactionalStorageInspector(EnumSet.of(nnf));
storage.inspectStorageDirs(inspector);
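// getLatestImages() throws FileNotFoundException when no image of the
// requested type exists; that case is mapped to the INVALID_TXID sentinel.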
try {
List<FSImageFile> images = inspector.getLatestImages();
if (images != null && !images.isEmpty()) {
return images.get(0).getCheckpointTxId();
} else {
return HdfsServerConstants.INVALID_TXID;
}
} catch (FileNotFoundException e) {
return HdfsServerConstants.INVALID_TXID;
}
}
}
@@ -1361,6 +1361,14 @@ public long getMostRecentCheckpointTxId() throws IOException {
namesystem.checkSuperuserPrivilege(operationName);
return namesystem.getFSImage().getMostRecentCheckpointTxId();
}

@Override // NamenodeProtocol
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException {
checkNNStartup();
namesystem.checkOperation(OperationCategory.UNCHECKED);
namesystem.checkSuperuserPrivilege();
return namesystem.getFSImage().getMostRecentNameNodeFileTxId(nnf);
}

@Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException {
@@ -248,7 +248,7 @@ private int doRun() throws IOException {
}

// download the fsimage from active namenode
int download = downloadImage(storage, proxy, proxyInfo);
int download = downloadImage(storage, proxy, proxyInfo, isRollingUpgrade);
if (download != 0) {
return download;
}
@@ -351,12 +351,32 @@ private void doUpgrade(NNStorage storage) throws IOException {
}
}

private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo)
private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo,
boolean isRollingUpgrade)
throws IOException {
// Load the newly formatted image, using all of the directories
// (including shared edits)
final long imageTxId = proxy.getMostRecentCheckpointTxId();
final long curTxId = proxy.getTransactionID();

if (isRollingUpgrade) {
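// During a rolling upgrade the active NN also keeps a special
// fsimage_rollback_N; fetch it as well so the bootstrapped standby
// can serve a later rollback.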
final long rollbackTxId =
proxy.getMostRecentNameNodeFileTxId(NameNodeFile.IMAGE_ROLLBACK);
assert rollbackTxId != HdfsServerConstants.INVALID_TXID :
"Expected a valid TXID for fsimage_rollback file";
FSImage rollbackImage = new FSImage(conf);
try {
rollbackImage.getStorage().setStorageInfo(storage);
MD5Hash hash = TransferFsImage.downloadImageToStorage(
proxyInfo.getHttpAddress(), rollbackTxId, storage, true, true);
rollbackImage.saveDigestAndRenameCheckpointImage(
NameNodeFile.IMAGE_ROLLBACK, rollbackTxId, hash);
} finally {
rollbackImage.close();
}
}
FSImage image = new FSImage(conf);
try {
image.getStorage().setStorageInfo(storage);
@@ -26,6 +26,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
@@ -111,6 +112,12 @@ BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
@Idempotent
public long getMostRecentCheckpointTxId() throws IOException;

/**
* Get the transaction ID of the most recent checkpoint for the given NameNodeFile.
*/
@Idempotent
public long getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile nnf) throws IOException;

/**
* Closes the current edit log and opens a new one. The
* call fails if the file system is in SafeMode.
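For orientation, here is how a caller might use the new interface method. A minimal sketch, assuming an already-constructed NamenodeProtocol proxy named proxy; the sentinel is the one documented on FSImage#getMostRecentNameNodeFileTxId above:

    // Ask the remote NN for the newest rollback image txid and guard
    // against the "no such image" sentinel before using it.
    long rollbackTxId =
        proxy.getMostRecentNameNodeFileTxId(NNStorage.NameNodeFile.IMAGE_ROLLBACK);
    if (rollbackTxId == HdfsServerConstants.INVALID_TXID) {
      throw new IOException("Remote NameNode has no rollback fsimage yet");
    }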
@@ -108,6 +108,14 @@ message GetMostRecentCheckpointTxIdResponseProto{
required uint64 txId = 1;
}

message GetMostRecentNameNodeFileTxIdRequestProto {
required string nameNodeFile = 1;
}

message GetMostRecentNameNodeFileTxIdResponseProto{
required uint64 txId = 1;
}

/**
* registration - Namenode reporting the error
* errorCode - error code indicating the error
@@ -253,6 +261,12 @@ service NamenodeProtocolService {
rpc getMostRecentCheckpointTxId(GetMostRecentCheckpointTxIdRequestProto)
returns(GetMostRecentCheckpointTxIdResponseProto);

/**
 * Get the transaction ID of the most recent checkpoint of the given NameNodeFile
 */
rpc getMostRecentNameNodeFileTxId(GetMostRecentNameNodeFileTxIdRequestProto)
returns(GetMostRecentNameNodeFileTxIdResponseProto);

/**
* Close the current editlog and open a new one for checkpointing purposes
*/
@@ -519,6 +519,23 @@ public static void assertNNHasCheckpoints(MiniDFSCluster cluster,
}
}

public static void assertNNHasRollbackCheckpoints(MiniDFSCluster cluster,
    int nnIdx, List<Integer> txids) {

  for (File nameDir : getNameNodeCurrentDirs(cluster, nnIdx)) {
    LOG.info("Examining storage dir " + nameDir + " with contents: "
        + StringUtils.join(nameDir.listFiles(), ", "));

Review comment (Member): Can you use the logger format?

    // Should have fsimage_rollback_N for each expected checkpoint
    for (long checkpointTxId : txids) {
      File image = new File(nameDir,
          NNStorage.getRollbackImageFileName(checkpointTxId));
      assertTrue("Expected non-empty " + image, image.length() > 0);
    }
  }
}

public static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster, int nnIdx) {
List<File> nameDirs = Lists.newArrayList();
for (URI u : cluster.getNameDirs(nnIdx)) {
@@ -36,6 +36,7 @@
import java.util.function.Supplier;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.server.common.HttpGetFailedException;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
@@ -189,7 +190,8 @@ public void testDownloadingLaterCheckpoint() throws Exception {
*/
@Test
public void testRollingUpgradeBootstrapStandby() throws Exception {
removeStandbyNameDirs();
// This node is needed to create the rollback fsimage
cluster.restartNameNode(1);

int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;

@@ -208,12 +210,21 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {

// BootstrapStandby should fail if the node has a future version
// and the cluster isn't in rolling upgrade
bs.setConf(cluster.getConfiguration(1));
bs.setConf(cluster.getConfiguration(2));
assertEquals("BootstrapStandby should return ERR_CODE_INVALID_VERSION",
ERR_CODE_INVALID_VERSION, bs.run(new String[]{"-force"}));

// Start rolling upgrade
fs.rollingUpgrade(RollingUpgradeAction.PREPARE);
RollingUpgradeInfo info = fs.rollingUpgrade(RollingUpgradeAction.QUERY);
while (!info.createdRollbackImages()) {
Review comment (Member): This is unbounded. Can you use LambdaTestUtils#wait or similar?
Thread.sleep(1000);
info = fs.rollingUpgrade(RollingUpgradeAction.QUERY);
}
// After the rollback image is created the standby is not needed
cluster.shutdownNameNode(1);
removeStandbyNameDirs();

nn0 = spy(nn0);

// Make nn0 think it is a future version
Expand All @@ -237,6 +248,9 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {

long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
.getFSImage().getMostRecentCheckpointTxId();
long expectedRollbackTxId = NameNodeAdapter.getNamesystem(nn0)
.getFSImage().getMostRecentNameNodeFileTxId(
NNStorage.NameNodeFile.IMAGE_ROLLBACK);
assertEquals(11, expectedCheckpointTxId);

for (int i = 1; i < maxNNCount; i++) {
Expand All @@ -245,6 +259,8 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {
bs.run(new String[]{"-force"});
FSImageTestUtil.assertNNHasCheckpoints(cluster, i,
ImmutableList.of((int) expectedCheckpointTxId));
FSImageTestUtil.assertNNHasRollbackCheckpoints(cluster, i,
ImmutableList.of((int) expectedRollbackTxId));
}

// Make sure the bootstrap was successful
Expand All @@ -253,6 +269,13 @@ public void testRollingUpgradeBootstrapStandby() throws Exception {
// We should now be able to start the standby successfully
restartNameNodesFromIndex(1, "-rollingUpgrade", "started");

for (int i = 1; i < maxNNCount; i++) {
assertTrue("NameNodes should all have the rollback FSImage",
cluster.getNameNode(i).getFSImage().hasRollbackFSImage());
Review comment (Member): Easier to read if you extract the nn = cluster.getNameNode(i);
assertTrue("NameNodes should all be inRollingUpgrade",
cluster.getNameNode(i).getNamesystem().isRollingUpgrade());
}

// Cleanup standby dirs
for (int i = 1; i < maxNNCount; i++) {
cluster.shutdownNameNode(i);
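On the unbounded wait flagged above: a bounded alternative along the reviewer's lines, as a sketch only. It assumes org.apache.hadoop.test.GenericTestUtils is on the test classpath; its waitFor(Supplier&lt;Boolean&gt;, int, int) polls until the condition holds or the timeout expires:

    // Poll for the rollback image with a 1s interval and 60s timeout
    // instead of looping forever.
    GenericTestUtils.waitFor(() -> {
      try {
        return fs.rollingUpgrade(RollingUpgradeAction.QUERY)
            .createdRollbackImages();
      } catch (IOException e) {
        // Treat transient query failures as "not yet".
        return false;
      }
    }, 1000, 60000);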