diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java index 720407b8547c..a61b00be51dd 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java @@ -16,8 +16,10 @@ */ package org.apache.hadoop.ozone.om; +import java.time.Duration; import java.util.List; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; @@ -27,8 +29,8 @@ import org.apache.hadoop.hdds.utils.db.DBProfile; import org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; import org.apache.hadoop.ozone.TestDataUtil; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneBucket; @@ -76,11 +78,16 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.CONTAINS_SNAPSHOT; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND; import static org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED; import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE; import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -108,11 +115,8 @@ public class TestOmSnapshot { private static boolean enabledFileSystemPaths; private static boolean forceFullSnapshotDiff; private static ObjectStore store; - private static OzoneConfiguration leaderConfig; - private static OzoneManager leaderOzoneManager; - + private static OzoneManager ozoneManager; private static RDBStore rdbStore; - private static OzoneBucket ozoneBucket; @Rule @@ -121,9 +125,9 @@ public class TestOmSnapshot { @Parameterized.Parameters public static Collection data() { return Arrays.asList( - new Object[]{OBJECT_STORE, false, false}, - new Object[]{FILE_SYSTEM_OPTIMIZED, false, false}, - new Object[]{BucketLayout.LEGACY, true, true}); + new Object[]{OBJECT_STORE, false, false}, + new Object[]{FILE_SYSTEM_OPTIMIZED, false, false}, + new Object[]{BucketLayout.LEGACY, true, true}); } public TestOmSnapshot(BucketLayout newBucketLayout, @@ -158,22 +162,20 @@ private void init() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); String clusterId = UUID.randomUUID().toString(); String scmId = UUID.randomUUID().toString(); - conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS, - enabledFileSystemPaths); - conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT, - bucketLayout.name()); - conf.setBoolean(OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF, - forceFullSnapshotDiff); + String omId = UUID.randomUUID().toString(); + conf.setBoolean(OZONE_OM_ENABLE_FILESYSTEM_PATHS, enabledFileSystemPaths); + conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, bucketLayout.name()); + conf.setBoolean(OZONE_OM_SNAPSHOT_FORCE_FULL_DIFF, forceFullSnapshotDiff); conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST); // Enable filesystem snapshot feature for the test regardless of the default conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true); - cluster = MiniOzoneCluster.newOMHABuilder(conf) + cluster = MiniOzoneCluster.newBuilder(conf) .setClusterId(clusterId) .setScmId(scmId) - .setOMServiceId("om-service-test1") - .setNumOfOzoneManagers(3) + .setOmId(omId) .build(); + cluster.waitForClusterToBeReady(); client = cluster.newClient(); // create a volume and a bucket to be used by OzoneFileSystem @@ -181,18 +183,14 @@ private void init() throws Exception { .createVolumeAndBucket(client, bucketLayout); volumeName = ozoneBucket.getVolumeName(); bucketName = ozoneBucket.getName(); - - leaderOzoneManager = ((MiniOzoneHAClusterImpl) cluster).getOMLeader(); - leaderConfig = leaderOzoneManager.getConfiguration(); - rdbStore = - (RDBStore) leaderOzoneManager.getMetadataManager().getStore(); - cluster.setConf(leaderConfig); + ozoneManager = cluster.getOzoneManager(); + rdbStore = (RDBStore) ozoneManager.getMetadataManager().getStore(); store = client.getObjectStore(); writeClient = store.getClientProxy().getOzoneManagerClient(); KeyManagerImpl keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils - .getInternalState(leaderOzoneManager, "keyManager"); + .getInternalState(ozoneManager, "keyManager"); // stop the deletion services so that keys can still be read keyManager.stop(); @@ -846,12 +844,12 @@ private String createSnapshot(String volName, String buckName, store.createSnapshot(volName, buckName, snapshotName); String snapshotKeyPrefix = OmSnapshotManager.getSnapshotPrefix(snapshotName); - SnapshotInfo snapshotInfo = - leaderOzoneManager.getMetadataManager().getSnapshotInfoTable() - .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName)); + SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volName, buckName, snapshotName)); String snapshotDirName = - OmSnapshotManager.getSnapshotPath(leaderConfig, snapshotInfo) + - OM_KEY_PREFIX + "CURRENT"; + OmSnapshotManager.getSnapshotPath(ozoneManager.getConfiguration(), + snapshotInfo) + OM_KEY_PREFIX + "CURRENT"; GenericTestUtils .waitFor(() -> new File(snapshotDirName).exists(), 1000, 120000); return snapshotKeyPrefix; @@ -875,45 +873,6 @@ private String createFileKey(OzoneBucket bucket, String keyPrefix) return key; } - @Test - public void testUniqueSnapshotId() - throws IOException, InterruptedException, TimeoutException { - createFileKey(ozoneBucket, "key"); - - String snapshotName = UUID.randomUUID().toString(); - store.createSnapshot(volumeName, bucketName, snapshotName); - List ozoneManagers = ((MiniOzoneHAClusterImpl) cluster) - .getOzoneManagersList(); - List snapshotIds = new ArrayList<>(); - - for (OzoneManager ozoneManager : ozoneManagers) { - GenericTestUtils.waitFor( - () -> { - SnapshotInfo snapshotInfo; - try { - snapshotInfo = ozoneManager.getMetadataManager() - .getSnapshotInfoTable() - .get( - SnapshotInfo.getTableKey(volumeName, - bucketName, - snapshotName) - ); - } catch (IOException e) { - throw new RuntimeException(e); - } - - if (snapshotInfo != null) { - snapshotIds.add(snapshotInfo.getSnapshotID()); - } - return snapshotInfo != null; - }, - 1000, - 120000); - } - - assertEquals(1, snapshotIds.stream().distinct().count()); - } - @Test public void testSnapshotOpensWithDisabledAutoCompaction() throws Exception { String snapPrefix = createSnapshot(volumeName, bucketName); @@ -929,6 +888,108 @@ public void testSnapshotOpensWithDisabledAutoCompaction() throws Exception { } } + // Test snapshot diff when OM restarts in non-HA OM env and diff job is + // in_progress when it restarts. + @Test + public void testSnapshotDiffWhenOmRestart() + throws IOException, InterruptedException { + String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(5); + String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(5); + createSnapshots(snapshot1, snapshot2); + + SnapshotDiffResponse response = store.snapshotDiff(volumeName, bucketName, + snapshot1, snapshot2, null, 0, false); + + assertEquals(IN_PROGRESS, response.getJobStatus()); + + // Restart the OM and wait for sometime to make sure that previous snapDiff + // job finishes. + cluster.restartOzoneManager(); + await().atMost(Duration.ofSeconds(120)). + until(() -> cluster.getOzoneManager().isRunning()); + + response = store.snapshotDiff(volumeName, bucketName, + snapshot1, snapshot2, null, 0, false); + + // If job was IN_PROGRESS or DONE state when OM restarted, it should be + // DONE by this time. + // If job FAILED during crash (which mostly happens in the test because + // of active snapshot checks), it would be removed by clean up service on + // startup, and request after clean up will be considered a new request + // and would return IN_PROGRESS. No other state is expected other than + // IN_PROGRESS and DONE. + if (response.getJobStatus() == DONE) { + assertEquals(100, response.getSnapshotDiffReport().getDiffList().size()); + } else if (response.getJobStatus() == IN_PROGRESS) { + SnapshotDiffReportOzone diffReport = + fetchReportPage(snapshot1, snapshot2, null, 0); + assertEquals(100, diffReport.getDiffList().size()); + } else { + fail("Unexpected job status for the test."); + } + } + + // Test snapshot diff when OM restarts in non-HA OM env and report is + // partially received. + @Test + public void testSnapshotDiffWhenOmRestartAndReportIsPartiallyFetched() + throws IOException, InterruptedException { + int pageSize = 10; + String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(5); + String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(5); + createSnapshots(snapshot1, snapshot2); + + SnapshotDiffReportOzone diffReport = fetchReportPage(snapshot1, snapshot2, + null, pageSize); + + List diffReportEntries = diffReport.getDiffList(); + String nextToken = diffReport.getToken(); + + // Restart the OM and no need to wait because snapDiff job finished before + // the restart. + cluster.restartOzoneManager(); + await().atMost(Duration.ofSeconds(120)). + until(() -> cluster.getOzoneManager().isRunning()); + + while (nextToken == null || StringUtils.isNotEmpty(nextToken)) { + diffReport = fetchReportPage(snapshot1, snapshot2, nextToken, pageSize); + diffReportEntries.addAll(diffReport.getDiffList()); + nextToken = diffReport.getToken(); + } + assertEquals(100, diffReportEntries.size()); + } + + private SnapshotDiffReportOzone fetchReportPage(String fromSnapshot, + String toSnapshot, + String token, + int pageSize) + throws IOException, InterruptedException { + + while (true) { + SnapshotDiffResponse response = store.snapshotDiff(volumeName, bucketName, + fromSnapshot, toSnapshot, token, pageSize, false); + if (response.getJobStatus() == IN_PROGRESS) { + Thread.sleep(response.getWaitTimeInMs()); + } else if (response.getJobStatus() == DONE) { + return response.getSnapshotDiffReport(); + } else { + fail("Unexpected job status for the test."); + } + } + } + + private void createSnapshots(String snapshot1, + String snapshot2) throws IOException { + createFileKey(ozoneBucket, "key"); + store.createSnapshot(volumeName, bucketName, snapshot1); + + for (int i = 0; i < 100; i++) { + createFileKey(ozoneBucket, "key-" + i); + } + + store.createSnapshot(volumeName, bucketName, snapshot2); + } + @Test public void testCompactionDagDisableForSnapshotMetadata() throws Exception { String snapshotName = createSnapshot(volumeName, bucketName); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java index 6b2c21d7d380..6c3e08586612 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java @@ -158,6 +158,8 @@ public static void init() throws Exception { conf.setLong( OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, SNAPSHOT_THRESHOLD); + // Enable filesystem snapshot feature for the test regardless of the default + conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true); // Some subclasses check RocksDB directly as part of their tests. These // depend on OBS layout. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java index 19dfe91ea9aa..960633c32aae 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHASnapshot.java @@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -40,10 +41,13 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.UUID; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE; +import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -91,6 +95,98 @@ public static void cleanUp() { } } + // Test snapshot diff when OM restarts in HA OM env. + @Test + public void testSnapshotDiffWhenOmLeaderRestart() + throws Exception { + String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(10); + String snapshot2 = "snap-" + RandomStringUtils.randomNumeric(10); + + createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10)); + store.createSnapshot(volumeName, bucketName, snapshot1); + + for (int i = 0; i < 100; i++) { + createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10)); + } + + store.createSnapshot(volumeName, bucketName, snapshot2); + + SnapshotDiffResponse response = + store.snapshotDiff(volumeName, bucketName, + snapshot1, snapshot2, null, 0, false); + + assertEquals(IN_PROGRESS, response.getJobStatus()); + + String oldLeader = cluster.getOMLeader().getOMNodeId(); + + OzoneManager omLeader = cluster.getOMLeader(); + cluster.shutdownOzoneManager(omLeader); + cluster.restartOzoneManager(omLeader, true); + + await().atMost(Duration.ofSeconds(120)) + .until(() -> cluster.getOMLeader() != null); + + String newLeader = cluster.getOMLeader().getOMNodeId(); + + if (Objects.equals(oldLeader, newLeader)) { + // If old leader becomes leader again. Job should be done by this time. + response = store.snapshotDiff(volumeName, bucketName, + snapshot1, snapshot2, null, 0, false); + assertEquals(DONE, response.getJobStatus()); + assertEquals(100, response.getSnapshotDiffReport().getDiffList().size()); + } else { + // If new leader is different from old leader. SnapDiff request will be + // new to OM, and job status should be IN_PROGRESS. + response = store.snapshotDiff(volumeName, bucketName, snapshot1, + snapshot2, null, 0, false); + assertEquals(IN_PROGRESS, response.getJobStatus()); + while (true) { + response = store.snapshotDiff(volumeName, bucketName, snapshot1, + snapshot2, null, 0, false); + if (DONE == response.getJobStatus()) { + assertEquals(100, + response.getSnapshotDiffReport().getDiffList().size()); + break; + } + Thread.sleep(response.getWaitTimeInMs()); + } + } + } + + @Test + public void testSnapshotIdConsistency() throws Exception { + createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10)); + + String snapshotName = "snap-" + RandomStringUtils.randomNumeric(10); + + store.createSnapshot(volumeName, bucketName, snapshotName); + List ozoneManagers = cluster.getOzoneManagersList(); + List snapshotIds = new ArrayList<>(); + + for (OzoneManager ozoneManager : ozoneManagers) { + await().atMost(Duration.ofSeconds(120)) + .until(() -> { + SnapshotInfo snapshotInfo; + try { + snapshotInfo = ozoneManager.getMetadataManager() + .getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volumeName, + bucketName, + snapshotName)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (snapshotInfo != null) { + snapshotIds.add(snapshotInfo.getSnapshotID()); + } + return snapshotInfo != null; + }); + } + + assertEquals(1, snapshotIds.stream().distinct().count()); + } + /** * Test snapshotNames are unique among OM nodes when snapshotName is not * passed or empty. @@ -146,8 +242,8 @@ public void testSnapshotChainManagerRestore() throws Exception { for (int i = 0; i < 100; i++) { int index = i % 10; createFileKey(ozoneBuckets.get(index), - "key-" + RandomStringUtils.randomNumeric(3)); - String snapshot1 = "snapshot-" + RandomStringUtils.randomNumeric(5); + "key-" + RandomStringUtils.randomNumeric(10)); + String snapshot1 = "snapshot-" + RandomStringUtils.randomNumeric(10); store.createSnapshot(volumeNames.get(index), bucketNames.get(index), snapshot1); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index 076aba15b106..5c1e94a43d70 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -831,6 +831,12 @@ private void closeColumnFamilyOptions( @Override public void close() { + if (snapshotDiffManager != null) { + snapshotDiffManager.close(); + } + if (snapshotCache != null) { + snapshotCache.invalidateAll(); + } if (snapshotDiffCleanupService != null) { snapshotDiffCleanupService.shutdown(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java index 4719d0696716..5e38ad33a280 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java @@ -152,7 +152,8 @@ public class SnapshotDiffManager implements AutoCloseable { * similar type of request at any point of time. */ private final PersistentMap snapDiffJobTable; - private final ExecutorService executorService; + private final ExecutorService snapDiffExecutor; + private ExecutorService sstDumpToolExecutor; /** * Directory to keep hardlinks of SST files for a snapDiff job temporarily. @@ -213,7 +214,7 @@ public SnapshotDiffManager(ManagedRocksDB db, byte[].class, byte[].class); - this.executorService = new ThreadPoolExecutor(threadPoolSize, + this.snapDiffExecutor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0, TimeUnit.MILLISECONDS, @@ -264,13 +265,14 @@ private Optional initSSTDumpTool( OMConfigKeys .OZONE_OM_SNAPSHOT_SST_DUMPTOOL_EXECUTOR_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES); - ExecutorService execService = new ThreadPoolExecutor(0, + sstDumpToolExecutor = new ThreadPoolExecutor(0, threadPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder() .setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d") .build(), new ThreadPoolExecutor.DiscardPolicy()); - return Optional.of(new ManagedSSTDumpTool(execService, bufferSize)); + return Optional.of(new ManagedSSTDumpTool(sstDumpToolExecutor, + bufferSize)); } catch (NativeLibraryNotLoadedException e) { return Optional.empty(); } @@ -560,7 +562,7 @@ private synchronized SnapshotDiffResponse submitSnapDiffJob( // If executor cannot take any more job, remove the job form DB and return // the Rejected Job status with wait time. try { - executorService.execute(() -> generateSnapshotDiffReport(jobKey, jobId, + snapDiffExecutor.execute(() -> generateSnapshotDiffReport(jobKey, jobId, volumeName, bucketName, fromSnapshotName, toSnapshotName, forceFullDiff)); updateJobStatus(jobKey, QUEUED, IN_PROGRESS); @@ -1261,9 +1263,29 @@ private void loadJobsOnStartUp() { } @Override - public void close() throws Exception { + public void close() { + if (snapDiffExecutor != null) { + closeExecutorService(snapDiffExecutor, "SnapDiffExecutor"); + } + if (sstDumpToolExecutor != null) { + closeExecutorService(sstDumpToolExecutor, "SstDumpToolExecutor"); + } + } + + private void closeExecutorService(ExecutorService executorService, + String serviceName) { if (executorService != null) { - executorService.shutdown(); + LOG.info("Shutting down executorService: '{}'", serviceName); + executorService.shutdownNow(); + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + // Re-interrupt the thread while catching InterruptedException + Thread.currentThread().interrupt(); + executorService.shutdownNow(); + } } } }