Skip to content

Commit 9af3ad2

Browse files
Dentosalrafal-chxgreenx
authored
Add decompression traits and a test case (#2295)
#1609 (comment) Add a test case for decompressing DA-compressed blocks. FIx `CombinedDb::from_config` to respects `state_rewind_policy` with tmp RocksDB. --------- Co-authored-by: Rafał Chabowski <[email protected]> Co-authored-by: Green Baneling <[email protected]>
1 parent b6cbb35 commit 9af3ad2

File tree

12 files changed

+290
-37
lines changed

12 files changed

+290
-37
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
3535
- [2389](https://github.com/FuelLabs/fuel-core/pull/2389): Fix construction of reverse iterator in RocksDB.
3636

3737
### Changed
38+
- [2295](https://github.com/FuelLabs/fuel-core/pull/2295): `CombinedDb::from_config` now respects `state_rewind_policy` with tmp RocksDB.
3839
- [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
3940
- [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks
4041
- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently.

benches/benches/block_target_gas.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use fuel_core::{
2727
Config,
2828
FuelService,
2929
},
30+
state::historical_rocksdb::StateRewindPolicy,
3031
};
3132
use fuel_core_benches::{
3233
default_gas_costs::default_gas_costs,
@@ -265,7 +266,8 @@ fn service_with_many_contracts(
265266
.build()
266267
.unwrap();
267268
let _drop = rt.enter();
268-
let mut database = Database::rocksdb_temp();
269+
let mut database = Database::rocksdb_temp(StateRewindPolicy::NoRewind)
270+
.expect("Failed to create database");
269271

270272
let mut chain_config = ChainConfig::local_testnet();
271273

crates/compression/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub use registry::RegistryKeyspace;
1616
use fuel_core_types::{
1717
blockchain::header::PartialBlockHeader,
1818
fuel_tx::CompressedTransaction,
19+
fuel_types::BlockHeight,
1920
};
2021
use registry::RegistrationsPerTable;
2122

@@ -42,6 +43,15 @@ impl Default for VersionedCompressedBlock {
4243
}
4344
}
4445

46+
impl VersionedCompressedBlock {
47+
/// Returns the height of the compressed block.
48+
pub fn height(&self) -> &BlockHeight {
49+
match self {
50+
VersionedCompressedBlock::V0(block) => block.header.height(),
51+
}
52+
}
53+
}
54+
4555
#[cfg(test)]
4656
mod tests {
4757
use fuel_core_compression as _;

crates/compression/src/registry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ macro_rules! tables {
7878

7979

8080
impl RegistrationsPerTable {
81-
pub(crate) fn write_to_registry<R>(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()>
81+
pub fn write_to_registry<R>(&self, registry: &mut R, timestamp: Tai64) -> anyhow::Result<()>
8282
where
8383
R: TemporalRegistryAll
8484
{

crates/fuel-core/src/combined_database.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,19 @@ impl CombinedDatabase {
105105
})
106106
}
107107

108+
/// A test-only temporary rocksdb database with given rewind policy.
109+
#[cfg(feature = "rocksdb")]
110+
pub fn temp_database_with_state_rewind_policy(
111+
state_rewind_policy: StateRewindPolicy,
112+
) -> DatabaseResult<Self> {
113+
Ok(Self {
114+
on_chain: Database::rocksdb_temp(state_rewind_policy)?,
115+
off_chain: Database::rocksdb_temp(state_rewind_policy)?,
116+
relayer: Default::default(),
117+
gas_price: Default::default(),
118+
})
119+
}
120+
108121
pub fn from_config(config: &CombinedDatabaseConfig) -> DatabaseResult<Self> {
109122
let combined_database = match config.database_type {
110123
#[cfg(feature = "rocksdb")]
@@ -114,7 +127,9 @@ impl CombinedDatabase {
114127
tracing::warn!(
115128
"No RocksDB path configured, initializing database with a tmp directory"
116129
);
117-
CombinedDatabase::default()
130+
CombinedDatabase::temp_database_with_state_rewind_policy(
131+
config.state_rewind_policy,
132+
)?
118133
} else {
119134
tracing::info!(
120135
"Opening database {:?} with cache size \"{}\" and state rewind policy \"{:?}\"",

crates/fuel-core/src/database.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -251,12 +251,11 @@ where
251251
}
252252

253253
#[cfg(feature = "rocksdb")]
254-
pub fn rocksdb_temp() -> Self {
255-
let db = RocksDb::<Historical<Description>>::default_open_temp(None).unwrap();
256-
let historical_db =
257-
HistoricalRocksDB::new(db, StateRewindPolicy::NoRewind).unwrap();
254+
pub fn rocksdb_temp(rewind_policy: StateRewindPolicy) -> Result<Self> {
255+
let db = RocksDb::<Historical<Description>>::default_open_temp(None)?;
256+
let historical_db = HistoricalRocksDB::new(db, rewind_policy)?;
258257
let data = Arc::new(historical_db);
259-
Self::from_storage(DataSource::new(data, Stage::default()))
258+
Ok(Self::from_storage(DataSource::new(data, Stage::default())))
260259
}
261260
}
262261

@@ -275,7 +274,8 @@ where
275274
}
276275
#[cfg(feature = "rocksdb")]
277276
{
278-
Self::rocksdb_temp()
277+
Self::rocksdb_temp(StateRewindPolicy::NoRewind)
278+
.expect("Failed to create a temporary database")
279279
}
280280
}
281281
}
@@ -408,7 +408,7 @@ impl Modifiable for GenesisDatabase<Relayer> {
408408
}
409409
}
410410

411-
fn commit_changes_with_height_update<Description>(
411+
pub fn commit_changes_with_height_update<Description>(
412412
database: &mut Database<Description>,
413413
changes: Changes,
414414
heights_lookup: impl Fn(

crates/fuel-core/src/graphql_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99
};
1010

1111
pub mod api_service;
12-
mod da_compression;
12+
pub mod da_compression;
1313
pub mod database;
1414
pub(crate) mod metrics_extension;
1515
pub mod ports;

crates/fuel-core/src/graphql_api/da_compression.rs

Lines changed: 172 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,21 @@ use fuel_core_compression::{
1414
config::Config,
1515
ports::{
1616
EvictorDb,
17+
HistoryLookup,
1718
TemporalRegistry,
1819
UtxoIdToPointer,
1920
},
2021
};
2122
use fuel_core_storage::{
2223
not_found,
24+
tables::{
25+
Coins,
26+
FuelBlocks,
27+
Messages,
28+
},
2329
StorageAsMut,
2430
StorageAsRef,
31+
StorageInspect,
2532
};
2633
use fuel_core_types::{
2734
blockchain::block::Block,
@@ -49,8 +56,8 @@ where
4956
{
5057
let compressed = compress(
5158
config,
52-
CompressTx {
53-
db_tx,
59+
CompressDbTx {
60+
db_tx: DbTx { db_tx },
5461
block_events,
5562
},
5663
block,
@@ -65,14 +72,23 @@ where
6572
Ok(())
6673
}
6774

68-
struct CompressTx<'a, Tx> {
69-
db_tx: &'a mut Tx,
75+
pub struct DbTx<'a, Tx> {
76+
pub db_tx: &'a mut Tx,
77+
}
78+
79+
struct CompressDbTx<'a, Tx> {
80+
db_tx: DbTx<'a, Tx>,
7081
block_events: &'a [Event],
7182
}
7283

84+
pub struct DecompressDbTx<'a, Tx, Onchain> {
85+
pub db_tx: DbTx<'a, Tx>,
86+
pub onchain_db: Onchain,
87+
}
88+
7389
macro_rules! impl_temporal_registry {
7490
($type:ty) => { paste::paste! {
75-
impl<'a, Tx> TemporalRegistry<$type> for CompressTx<'a, Tx>
91+
impl<'a, Tx> TemporalRegistry<$type> for DbTx<'a, Tx>
7692
where
7793
Tx: OffChainDatabaseTransaction,
7894
{
@@ -150,15 +166,87 @@ macro_rules! impl_temporal_registry {
150166
}
151167
}
152168

153-
impl<'a, Tx> EvictorDb<$type> for CompressTx<'a, Tx>
169+
impl<'a, Tx> TemporalRegistry<$type> for CompressDbTx<'a, Tx>
170+
where
171+
Tx: OffChainDatabaseTransaction,
172+
{
173+
fn read_registry(
174+
&self,
175+
key: &fuel_core_types::fuel_compression::RegistryKey,
176+
) -> anyhow::Result<$type> {
177+
self.db_tx.read_registry(key)
178+
}
179+
180+
fn read_timestamp(
181+
&self,
182+
key: &fuel_core_types::fuel_compression::RegistryKey,
183+
) -> anyhow::Result<Tai64> {
184+
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
185+
}
186+
187+
fn write_registry(
188+
&mut self,
189+
key: &fuel_core_types::fuel_compression::RegistryKey,
190+
value: &$type,
191+
timestamp: Tai64,
192+
) -> anyhow::Result<()> {
193+
self.db_tx.write_registry(key, value, timestamp)
194+
}
195+
196+
fn registry_index_lookup(
197+
&self,
198+
value: &$type,
199+
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
200+
{
201+
self.db_tx.registry_index_lookup(value)
202+
}
203+
}
204+
205+
impl<'a, Tx, Offchain> TemporalRegistry<$type> for DecompressDbTx<'a, Tx, Offchain>
206+
where
207+
Tx: OffChainDatabaseTransaction,
208+
{
209+
fn read_registry(
210+
&self,
211+
key: &fuel_core_types::fuel_compression::RegistryKey,
212+
) -> anyhow::Result<$type> {
213+
self.db_tx.read_registry(key)
214+
}
215+
216+
fn read_timestamp(
217+
&self,
218+
key: &fuel_core_types::fuel_compression::RegistryKey,
219+
) -> anyhow::Result<Tai64> {
220+
<_ as TemporalRegistry<$type>>::read_timestamp(&self.db_tx, key)
221+
}
222+
223+
fn write_registry(
224+
&mut self,
225+
key: &fuel_core_types::fuel_compression::RegistryKey,
226+
value: &$type,
227+
timestamp: Tai64,
228+
) -> anyhow::Result<()> {
229+
self.db_tx.write_registry(key, value, timestamp)
230+
}
231+
232+
fn registry_index_lookup(
233+
&self,
234+
value: &$type,
235+
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>>
236+
{
237+
self.db_tx.registry_index_lookup(value)
238+
}
239+
}
240+
241+
impl<'a, Tx> EvictorDb<$type> for CompressDbTx<'a, Tx>
154242
where
155243
Tx: OffChainDatabaseTransaction,
156244
{
157245
fn set_latest_assigned_key(
158246
&mut self,
159247
key: fuel_core_types::fuel_compression::RegistryKey,
160248
) -> anyhow::Result<()> {
161-
self.db_tx
249+
self.db_tx.db_tx
162250
.storage_as_mut::<DaCompressionTemporalRegistryEvictorCache>()
163251
.insert(&MetadataKey::$type, &key)?;
164252
Ok(())
@@ -168,7 +256,7 @@ macro_rules! impl_temporal_registry {
168256
&self,
169257
) -> anyhow::Result<Option<fuel_core_types::fuel_compression::RegistryKey>> {
170258
Ok(self
171-
.db_tx
259+
.db_tx.db_tx
172260
.storage_as_ref::<DaCompressionTemporalRegistryEvictorCache>()
173261
.get(&MetadataKey::$type)?
174262
.map(|v| v.into_owned())
@@ -185,7 +273,7 @@ impl_temporal_registry!(ContractId);
185273
impl_temporal_registry!(ScriptCode);
186274
impl_temporal_registry!(PredicateCode);
187275

188-
impl<'a, Tx> UtxoIdToPointer for CompressTx<'a, Tx>
276+
impl<'a, Tx> UtxoIdToPointer for CompressDbTx<'a, Tx>
189277
where
190278
Tx: OffChainDatabaseTransaction,
191279
{
@@ -210,3 +298,78 @@ where
210298
anyhow::bail!("UtxoId not found in the block events");
211299
}
212300
}
301+
302+
impl<'a, Tx, Onchain> HistoryLookup for DecompressDbTx<'a, Tx, Onchain>
303+
where
304+
Tx: OffChainDatabaseTransaction,
305+
Onchain: StorageInspect<Coins, Error = fuel_core_storage::Error>
306+
+ StorageInspect<Messages, Error = fuel_core_storage::Error>
307+
+ StorageInspect<FuelBlocks, Error = fuel_core_storage::Error>,
308+
{
309+
fn utxo_id(
310+
&self,
311+
c: fuel_core_types::fuel_tx::CompressedUtxoId,
312+
) -> anyhow::Result<fuel_core_types::fuel_tx::UtxoId> {
313+
if c.tx_pointer.block_height() == 0u32.into() {
314+
// This is a genesis coin, which is handled differently.
315+
// See CoinConfigGenerator::generate which generates the genesis coins.
316+
let mut bytes = [0u8; 32];
317+
let tx_index = c.tx_pointer.tx_index();
318+
bytes[..std::mem::size_of_val(&tx_index)]
319+
.copy_from_slice(&tx_index.to_be_bytes());
320+
return Ok(fuel_core_types::fuel_tx::UtxoId::new(
321+
fuel_core_types::fuel_tx::TxId::from(bytes),
322+
0,
323+
));
324+
}
325+
326+
let block_info = self
327+
.onchain_db
328+
.storage_as_ref::<FuelBlocks>()
329+
.get(&c.tx_pointer.block_height())?
330+
.ok_or(not_found!(FuelBlocks))?;
331+
332+
let tx_id = *block_info
333+
.transactions()
334+
.get(c.tx_pointer.tx_index() as usize)
335+
.ok_or(anyhow::anyhow!(
336+
"Transaction not found in the block: {:?}",
337+
c.tx_pointer
338+
))?;
339+
340+
Ok(fuel_core_types::fuel_tx::UtxoId::new(tx_id, c.output_index))
341+
}
342+
343+
fn coin(
344+
&self,
345+
utxo_id: fuel_core_types::fuel_tx::UtxoId,
346+
) -> anyhow::Result<fuel_core_compression::ports::CoinInfo> {
347+
let coin = self
348+
.onchain_db
349+
.storage_as_ref::<fuel_core_storage::tables::Coins>()
350+
.get(&utxo_id)?
351+
.ok_or(not_found!(fuel_core_storage::tables::Coins))?;
352+
Ok(fuel_core_compression::ports::CoinInfo {
353+
owner: *coin.owner(),
354+
asset_id: *coin.asset_id(),
355+
amount: *coin.amount(),
356+
})
357+
}
358+
359+
fn message(
360+
&self,
361+
nonce: fuel_core_types::fuel_types::Nonce,
362+
) -> anyhow::Result<fuel_core_compression::ports::MessageInfo> {
363+
let message = self
364+
.onchain_db
365+
.storage_as_ref::<fuel_core_storage::tables::Messages>()
366+
.get(&nonce)?
367+
.ok_or(not_found!(fuel_core_storage::tables::Messages))?;
368+
Ok(fuel_core_compression::ports::MessageInfo {
369+
sender: *message.sender(),
370+
recipient: *message.recipient(),
371+
amount: message.amount(),
372+
data: message.data().clone(),
373+
})
374+
}
375+
}

crates/fuel-core/src/state/generic_database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl<Storage> GenericDatabase<Storage> {
4444
}
4545

4646
pub fn into_inner(self) -> Storage {
47-
self.storage.into_inner()
47+
self.storage.into_storage()
4848
}
4949
}
5050

crates/storage/src/structured_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl<S> StructuredStorage<S> {
105105
}
106106

107107
/// Returns the inner storage.
108-
pub fn into_inner(self) -> S {
108+
pub fn into_storage(self) -> S {
109109
self.inner
110110
}
111111
}

0 commit comments

Comments
 (0)