
Commit af5301e

HDDS-12233. Atomically import a container (apache#7934)
1 parent 2475949 commit af5301e

10 files changed

Lines changed: 186 additions & 33 deletions


hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java

Lines changed: 3 additions & 0 deletions
@@ -128,6 +128,9 @@ void create(VolumeSet volumeSet, VolumeChoosingPolicy volumeChoosingPolicy,
   void update(Map<String, String> metaData, boolean forceUpdate)
       throws StorageContainerException;
 
+  void update(Map<String, String> metaData, boolean forceUpdate, String containerMetadataPath)
+      throws StorageContainerException;
+
   void updateDataScanTimestamp(Instant timestamp)
       throws StorageContainerException;
 
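The new overload lets callers choose where the .container descriptor file is written, instead of always using the container's registered metadata path. A minimal usage sketch (the container variable and the staging path are hypothetical, not from this patch):

    // Persist updated metadata, forcing a rewrite of the .container file
    // into a staging directory rather than containerData.getMetadataPath().
    Map<String, String> metadata =
        Collections.singletonMap("owner", "replication");
    container.update(metadata, true, "/tmp/container-staging/1/metadata");

TarContainerPacker below uses this overload (via ContainerPacker.persistCustomContainerState) to write the descriptor into the untar directory before the atomic move.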
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java

Lines changed: 15 additions & 0 deletions
@@ -21,7 +21,9 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Path;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 
 /**
  * Service to pack/unpack ContainerData container data to/from a single byte
@@ -54,4 +56,17 @@ void pack(Container<CONTAINERDATA> container, OutputStream destination)
    */
   byte[] unpackContainerDescriptor(InputStream inputStream)
       throws IOException;
+
+  /**
+   * Persists the custom state for a container. This method allows saving the container file to a custom location.
+   */
+  default void persistCustomContainerState(Container<? extends ContainerData> container, byte[] descriptorContent,
+      ContainerProtos.ContainerDataProto.State state, Path containerMetadataPath) throws IOException {
+    if (descriptorContent == null) {
+      return;
+    }
+    ContainerData originalContainerData = ContainerDataYaml.readContainer(descriptorContent);
+    container.getContainerData().setState(state);
+    container.update(originalContainerData.getMetadata(), true, containerMetadataPath.toString());
+  }
 }
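The default implementation re-reads the original descriptor shipped in the archive, overrides the in-memory state, and flushes the .container file to the supplied path through the new three-argument update. A hedged call-site sketch (variable names are illustrative; TarContainerPacker's real call appears below):

    // Stamp RECOVERING on the descriptor while the container is still
    // staged in a temporary untar directory.
    packer.persistCustomContainerState(container, descriptorFileContent,
        ContainerProtos.ContainerDataProto.State.RECOVERING,
        untarDir.resolve("metadata"));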

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java

Lines changed: 14 additions & 9 deletions
@@ -551,13 +551,17 @@ public ContainerType getContainerType() {
   }
 
   @Override
-  public void update(
-      Map<String, String> metadata, boolean forceUpdate)
+  public void update(Map<String, String> metadata, boolean forceUpdate)
       throws StorageContainerException {
+    update(metadata, forceUpdate, containerData.getMetadataPath());
+  }
 
+  @Override
+  public void update(Map<String, String> metadata, boolean forceUpdate, String containerMetadataPath)
+      throws StorageContainerException {
     // TODO: Now, when writing the updated data to .container file, we are
-    // holding lock and writing data to disk. We can have async implementation
-    // to flush the update container data to disk.
+    // holding lock and writing data to disk. We can have async implementation
+    // to flush the update container data to disk.
     long containerId = containerData.getContainerID();
     if (!containerData.isValid()) {
       LOG.debug("Invalid container data. ContainerID: {}", containerId);
@@ -577,7 +581,7 @@ public void update(
         containerData.addMetadata(entry.getKey(), entry.getValue());
       }
 
-      File containerFile = getContainerFile();
+      File containerFile = getContainerFile(containerMetadataPath, containerData.getContainerID());
       // update the new container data to .container File
       updateContainerFile(containerFile);
     } catch (StorageContainerException ex) {
@@ -665,21 +669,22 @@ public void importContainerData(InputStream input,
 
   public void importContainerData(KeyValueContainerData originalContainerData)
       throws IOException {
-    containerData.setState(originalContainerData.getState());
     containerData
         .setContainerDBType(originalContainerData.getContainerDBType());
     containerData.setSchemaVersion(originalContainerData.getSchemaVersion());
 
-    //rewriting the yaml file with new checksum calculation.
-    update(originalContainerData.getMetadata(), true);
-
     if (containerData.hasSchema(OzoneConsts.SCHEMA_V3)) {
       // load metadata from received dump files before we try to parse kv
       BlockUtils.loadKVContainerDataFromFiles(containerData, config);
     }
 
     //fill in memory stat counter (keycount, byte usage)
     KeyValueContainerUtil.parseKVContainerData(containerData, config);
+
+    // rewriting the yaml file with new checksum calculation
+    // restore imported container's state to the original state and flush the yaml file
+    containerData.setState(originalContainerData.getState());
+    update(originalContainerData.getMetadata(), true);
   }
 
   @Override
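Note the reordering in importContainerData: the on-disk descriptor keeps the RECOVERING state persisted during unpacking until the RocksDB dump has been loaded and parsed; only then is the replica's original state (typically CLOSED) restored and the YAML rewritten with a fresh checksum. If the datanode crashes mid-import, the leftover descriptor still says RECOVERING, which is exactly what ContainerReader's startup cleanup below keys on.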

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java

Lines changed: 8 additions & 0 deletions
@@ -39,6 +39,7 @@
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -92,6 +93,13 @@ public byte[] unpackContainerData(Container<KeyValueContainerData> container,
       Files.createDirectories(destContainerDir);
     }
     if (FileUtils.isEmptyDirectory(destContainerDir.toFile())) {
+      // Before the atomic move, the destination dir is empty and doesn't have a metadata directory.
+      // Writing the .container file will fail as the metadata dir doesn't exist.
+      // So we instead save the container file to the containerUntarDir.
+      Path containerMetadataPath = Paths.get(container.getContainerData().getMetadataPath());
+      Path tempContainerMetadataPath = Paths.get(containerUntarDir.toString(),
+          containerMetadataPath.getName(containerMetadataPath.getNameCount() - 1).toString());
+      persistCustomContainerState(container, descriptorFileContent, State.RECOVERING, tempContainerMetadataPath);
       Files.move(containerUntarDir, destContainerDir,
           StandardCopyOption.ATOMIC_MOVE,
           StandardCopyOption.REPLACE_EXISTING);
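This is the classic stage-then-publish pattern: every file, descriptor included, is materialized in the scratch untar directory, and a single atomic rename makes the container visible. A self-contained sketch of the pattern using only java.nio (all paths are made up for illustration):

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public final class AtomicPublishSketch {
      public static void main(String[] args) throws Exception {
        Path staging = Paths.get("/tmp/container-untar/1");
        Path dest = Paths.get("/tmp/hdds-data/1");
        // Stage everything, including the metadata/descriptor, under the
        // scratch directory first.
        Files.createDirectories(staging.resolve("metadata"));
        Files.write(staging.resolve("metadata").resolve("1.container"),
            "state: RECOVERING\n".getBytes(StandardCharsets.UTF_8));
        Files.createDirectories(dest.getParent());
        // One atomic rename publishes the whole tree: a crash can no longer
        // leave a half-imported container at the destination.
        Files.move(staging, dest, StandardCopyOption.ATOMIC_MOVE,
            StandardCopyOption.REPLACE_EXISTING);
      }
    }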

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java

Lines changed: 61 additions & 9 deletions
@@ -31,17 +31,23 @@
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Codec;
 import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase;
 import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.RocksDBException;
 
 /**
  * Constructs a datanode store in accordance with schema version 3, which uses
@@ -133,16 +139,62 @@ public void dumpKVContainerData(long containerID, File dumpDir)
 
   public void loadKVContainerData(File dumpDir)
       throws IOException {
-    getMetadataTable().loadFromFile(
-        getTableDumpFile(getMetadataTable(), dumpDir));
-    getBlockDataTable().loadFromFile(
-        getTableDumpFile(getBlockDataTable(), dumpDir));
-    if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
-      getLastChunkInfoTable().loadFromFile(
-          getTableDumpFile(getLastChunkInfoTable(), dumpDir));
+
+    try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
+      processTable(batch, getTableDumpFile(getMetadataTable(), dumpDir),
+          getDbDef().getMetadataColumnFamily().getKeyCodec(),
+          getDbDef().getMetadataColumnFamily().getValueCodec(),
+          getMetadataTable());
+      processTable(batch, getTableDumpFile(getBlockDataTable(), dumpDir),
+          getDbDef().getBlockDataColumnFamily().getKeyCodec(),
+          getDbDef().getBlockDataColumnFamily().getValueCodec(),
+          getBlockDataTable());
+      if (VersionedDatanodeFeatures.isFinalized(HDDSLayoutFeature.HBASE_SUPPORT)) {
+        processTable(batch, getTableDumpFile(getLastChunkInfoTable(), dumpDir),
+            getDbDef().getLastChunkInfoColumnFamily().getKeyCodec(),
+            getDbDef().getLastChunkInfoColumnFamily().getValueCodec(),
+            getLastChunkInfoTable());
+      }
+      processTable(batch, getTableDumpFile(getDeleteTransactionTable(), dumpDir),
+          ((DatanodeSchemaThreeDBDefinition)getDbDef()).getDeleteTransactionsColumnFamily().getKeyCodec(),
+          ((DatanodeSchemaThreeDBDefinition)getDbDef()).getDeleteTransactionsColumnFamily().getValueCodec(),
+          getDeleteTransactionTable());
+
+      getStore().commitBatchOperation(batch);
+    } catch (RocksDBException e) {
+      throw new IOException("Failed to load container data from dump file.", e);
+    }
+  }
+
+  private <K, V> void processTable(BatchOperation batch, File tableDumpFile,
+      Codec<K> keyCodec, Codec<V> valueCodec, Table<K, V> table) throws IOException, RocksDBException {
+    if (isFileEmpty(tableDumpFile)) {
+      LOG.debug("SST File {} is empty. Skipping processing.", tableDumpFile.getAbsolutePath());
+      return;
+    }
+
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedSstFileReader sstFileReader = new ManagedSstFileReader(managedOptions)) {
+      sstFileReader.open(tableDumpFile.getAbsolutePath());
+      try (ManagedReadOptions managedReadOptions = new ManagedReadOptions();
+          ManagedSstFileReaderIterator iterator =
+              ManagedSstFileReaderIterator.managed(sstFileReader.newIterator(managedReadOptions))) {
+        for (iterator.get().seekToFirst(); iterator.get().isValid(); iterator.get().next()) {
+          byte[] key = iterator.get().key();
+          byte[] value = iterator.get().value();
+          K decodedKey = keyCodec.fromPersistedFormat(key);
+          V decodedValue = valueCodec.fromPersistedFormat(value);
+          table.putWithBatch(batch, decodedKey, decodedValue);
+        }
+      }
+    }
+  }
+
+  boolean isFileEmpty(File file) {
+    if (!file.exists()) {
+      return true;
     }
-    getDeleteTransactionTable().loadFromFile(
-        getTableDumpFile(getDeleteTransactionTable(), dumpDir));
+    return file.length() == 0;
   }
 
   public static File getTableDumpFile(Table<String, ?> table,
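The Managed* classes above are Ozone's wrappers around RocksDB's SST reader; the idea is to stream every key/value out of each dump file and replay it into one batch so all tables commit atomically, instead of ingesting the files with Table.loadFromFile(). A self-contained sketch of the same pattern against the raw RocksDB Java API (db and file paths are invented):

    import org.rocksdb.Options;
    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileReader;
    import org.rocksdb.SstFileReaderIterator;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    public final class SstReplaySketch {
      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options opts = new Options().setCreateIfMissing(true);
            RocksDB db = RocksDB.open(opts, "/tmp/replay-db");
            SstFileReader reader = new SstFileReader(opts);
            ReadOptions readOpts = new ReadOptions();
            WriteBatch batch = new WriteBatch();
            WriteOptions writeOpts = new WriteOptions()) {
          reader.open("/tmp/dump/metadata.sst");
          try (SstFileReaderIterator it = reader.newIterator(readOpts)) {
            // Copy every entry of the SST into the batch; nothing becomes
            // visible until the single write() below commits.
            for (it.seekToFirst(); it.isValid(); it.next()) {
              batch.put(it.key(), it.value());
            }
          }
          db.write(writeOpts, batch);
        }
      }
    }

A side effect relied on by the test change further down: replaying through a WriteBatch lands in the memtable, so no per-import SST files appear in the datanode DB until a flush or compaction.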

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java

Lines changed: 9 additions & 4 deletions
@@ -211,10 +211,15 @@ public void verifyAndFixupContainerData(ContainerData containerData)
             config);
         if (kvContainer.getContainerState() == RECOVERING) {
           if (shouldDelete) {
-            kvContainer.markContainerUnhealthy();
-            LOG.info("Stale recovering container {} marked UNHEALTHY",
-                kvContainerData.getContainerID());
-            containerSet.addContainer(kvContainer);
+            // delete Ratis replicated RECOVERING containers
+            if (kvContainer.getContainerData().getReplicaIndex() == 0) {
+              cleanupContainer(hddsVolume, kvContainer);
+            } else {
+              kvContainer.markContainerUnhealthy();
+              LOG.info("Stale recovering container {} marked UNHEALTHY",
+                  kvContainerData.getContainerID());
+              containerSet.addContainer(kvContainer);
+            }
           }
           return;
         }
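The replica index is what separates the two replication schemes here: Ratis-replicated containers carry replicaIndex 0, while EC replicas use a positive index. A stale RECOVERING Ratis replica can simply be deleted and re-replicated from another node, whereas an EC replica is retained and marked UNHEALTHY so reconstruction can still account for it.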

hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java

Lines changed: 6 additions & 1 deletion
@@ -882,6 +882,7 @@ void testAutoCompactionSmallSstFile(
         TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION);
         container.importContainerData(fis, packer);
         containerList.add(container);
+        assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, container.getContainerData().getState());
       }
     }
 
@@ -891,12 +892,16 @@ void testAutoCompactionSmallSstFile(
           CONF).getStore();
       List<LiveFileMetaData> fileMetaDataList1 =
           ((RDBStore)(dnStore.getStore())).getDb().getLiveFilesMetaData();
+      // When using Table.loadFromFile() in loadKVContainerData(),
+      // there were as many SST files generated as the number of imported containers.
+      // After moving away from using Table.loadFromFile(), no SST files are generated unless the db is force flushed.
+      assertEquals(0, fileMetaDataList1.size());
       hddsVolume.compactDb();
       // Sleep a while to wait for compaction to complete
       Thread.sleep(7000);
       List<LiveFileMetaData> fileMetaDataList2 =
           ((RDBStore)(dnStore.getStore())).getDb().getLiveFilesMetaData();
-      assertThat(fileMetaDataList2.size()).isLessThan(fileMetaDataList1.size());
+      assertThat(fileMetaDataList2).hasSizeLessThanOrEqualTo(fileMetaDataList1.size());
     } finally {
       // clean up
       for (KeyValueContainer c : containerList) {

hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker.CONTAINER_FILE_NAME;
2424
import static org.assertj.core.api.Assertions.assertThat;
2525
import static org.junit.jupiter.api.Assertions.assertEquals;
26-
import static org.junit.jupiter.api.Assertions.assertFalse;
2726
import static org.junit.jupiter.api.Assertions.assertNotNull;
2827
import static org.junit.jupiter.api.Assertions.assertThrows;
2928
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -72,7 +71,22 @@ public class TestTarContainerPacker {
7271

7372
private static final String TEST_CHUNK_FILE_CONTENT = "This is a chunk";
7473

75-
private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
74+
private static final String TEST_DESCRIPTOR_FILE_CONTENT = "!<KeyValueContainerData>\n" +
75+
"checksum: 2215d39f2ae1de89fec837d18dc6387d8cba22fb5943cf4616f80c4b34e2edfe\n" +
76+
"chunksPath: target/test-dir/MiniOzoneClusterImpl-23c1bb30-d86a-4f79-88dc-574d8259a5b3/ozone-meta/datanode-4" +
77+
"/data-0/hdds/23c1bb30-d86a-4f79-88dc-574d8259a5b3/current/containerDir0/1/chunks\n" +
78+
"containerDBType: RocksDB\n" +
79+
"containerID: 1\n" +
80+
"containerType: KeyValueContainer\n" +
81+
"layOutVersion: 2\n" +
82+
"maxSize: 5368709120\n" +
83+
"metadata: {}\n" +
84+
"metadataPath: target/test-dir/MiniOzoneClusterImpl-23c1bb30-d86a-4f79-88dc-574d8259a5b3/ozone-meta/datanode-4" +
85+
"/data-0/hdds/23c1bb30-d86a-4f79-88dc-574d8259a5b3/current/containerDir0/1/metadata\n" +
86+
"originNodeId: 25a48afa-f8d8-44ff-b268-642167e5354b\n" +
87+
"originPipelineId: d7faca81-407f-4a50-a399-bd478c9795e5\n" +
88+
"schemaVersion: '3'\n" +
89+
"state: CLOSED";
7690

7791
private TarContainerPacker packer;
7892

@@ -142,9 +156,9 @@ private KeyValueContainerData createContainer(Path dir, boolean createDir)
142156
long id = CONTAINER_ID.getAndIncrement();
143157

144158
Path containerDir = dir.resolve(String.valueOf(id));
145-
Path dbDir = containerDir.resolve("db");
146159
Path dataDir = containerDir.resolve("chunks");
147160
Path metaDir = containerDir.resolve("metadata");
161+
Path dbDir = metaDir.resolve("db");
148162
if (createDir) {
149163
Files.createDirectories(metaDir);
150164
Files.createDirectories(dbDir);
@@ -245,9 +259,10 @@ public void pack(ContainerTestVersionInfo versionInfo,
245259
assertExampleChunkFileIsGood(
246260
Paths.get(destinationContainerData.getChunksPath()),
247261
TEST_CHUNK_FILE_NAME);
248-
assertFalse(destinationContainer.getContainerFile().exists(),
249-
"Descriptor file should not have been extracted by the "
250-
+ "unpackContainerData Call");
262+
263+
String containerFileData = new String(Files.readAllBytes(destinationContainer.getContainerFile().toPath()), UTF_8);
264+
assertTrue(containerFileData.contains("RECOVERING"),
265+
"The state of the container is not 'RECOVERING' in the container file");
251266
assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor);
252267
inputForUnpackData.assertClosedExactlyOnce();
253268
}

hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestContainerReader.java

Lines changed: 10 additions & 4 deletions
@@ -237,10 +237,16 @@ public void testContainerReader(ContainerTestVersionInfo versionInfo)
     thread.start();
     thread.join();
 
-    //recovering container should be marked unhealthy, so the count should be 3
-    assertEquals(UNHEALTHY, containerSet.getContainer(
-        recoveringContainerData.getContainerID()).getContainerState());
-    assertEquals(3, containerSet.containerCount());
+    // Ratis replicated recovering containers are deleted upon datanode startup
+    if (recoveringKeyValueContainer.getContainerData().getReplicaIndex() == 0) {
+      assertNull(containerSet.getContainer(recoveringContainerData.getContainerID()));
+      assertEquals(2, containerSet.containerCount());
+    } else {
+      //recovering container should be marked unhealthy, so the count should be 3
+      assertEquals(UNHEALTHY, containerSet.getContainer(
+          recoveringContainerData.getContainerID()).getContainerState());
+      assertEquals(3, containerSet.containerCount());
+    }
 
     for (int i = 0; i < 2; i++) {
       Container keyValueContainer = containerSet.getContainer(i);

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java

Lines changed: 39 additions & 0 deletions
@@ -26,12 +26,14 @@
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.container.TestHelper.isContainerClosed;
 import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
 import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
 import static org.apache.ozone.test.GenericTestUtils.setLogLevel;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.any;
 
@@ -282,6 +284,43 @@ private static void deleteContainer(MiniOzoneCluster cluster, DatanodeDetails dn
   }
 
 
+  @Test
+  public void testImportedContainerIsClosed() throws Exception {
+    OzoneConfiguration conf = createConfiguration(false);
+    // create a 4 node cluster
+    try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).build()) {
+      cluster.waitForClusterToBeReady();
+
+      try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) {
+        List<DatanodeDetails> allNodes =
+            cluster.getHddsDatanodes().stream()
+                .map(HddsDatanodeService::getDatanodeDetails)
+                .collect(Collectors.toList());
+        // shutdown 4th node (node 3 is down now)
+        cluster.shutdownHddsDatanode(allNodes.get(allNodes.size() - 1));
+
+        createTestData(client);
+        final OmKeyLocationInfo keyLocation = lookupKeyFirstLocation(cluster);
+        long containerID = keyLocation.getContainerID();
+        waitForContainerClose(cluster, containerID);
+
+        // shutdown nodes 0 and 1. only node 2 is up now
+        for (int i = 0; i < 2; i++) {
+          cluster.shutdownHddsDatanode(allNodes.get(i));
+        }
+        waitForReplicaCount(containerID, 1, cluster);
+
+        // bring back up the 4th node
+        cluster.restartHddsDatanode(allNodes.get(allNodes.size() - 1), false);
+
+        // the container should have been imported on the 4th node
+        waitForReplicaCount(containerID, 2, cluster);
+        assertTrue(isContainerClosed(cluster, containerID, allNodes.get(allNodes.size() - 1)));
+      }
+    }
+  }
+
+
   @Test
   @Flaky("HDDS-11087")
   public void testECContainerReplication() throws Exception {
