@@ -124,19 +124,16 @@ public RDBStore(File dbFile, ManagedDBOptions dbOptions,
             "db path :{}", dbJmxBeanName);
       }
 
-      //create checkpoints directory if not exists.
+      // Create checkpoints and snapshot directories if not exists.
       if (!createCheckpointDirs) {
         checkpointsParentDir = null;
+        snapshotsParentDir = null;
       } else {
         Path checkpointsParentDirPath =
             Paths.get(dbLocation.getParent(), OM_CHECKPOINT_DIR);
         checkpointsParentDir = checkpointsParentDirPath.toString();
         Files.createDirectories(checkpointsParentDirPath);
-      }
-      //create snapshot checkpoint directory if does not exist.
-      if (!createCheckpointDirs) {
-        snapshotsParentDir = null;
-      } else {
+
         Path snapshotsParentDirPath =
             Paths.get(dbLocation.getParent(), OM_SNAPSHOT_CHECKPOINT_DIR);
         snapshotsParentDir = snapshotsParentDirPath.toString();
@@ -162,7 +159,9 @@ public RDBStore(File dbFile, ManagedDBOptions dbOptions,
       checkPointManager = new RDBCheckpointManager(db, dbLocation.getName());
       rdbMetrics = RDBMetrics.create();
 
-    } catch (IOException | RocksDBException e) {
+    } catch (Exception e) {
+      // Close DB and other things if got initialized.
+      close();
       String msg = "Failed init RocksDB, db path : " + dbFile.getAbsolutePath()
           + ", " + "exception :" + (e.getCause() == null ?
           e.getClass().getCanonicalName() + " " + e.getMessage() :
@@ -204,9 +203,9 @@ public void close() throws IOException {
     }
 
     RDBMetrics.unRegister();
-    checkPointManager.close();
+    IOUtils.closeQuietly(checkPointManager);
     IOUtils.closeQuietly(rocksDBCheckpointDiffer);
-    db.close();
+    IOUtils.closeQuietly(db);
   }
 
   @Override
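The two hunks above apply a common pattern for wrappers around native resources: if the constructor fails after some members are already initialized, close them before rethrowing, and make close() tolerate a partially built instance by closing each member quietly so one failing close does not skip the rest. A minimal, self-contained sketch of that pattern; the class and helper names here are hypothetical, not the Ozone classes:

import java.io.Closeable;
import java.io.IOException;

// Hypothetical wrapper that owns two closeable resources, mirroring the
// "clean up on failed init, close quietly on shutdown" pattern above.
public final class NativeStoreSketch implements Closeable {
  private Closeable checkpointManager;
  private Closeable metrics;

  public NativeStoreSketch() throws IOException {
    try {
      checkpointManager = openResource("checkpoints");
      // If this second open throws, the first resource must still be released.
      metrics = openResource("metrics");
    } catch (Exception e) {
      close();  // release whatever did get initialized
      throw new IOException("Failed to init store", e);
    }
  }

  @Override
  public void close() {
    // Close each member independently; a null member is simply skipped.
    closeQuietly(checkpointManager);
    closeQuietly(metrics);
  }

  private static void closeQuietly(Closeable c) {
    if (c != null) {
      try {
        c.close();
      } catch (IOException ignored) {
        // best-effort close; nothing useful to do here
      }
    }
  }

  private static Closeable openResource(String name) throws IOException {
    return () -> { };  // stand-in for a real native handle
  }
}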
@@ -49,6 +49,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -73,14 +74,15 @@
  * When there is a {@link RocksDBException} with error,
  * this class will close the underlying {@link org.rocksdb.RocksObject}s.
  */
-public final class RocksDatabase {
+public final class RocksDatabase implements Closeable {
   static final Logger LOG = LoggerFactory.getLogger(RocksDatabase.class);
 
   public static final String ESTIMATE_NUM_KEYS = "rocksdb.estimate-num-keys";
 
   private static Map<String, List<ColumnFamilyHandle>> dbNameToCfHandleMap =
       new HashMap<>();
 
+  private final StackTraceElement[] stackTrace;
 
   static IOException toIOException(Object name, String op, RocksDBException e) {
     return HddsServerUtil.toIOException(name + ": Failed to " + op, e);
@@ -352,8 +354,10 @@ private RocksDatabase(File dbFile, ManagedRocksDB db,
     this.descriptors = descriptors;
     this.columnFamilies = columnFamilies;
     this.counter = counter;
+    this.stackTrace = Thread.currentThread().getStackTrace();
   }
 
+  @Override
   public void close() {
     if (isClosed.compareAndSet(false, true)) {
       // Wait for all background work to be cancelled first. e.g. RDB compaction
@@ -900,5 +904,17 @@ public static Map<String, List<ColumnFamilyHandle>> getCfHandleMap() {
     return dbNameToCfHandleMap;
   }
 
-
+  @Override
+  protected void finalize() throws Throwable {
+    if (!isClosed()) {
+      String warning = "RocksDatabase is not closed properly.";
+      if (LOG.isDebugEnabled()) {
+        String debugMessage = String.format("%n StackTrace for unclosed " +
+            "RocksDatabase instance: %s", Arrays.toString(stackTrace));
+        warning = warning.concat(debugMessage);
+      }
+      LOG.warn(warning);
+    }
+    super.finalize();
+  }
 }
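The finalize() override added above is a leak detector: the constructor captures the allocation stack trace, and if an instance reaches finalization without ever being closed, a warning (with that stack trace at debug level) points to the call site that leaked it. A stripped-down sketch of the same idea with illustrative names; note that finalize() has been deprecated since Java 9, and java.lang.ref.Cleaner is the usual replacement for this kind of check:

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal leak-detecting resource: warns at finalization if close() was never called.
public class TrackedResource implements AutoCloseable {
  private final StackTraceElement[] createdAt =
      Thread.currentThread().getStackTrace();
  private final AtomicBoolean closed = new AtomicBoolean(false);

  @Override
  public void close() {
    closed.set(true);  // real code would release native handles here
  }

  @Override
  protected void finalize() throws Throwable {
    if (!closed.get()) {
      // The captured stack trace shows where the leaked instance was created.
      System.err.println("TrackedResource not closed; created at: "
          + Arrays.toString(createdAt));
    }
    super.finalize();
  }

  public static void main(String[] args) throws Exception {
    new TrackedResource();  // leaked on purpose
    System.gc();            // hint only; finalization timing is not guaranteed
    Thread.sleep(200);
  }
}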
@@ -420,8 +420,7 @@ public void setRocksDBForCompactionTracking(DBOptions rocksOptions,
     rocksOptions.setListeners(list);
   }
 
-  public void setRocksDBForCompactionTracking(DBOptions rocksOptions)
-      throws RocksDBException {
+  public void setRocksDBForCompactionTracking(DBOptions rocksOptions) {
     setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
   }
 
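For context, the listener list handled by setRocksDBForCompactionTracking is installed on the RocksDB options via setListeners, and RocksDB invokes those listeners from its background threads when compactions run; that is what compaction tracking hooks into. A rough standalone sketch of wiring such a listener with the RocksDB Java API; the listener body and DB path are made up, and this is not the checkpoint differ's actual listener:

import java.util.Collections;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class CompactionListenerSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();

    // Callback invoked by RocksDB background threads after each compaction.
    AbstractEventListener listener = new AbstractEventListener() {
      @Override
      public void onCompactionCompleted(RocksDB db, CompactionJobInfo info) {
        System.out.println("compaction consumed " + info.inputFiles()
            + " and produced " + info.outputFiles());
      }
    };

    try (Options options = new Options()
            .setCreateIfMissing(true)
            .setListeners(Collections.singletonList(listener));
         RocksDB db = RocksDB.open(options, "/tmp/compaction-listener-demo")) {
      db.put("k".getBytes(), "v".getBytes());
      // Request a manual compaction; the callback fires when one actually runs.
      db.compactRange();
    }
  }
}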
@@ -372,22 +372,28 @@ private OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name)
   // metadata constructor for snapshots
   OmMetadataManagerImpl(OzoneConfiguration conf, String snapshotDirName,
       boolean isSnapshotInCache) throws IOException {
-    lock = new OmReadOnlyLock();
-    omEpoch = 0;
-    String snapshotDir = OMStorage.getOmDbDir(conf) +
-        OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR;
-    File metaDir = new File(snapshotDir);
-    String dbName = OM_DB_NAME + snapshotDirName;
-    // The check is only to prevent every snapshot read to perform a disk IO
-    // and check if a checkpoint dir exists. If entry is present in cache,
-    // it is most likely DB entries will get flushed in this wait time.
-    if (isSnapshotInCache) {
-      File checkpoint = Paths.get(metaDir.toPath().toString(), dbName).toFile();
-      RDBCheckpointUtils.waitForCheckpointDirectoryExist(checkpoint);
-    }
+    try {
+      lock = new OmReadOnlyLock();
+      omEpoch = 0;
+      String snapshotDir = OMStorage.getOmDbDir(conf) +
+          OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR;
+      File metaDir = new File(snapshotDir);
+      String dbName = OM_DB_NAME + snapshotDirName;
+      // The check is only to prevent every snapshot read to perform a disk IO
+      // and check if a checkpoint dir exists. If entry is present in cache,
+      // it is most likely DB entries will get flushed in this wait time.
+      if (isSnapshotInCache) {
+        File checkpoint =
+            Paths.get(metaDir.toPath().toString(), dbName).toFile();
+        RDBCheckpointUtils.waitForCheckpointDirectoryExist(checkpoint);
+      }
+      setStore(loadDB(conf, metaDir, dbName, false,
+          java.util.Optional.of(Boolean.TRUE), false));
+      initializeOmTables(false);
+    } catch (IOException e) {
+      stop();
+      throw e;
+    }
-    setStore(loadDB(conf, metaDir, dbName, false,
-        java.util.Optional.of(Boolean.TRUE), false));
-    initializeOmTables(false);
   }
 
   @Override
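The wait in the hunk above is a bounded poll: when the snapshot entry is still only in cache, the constructor gives the checkpoint directory some time to appear on disk before opening the DB. A hypothetical helper in the same spirit; the real RDBCheckpointUtils timeout and interval are not shown in this diff, so the values below are arbitrary:

import java.io.File;
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: poll until a checkpoint directory appears or a timeout
// elapses, roughly what a waitForCheckpointDirectoryExist-style utility does.
public final class CheckpointWaitSketch {
  public static boolean waitForDirectory(File dir, Duration timeout)
      throws InterruptedException {
    Instant deadline = Instant.now().plus(timeout);
    while (Instant.now().isBefore(deadline)) {
      if (dir.exists() && dir.isDirectory()) {
        return true;          // checkpoint has been flushed to disk
      }
      Thread.sleep(100);      // poll interval chosen arbitrarily here
    }
    return false;             // caller decides whether to proceed anyway
  }

  public static void main(String[] args) throws InterruptedException {
    File checkpoint = new File("/tmp/om.db-checkpoint-example");
    System.out.println("checkpoint present: "
        + waitForDirectory(checkpoint, Duration.ofSeconds(5)));
  }
}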
@@ -32,7 +32,7 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -290,32 +290,32 @@ public OmSnapshotManager(OzoneManager ozoneManager) {
 
   private CacheLoader<String, OmSnapshot> createCacheLoader() {
     return new CacheLoader<String, OmSnapshot>() {
-      @Override
 
       // load the snapshot into the cache if not already there
       @Nonnull
+      @Override
      public OmSnapshot load(@Nonnull String snapshotTableKey)
          throws IOException {
        // see if the snapshot exists
        SnapshotInfo snapshotInfo = getSnapshotInfo(snapshotTableKey);
 
-        // Block snapshot from loading when it is no longer active e.g. DELETED,
-        // unless this is called from SnapshotDeletingService.
+        // Block snapshot from loading when it is no longer active
+        // e.g. DELETED, unless this is called from SnapshotDeletingService.
        checkSnapshotActive(snapshotInfo);
 
-        CacheValue<SnapshotInfo> cacheValue =
-            ozoneManager.getMetadataManager().getSnapshotInfoTable()
-                .getCacheValue(new CacheKey<>(snapshotTableKey));
-        boolean isSnapshotInCache = cacheValue != null && Optional.ofNullable(
-            cacheValue.getCacheValue()).isPresent();
+        CacheValue<SnapshotInfo> cacheValue = ozoneManager.getMetadataManager()
+            .getSnapshotInfoTable()
+            .getCacheValue(new CacheKey<>(snapshotTableKey));
+
+        boolean isSnapshotInCache = Objects.nonNull(cacheValue) &&
+            Objects.nonNull(cacheValue.getCacheValue());
-
        // read in the snapshot
        OzoneConfiguration conf = ozoneManager.getConfiguration();
-        OMMetadataManager snapshotMetadataManager;
 
        // Create the snapshot metadata manager by finding the corresponding
-        // RocksDB instance, creating an OmMetadataManagerImpl instance based on
-        // that
+        // RocksDB instance, creating an OmMetadataManagerImpl instance based
+        // on that.
+        OMMetadataManager snapshotMetadataManager;
        try {
          snapshotMetadataManager = new OmMetadataManagerImpl(conf,
              snapshotInfo.getCheckpointDirName(), isSnapshotInCache);
@@ -324,18 +324,27 @@ public OmSnapshot load(@Nonnull String snapshotTableKey)
          throw e;
        }
 
-        // create the other manager instances based on snapshot metadataManager
-        PrefixManagerImpl pm = new PrefixManagerImpl(snapshotMetadataManager,
-            false);
-        KeyManagerImpl km = new KeyManagerImpl(null,
-            ozoneManager.getScmClient(), snapshotMetadataManager, conf,
-            ozoneManager.getBlockTokenSecretManager(),
-            ozoneManager.getKmsProvider(), ozoneManager.getPerfMetrics());
-
-        return new OmSnapshot(km, pm, ozoneManager,
-            snapshotInfo.getVolumeName(),
-            snapshotInfo.getBucketName(),
-            snapshotInfo.getName());
+        try {
+          // create the other manager instances based on snapshot
+          // metadataManager
+          PrefixManagerImpl pm = new PrefixManagerImpl(snapshotMetadataManager,
+              false);
+          KeyManagerImpl km = new KeyManagerImpl(null,
+              ozoneManager.getScmClient(), snapshotMetadataManager, conf,
+              ozoneManager.getBlockTokenSecretManager(),
+              ozoneManager.getKmsProvider(), ozoneManager.getPerfMetrics());
+
+          return new OmSnapshot(km, pm, ozoneManager,
+              snapshotInfo.getVolumeName(),
+              snapshotInfo.getBucketName(),
+              snapshotInfo.getName());
+        } catch (Exception e) {
+          // Close RocksDB if there is any failure.
+          if (!snapshotMetadataManager.getStore().isClosed()) {
+            snapshotMetadataManager.getStore().close();
+          }
+          throw new IOException(e);
+        }
      }
    };
  }
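The last hunk makes the Guava CacheLoader clean up after itself: if building the OmSnapshot fails after the snapshot's metadata manager (and its RocksDB handle) has already been opened, the store is closed before the load fails. A self-contained sketch of that load-or-release pattern with Guava's cache, using illustrative classes rather than the OM ones:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.io.Closeable;
import java.io.IOException;

// Sketch of the loader pattern above: build a cached value from an expensive
// resource, and close that resource if the rest of the construction fails so
// a failed cache load does not leak it. Names are illustrative only.
public final class SnapshotCacheSketch {

  static final class Handle implements Closeable {
    @Override public void close() { System.out.println("handle closed"); }
  }

  static final class Snapshot {
    private final Handle handle;

    Snapshot(Handle handle, String name) {
      if (name.isEmpty()) {
        throw new IllegalArgumentException("bad name");  // simulated late failure
      }
      this.handle = handle;
    }
  }

  public static void main(String[] args) throws Exception {
    LoadingCache<String, Snapshot> cache = CacheBuilder.newBuilder()
        .maximumSize(16)
        .build(new CacheLoader<String, Snapshot>() {
          @Override
          public Snapshot load(String key) throws IOException {
            Handle handle = new Handle();  // expensive resource, e.g. a DB open
            try {
              return new Snapshot(handle, key);
            } catch (Exception e) {
              handle.close();              // release it before failing the load
              throw new IOException(e);
            }
          }
        });

    System.out.println(cache.get("snap1"));  // loads successfully
  }
}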