Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ LMDBTreeStore::LMDBTreeStore(std::string directory, std::string name, uint64_t m
}
}

const std::string& LMDBTreeStore::get_name() const
{
return _name;
}

void LMDBTreeStore::get_stats(TreeDBStats& stats, ReadTransaction& tx)
{
stats.mapSize = _environment->get_map_size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class LMDBTreeStore : public LMDBStoreBase {
LMDBTreeStore& operator=(LMDBTreeStore&& other) = delete;
~LMDBTreeStore() override = default;

const std::string& get_name() const;

void get_stats(TreeDBStats& stats, ReadTransaction& tx);

void write_block_data(const block_number_t& blockNumber, const BlockPayload& blockData, WriteTransaction& tx);
Expand Down
14 changes: 14 additions & 0 deletions barretenberg/cpp/src/barretenberg/lmdblib/lmdb_store_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,18 @@ LMDBStoreBase::WriteTransaction::Ptr LMDBStoreBase::create_write_transaction() c
_environment->wait_for_writer();
return std::make_unique<WriteTransaction>(_environment);
}

void LMDBStoreBase::copy_store(const std::string& dstPath, bool compact)
{
// Create a write tx to acquire a write lock to prevent writes while copying. From LMDB docs:
// "[mdb_copy] can trigger significant file size growth if run in parallel with write transactions,
// because pages which they free during copying cannot be reused until the copy is done."
WriteTransaction::Ptr tx = create_write_transaction();
call_lmdb_func("mdb_env_copy2",
mdb_env_copy2,
_environment->underlying(),
dstPath.c_str(),
static_cast<unsigned int>(compact ? MDB_CP_COMPACT : 0));
}

} // namespace bb::lmdblib
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class LMDBStoreBase {
ReadTransaction::SharedPtr create_shared_read_transaction() const;
WriteTransaction::Ptr create_write_transaction() const;
LMDBDatabaseCreationTransaction::Ptr create_db_transaction() const;
void copy_store(const std::string& dstPath, bool compact);

protected:
std::string _dbDirectory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum LMDBStoreMessageType {
STATS,

CLOSE,
COPY_STORE,
};

struct OpenDatabaseRequest {
Expand Down Expand Up @@ -118,6 +119,12 @@ struct StatsResponse {
MSGPACK_FIELDS(stats, dbMapSizeBytes);
};

struct CopyStoreRequest {
std::string dstPath;
std::optional<bool> compact;
MSGPACK_FIELDS(dstPath, compact);
};

} // namespace bb::nodejs::lmdb_store

MSGPACK_ADD_ENUM(bb::nodejs::lmdb_store::LMDBStoreMessageType)
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ LMDBStoreWrapper::LMDBStoreWrapper(const Napi::CallbackInfo& info)

// The close operation requires exclusive execution, no other operations can be run concurrently with it
_msg_processor.register_handler(LMDBStoreMessageType::CLOSE, this, &LMDBStoreWrapper::close, true);

_msg_processor.register_handler(LMDBStoreMessageType::COPY_STORE, this, &LMDBStoreWrapper::copy_store);
}

Napi::Value LMDBStoreWrapper::call(const Napi::CallbackInfo& info)
Expand Down Expand Up @@ -279,6 +281,14 @@ BoolResponse LMDBStoreWrapper::close()
return { true };
}

BoolResponse LMDBStoreWrapper::copy_store(const CopyStoreRequest& req)
{
verify_store();
_store->copy_store(req.dstPath, req.compact.value_or(false));

return { true };
}

std::pair<bool, bb::lmdblib::KeyDupValuesVector> LMDBStoreWrapper::_advance_cursor(const lmdblib::LMDBCursor& cursor,
bool reverse,
uint64_t page_size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class LMDBStoreWrapper : public Napi::ObjectWrap<LMDBStoreWrapper> {

BoolResponse close();

BoolResponse copy_store(const CopyStoreRequest& req);

static std::pair<bool, lmdblib::KeyDupValuesVector> _advance_cursor(const lmdblib::LMDBCursor& cursor,
bool reverse,
uint64_t page_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ WorldStateWrapper::WorldStateWrapper(const Napi::CallbackInfo& info)
_dispatcher.register_target(
WorldStateMessageType::REVERT_CHECKPOINT,
[this](msgpack::object& obj, msgpack::sbuffer& buffer) { return revert_checkpoint(obj, buffer); });

_dispatcher.register_target(
WorldStateMessageType::COPY_STORES,
[this](msgpack::object& obj, msgpack::sbuffer& buffer) { return copy_stores(obj, buffer); });
}

Napi::Value WorldStateWrapper::call(const Napi::CallbackInfo& info)
Expand Down Expand Up @@ -824,6 +828,20 @@ bool WorldStateWrapper::get_status(msgpack::object& obj, msgpack::sbuffer& buf)
return true;
}

bool WorldStateWrapper::copy_stores(msgpack::object& obj, msgpack::sbuffer& buffer)
{
TypedMessage<CopyStoresRequest> request;
obj.convert(request);

_ws->copy_stores(request.value.dstPath, request.value.compact.value_or(false));

MsgHeader header(request.header.messageId);
messaging::TypedMessage<WorldStateStatusFull> resp_msg(WorldStateMessageType::COPY_STORES, header, {});
msgpack::pack(buffer, resp_msg);

return true;
}

Napi::Function WorldStateWrapper::get_class(Napi::Env env)
{
return DefineClass(env,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class WorldStateWrapper : public Napi::ObjectWrap<WorldStateWrapper> {
bool checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer);
bool commit_checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer);
bool revert_checkpoint(msgpack::object& obj, msgpack::sbuffer& buffer);

bool copy_stores(msgpack::object& obj, msgpack::sbuffer& buffer);
};

} // namespace bb::nodejs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ enum WorldStateMessageType {
COMMIT_CHECKPOINT,
REVERT_CHECKPOINT,

COPY_STORES,

CLOSE = 999,
};

Expand Down Expand Up @@ -231,6 +233,12 @@ struct SyncBlockRequest {
publicDataWrites);
};

struct CopyStoresRequest {
std::string dstPath;
std::optional<bool> compact;
MSGPACK_FIELDS(dstPath, compact);
};

} // namespace bb::nodejs

MSGPACK_ADD_ENUM(bb::nodejs::WorldStateMessageType)
12 changes: 12 additions & 0 deletions barretenberg/cpp/src/barretenberg/world_state/world_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ void WorldState::create_canonical_fork(const std::string& dataDir,
_forks[fork->_forkId] = fork;
}

void WorldState::copy_stores(const std::string& dstPath, bool compact) const
{
auto copyStore = [&](const LMDBTreeStore::SharedPtr& store) {
std::filesystem::path directory = dstPath;
directory /= store->get_name();
std::filesystem::create_directories(directory);
store->copy_store(directory, compact);
};

std::for_each(_persistentStores->begin(), _persistentStores->end(), copyStore);
}

Fork::SharedPtr WorldState::retrieve_fork(const uint64_t& forkId) const
{
std::unique_lock lock(mtx);
Expand Down
8 changes: 8 additions & 0 deletions barretenberg/cpp/src/barretenberg/world_state/world_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ class WorldState {
const std::vector<PublicDataLeafValue>& prefilled_public_data,
uint32_t initial_header_generator_point);

/**
* @brief Copies all underlying LMDB stores to the target directory while acquiring a write lock
*
* @param dstPath Parent folder where trees will be copied
* @param compact Whether to compact stores when copying
*/
void copy_stores(const std::string& dstPath, bool compact) const;

/**
* @brief Get tree metadata for a particular tree
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ struct WorldStateStores {
, messageStore(std::move(other.messageStore))
{}

auto begin() const { return stores.begin(); }
auto end() const { return stores.end(); }

WorldStateStores(const WorldStateStores& other) = delete;
~WorldStateStores() = default;

WorldStateStores& operator=(WorldStateStores&& other) = delete;
WorldStateStores& operator=(WorldStateStores& other) = delete;

private:
std::array<LMDBTreeStore::SharedPtr, 5> stores{
nullifierStore, publicDataStore, archiveStore, noteHashStore, messageStore
};
};
} // namespace bb::world_state
18 changes: 18 additions & 0 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,18 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
return { provenBlockNumber };
}

/** Restarts the archiver after a stop. */
public restart() {
if (!this.runningPromise) {
throw new Error(`Archiver was never started`);
}
if (this.runningPromise.isRunning()) {
this.log.warn(`Archiver already running`);
}
this.log.info(`Restarting archiver`);
this.runningPromise.start();
}

/**
* Stops the archiver.
* @returns A promise signalling completion of the stop process.
Expand All @@ -554,6 +566,10 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
return Promise.resolve();
}

public backupTo(destPath: string): Promise<string> {
return this.dataStore.backupTo(destPath);
}

public getL1Constants(): Promise<L1RollupConstants> {
return Promise.resolve(this.l1constants);
}
Expand Down Expand Up @@ -874,6 +890,8 @@ class ArchiverStoreHelper
| 'addContractInstanceUpdates'
| 'deleteContractInstanceUpdates'
| 'addFunctions'
| 'backupTo'
| 'close'
>
{
#log = createLogger('archiver:block-helper');
Expand Down
6 changes: 6 additions & 0 deletions yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,10 @@ export interface ArchiverDataStore {
* Estimates the size of the store in bytes.
*/
estimateSize(): Promise<{ mappingSize: number; actualSize: number; numItems: number }>;

/** Backups the archiver db to the target folder. Returns the path to the db file. */
backupTo(path: string): Promise<string>;

/** Closes the underlying data store. */
close(): Promise<void>;
}
2 changes: 1 addition & 1 deletion yarn-project/archiver/src/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ export * from './archiver.js';
export * from './config.js';
export { type PublishedL2Block, type L1PublishedData } from './structs/published.js';
export type { ArchiverDataStore } from './archiver_store.js';
export { KVArchiverDataStore } from './kv_archiver_store/kv_archiver_store.js';
export { KVArchiverDataStore, ARCHIVER_DB_VERSION } from './kv_archiver_store/kv_archiver_store.js';
export { ContractInstanceStore } from './kv_archiver_store/contract_instance_store.js';
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { type LogFilter, PrivateLog, type TxScopedL2Log } from '@aztec/stdlib/lo
import type { InboxLeaf } from '@aztec/stdlib/messaging';
import type { BlockHeader, TxHash, TxReceipt } from '@aztec/stdlib/tx';

import { join } from 'path';

import type { ArchiverDataStore, ArchiverL1SynchPoint } from '../archiver_store.js';
import type { DataRetrieval } from '../structs/data_retrieval.js';
import type { PublishedL2Block } from '../structs/published.js';
Expand All @@ -26,11 +28,13 @@ import { ContractInstanceStore } from './contract_instance_store.js';
import { LogStore } from './log_store.js';
import { MessageStore } from './message_store.js';

export const ARCHIVER_DB_VERSION = 1;

/**
* LMDB implementation of the ArchiverDataStore interface.
*/
export class KVArchiverDataStore implements ArchiverDataStore {
public static readonly SCHEMA_VERSION = 1;
public static readonly SCHEMA_VERSION = ARCHIVER_DB_VERSION;

#blockStore: BlockStore;
#logStore: LogStore;
Expand All @@ -49,6 +53,15 @@ export class KVArchiverDataStore implements ArchiverDataStore {
this.#contractInstanceStore = new ContractInstanceStore(db);
}

public async backupTo(path: string, compress = true): Promise<string> {
await this.db.backupTo(path, compress);
return join(path, 'data.mdb');
}

Comment on lines +56 to +60
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something for this PR but we should consider bundling the rollup address/schema version into the backup to avoid restoring incompatible versions?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to go back and update this comment: this is handled already. This info is part of the snapshot index

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm adding it to the snapshot metadata. We could write those as entries of the db itself though, and completely do away with the db_version file we use today. WDYT?

public close() {
return this.db.close();
}

// TODO: These function names are in memory only as they are for development/debugging. They require the full contract
// artifact supplied to the node out of band. This should be reviewed and potentially removed as part of
// the node api cleanup process.
Expand Down
11 changes: 4 additions & 7 deletions yarn-project/archiver/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-clien

import { Archiver } from './archiver/archiver.js';
import type { ArchiverConfig } from './archiver/config.js';
import { KVArchiverDataStore } from './archiver/index.js';
import { ARCHIVER_DB_VERSION, KVArchiverDataStore } from './archiver/kv_archiver_store/kv_archiver_store.js';
import { createArchiverClient } from './rpc/index.js';

export const ARCHIVER_STORE_NAME = 'archiver';

/**
* Creates a local archiver.
* @param config - The archiver configuration.
Expand All @@ -32,12 +34,7 @@ export async function createArchiver(
telemetry: TelemetryClient = getTelemetryClient(),
): Promise<ArchiverApi & Service & L2BlockSourceEventEmitter> {
const config = { ..._config, dataStoreMapSizeKB: _config.archiverStoreMapSizeKb ?? _config.dataStoreMapSizeKB };
const store = await createStore(
'archiver',
KVArchiverDataStore.SCHEMA_VERSION,
config,
createLogger('archiver:lmdb'),
);
const store = await createStore(ARCHIVER_STORE_NAME, ARCHIVER_DB_VERSION, config, createLogger('archiver:lmdb'));
const archiverStore = new KVArchiverDataStore(store, config.maxLogs);
await registerProtocolContracts(archiverStore);
return Archiver.createAndSync(config, archiverStore, { telemetry, blobSinkClient }, opts.blockUntilSync);
Expand Down
Loading
Loading