Skip to content

Commit 6683ba2

Browse files
rymncxgreenx
andauthored
fix(compression): improve robustness on startup and shutdown (#2923)
## Linked Issues/PRs <!-- List of related issues/PRs --> - closes #2921 ## Description <!-- List of detailed changes --> The most important changes include the addition of a new adapter for block importing with compression, the implementation of traits for managing block heights, and the synchronization of previously produced blocks in the compression service. ### Enhancements to Block Importing and Compression: * [`crates/fuel-core/src/service/adapters/compression_adapters.rs`](diffhunk://#diff-9e66cb09255fa99a40c07b24f3002b040d023c09b9469521b2c8ad8736bf33bfL3-R66): Added `CompressionBlockImporterAdapter` to wrap around `BlockImporterAdapter` and handle block importing with compression. Implemented the `block_source::BlockSource` trait for this new adapter. * [`crates/fuel-core/src/service/adapters/import_result_provider.rs`](diffhunk://#diff-f348279c169cadf9d32da0b9f0b0e56b500551043908681479c91d52edb0a38cR37-R45): Introduced the `BlockAt` enum to represent either the Genesis Block or a block at a specific height. ### Implementation of Traits for Managing Block Heights: * [`crates/fuel-core/src/service/adapters/compression_adapters.rs`](diffhunk://#diff-9e66cb09255fa99a40c07b24f3002b040d023c09b9469521b2c8ad8736bf33bfR78-R89): Implemented the `compression_storage::LatestHeight` trait for `Database<CompressionDatabase>` and the `canonical_height::CanonicalHeight` trait for `Database<OnChain>`. * [`crates/services/compression/src/ports/compression_storage.rs`](diffhunk://#diff-766e163ef7ab6dd00944205be543374cd60cefb2fb5605467d2365ca71938cbbR15-R34): Added the `LatestHeight` trait for getting the latest height of the compression storage. * [`crates/services/compression/src/ports/canonical_height.rs`](diffhunk://#diff-4d817b8fba982a301ad44b5df62a6e7543cc6765df8cf92e8a6ec59d378b5ea1R1-R8): Introduced the `CanonicalHeight` trait to provide the canonical height of the blockchain. ### Synchronization of Previously Produced Blocks: * [`crates/services/compression/src/service.rs`](diffhunk://#diff-9fc61045febb6284fa8e01c5c33172787228fe21b8b813c62c8f03d2fec55fe8R143-R176): Enhanced the `CompressionService` to include methods for synchronizing previously produced blocks and handling new blocks during shutdown. [[1]](diffhunk://#diff-9fc61045febb6284fa8e01c5c33172787228fe21b8b813c62c8f03d2fec55fe8R143-R176) [[2]](diffhunk://#diff-9fc61045febb6284fa8e01c5c33172787228fe21b8b813c62c8f03d2fec55fe8L216-R305) ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [ ] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? --------- Co-authored-by: green <[email protected]>
1 parent 6a1f8e9 commit 6683ba2

File tree

10 files changed

+458
-81
lines changed

10 files changed

+458
-81
lines changed

crates/fuel-core/src/service/adapters/compression_adapters.rs

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,69 @@
11
use crate::{
22
database::{
3-
database_description::compression::CompressionDatabase,
3+
database_description::{
4+
compression::CompressionDatabase,
5+
on_chain::OnChain,
6+
},
47
Database,
58
},
69
service::adapters::BlockImporterAdapter,
710
};
811
use fuel_core_compression_service::{
912
config,
1013
ports::{
11-
block_source,
14+
block_source::{
15+
self,
16+
BlockAt,
17+
},
18+
canonical_height,
19+
compression_storage,
1220
configuration,
1321
},
1422
};
23+
use fuel_core_storage::transactional::HistoricalView;
1524
use fuel_core_types::services::block_importer::SharedImportResult;
1625

17-
impl block_source::BlockSource for BlockImporterAdapter {
26+
use super::import_result_provider::{
27+
self,
28+
ImportResultProvider,
29+
};
30+
31+
/// Provides the necessary functionality for accessing latest and historical block data.
32+
pub struct CompressionBlockImporterAdapter {
33+
block_importer: BlockImporterAdapter,
34+
import_result_provider_adapter: ImportResultProvider,
35+
}
36+
37+
impl CompressionBlockImporterAdapter {
38+
pub fn new(
39+
block_importer: BlockImporterAdapter,
40+
import_result_provider_adapter: ImportResultProvider,
41+
) -> Self {
42+
Self {
43+
block_importer,
44+
import_result_provider_adapter,
45+
}
46+
}
47+
}
48+
49+
impl From<BlockAt> for import_result_provider::BlockAt {
50+
fn from(value: BlockAt) -> Self {
51+
match value {
52+
BlockAt::Genesis => Self::Genesis,
53+
BlockAt::Specific(h) => Self::Specific(h.into()),
54+
}
55+
}
56+
}
57+
58+
impl block_source::BlockSource for CompressionBlockImporterAdapter {
1859
fn subscribe(&self) -> fuel_core_services::stream::BoxStream<SharedImportResult> {
19-
self.events_shared_result()
60+
self.block_importer.events_shared_result()
61+
}
62+
63+
fn get_block(&self, height: BlockAt) -> Option<SharedImportResult> {
64+
self.import_result_provider_adapter
65+
.result_at_height(height.into())
66+
.ok()
2067
}
2168
}
2269

@@ -28,6 +75,18 @@ impl configuration::CompressionConfigProvider
2875
}
2976
}
3077

78+
impl compression_storage::LatestHeight for Database<CompressionDatabase> {
79+
fn latest_height(&self) -> Option<u32> {
80+
HistoricalView::latest_height(self).map(Into::into)
81+
}
82+
}
83+
84+
impl canonical_height::CanonicalHeight for Database<OnChain> {
85+
fn get(&self) -> Option<u32> {
86+
HistoricalView::latest_height(self).map(Into::into)
87+
}
88+
}
89+
3190
pub struct CompressionServiceAdapter {
3291
db: Database<CompressionDatabase>,
3392
}

crates/fuel-core/src/service/adapters/graphql_api.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::{
22
compression_adapters::CompressionServiceAdapter,
3+
import_result_provider,
34
BlockImporterAdapter,
45
BlockProducerAdapter,
56
ChainStateInfoProvider,
@@ -248,6 +249,15 @@ impl GraphQLBlockImporter {
248249
}
249250
}
250251

252+
impl From<BlockAt> for import_result_provider::BlockAt {
253+
fn from(value: BlockAt) -> Self {
254+
match value {
255+
BlockAt::Genesis => Self::Genesis,
256+
BlockAt::Specific(h) => Self::Specific(h),
257+
}
258+
}
259+
}
260+
251261
impl worker::BlockImporter for GraphQLBlockImporter {
252262
fn block_events(&self) -> BoxStream<SharedImportResult> {
253263
self.block_importer_adapter.events_shared_result()
@@ -257,7 +267,8 @@ impl worker::BlockImporter for GraphQLBlockImporter {
257267
&self,
258268
height: BlockAt,
259269
) -> anyhow::Result<SharedImportResult> {
260-
self.import_result_provider_adapter.result_at_height(height)
270+
self.import_result_provider_adapter
271+
.result_at_height(height.into())
261272
}
262273
}
263274

crates/fuel-core/src/service/adapters/import_result_provider.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,21 @@
11
use crate::{
22
database::Database,
3-
fuel_core_graphql_api::ports::worker::BlockAt,
43
service::adapters::ExecutorAdapter,
54
};
65
use fuel_core_importer::ports::Validator;
76
use fuel_core_storage::{
87
not_found,
98
transactional::AtomicView,
109
};
11-
use fuel_core_types::services::{
12-
block_importer::{
13-
ImportResult,
14-
SharedImportResult,
10+
use fuel_core_types::{
11+
fuel_types::BlockHeight,
12+
services::{
13+
block_importer::{
14+
ImportResult,
15+
SharedImportResult,
16+
},
17+
executor::ValidationResult,
1518
},
16-
executor::ValidationResult,
1719
};
1820
use std::sync::Arc;
1921

@@ -32,6 +34,15 @@ impl ImportResultProvider {
3234
}
3335
}
3436

37+
/// Represents either the Genesis Block or a block at a specific height
38+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
39+
pub enum BlockAt {
40+
/// Block at a specific height
41+
Specific(BlockHeight),
42+
/// Genesis block
43+
Genesis,
44+
}
45+
3546
impl ImportResultProvider {
3647
pub fn result_at_height(
3748
&self,

crates/fuel-core/src/service/sub_services.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ use crate::service::adapters::consensus_module::poa::pre_confirmation_signature:
3737

3838
use super::{
3939
adapters::{
40-
compression_adapters::CompressionServiceAdapter,
40+
compression_adapters::{
41+
CompressionBlockImporterAdapter,
42+
CompressionServiceAdapter,
43+
},
4144
FuelBlockSigner,
4245
P2PAdapter,
4346
TxStatusManagerAdapter,
@@ -398,13 +401,19 @@ pub fn init_sub_services(
398401
let compression_service_adapter =
399402
CompressionServiceAdapter::new(database.compression().clone());
400403

404+
let compression_importer_adapter = CompressionBlockImporterAdapter::new(
405+
importer_adapter.clone(),
406+
import_result_provider.clone(),
407+
);
408+
401409
let compression_service = match &config.da_compression {
402410
DaCompressionMode::Disabled => None,
403411
DaCompressionMode::Enabled(cfg) => Some(
404412
new_compression_service(
405-
importer_adapter.clone(),
413+
compression_importer_adapter,
406414
database.compression().clone(),
407415
cfg.clone(),
416+
database.on_chain().clone(),
408417
)
409418
.map_err(|e| anyhow::anyhow!(e))?,
410419
),

crates/services/compression/src/ports.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ pub mod compression_storage;
66

77
/// Configuration port
88
pub mod configuration;
9+
10+
/// Canonical height port
11+
pub mod canonical_height;

crates/services/compression/src/ports/block_source.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub(crate) trait BlockWithMetadataExt {
99
fn block(&self) -> &fuel_core_types::blockchain::block::Block;
1010
#[cfg(test)]
1111
fn default() -> Self;
12+
#[cfg(test)]
13+
fn test_block_with_height(height: BlockHeight) -> Self;
1214
}
1315

1416
impl BlockWithMetadataExt for BlockWithMetadata {
@@ -30,13 +32,46 @@ impl BlockWithMetadataExt for BlockWithMetadata {
3032

3133
std::sync::Arc::new(ImportResult::default().wrap())
3234
}
35+
36+
#[cfg(test)]
37+
fn test_block_with_height(height: BlockHeight) -> Self {
38+
use fuel_core_types::services::block_importer::ImportResult;
39+
40+
let mut import_result = ImportResult::default();
41+
import_result
42+
.sealed_block
43+
.entity
44+
.header_mut()
45+
.set_block_height(height.into());
46+
std::sync::Arc::new(import_result.wrap())
47+
}
3348
}
3449

3550
/// Type alias for returned value by .subscribe() method on `BlockSource`
3651
pub type BlockStream = BoxStream<BlockWithMetadata>;
3752

53+
/// Represents either the Genesis Block or a block at a specific height
54+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
55+
pub enum BlockAt {
56+
/// Block at a specific height
57+
Specific(BlockHeight),
58+
/// Genesis block
59+
Genesis,
60+
}
61+
62+
impl PartialEq<BlockHeight> for BlockAt {
63+
fn eq(&self, other: &BlockHeight) -> bool {
64+
match self {
65+
Self::Genesis => 0 == *other,
66+
Self::Specific(h) => h == other,
67+
}
68+
}
69+
}
70+
3871
/// Port for L2 blocks source
39-
pub trait BlockSource {
72+
pub trait BlockSource: Send + Sync {
4073
/// Should provide a stream of blocks with metadata
4174
fn subscribe(&self) -> BlockStream;
75+
/// Should provide the block at a given height
76+
fn get_block(&self, height: BlockAt) -> Option<BlockWithMetadata>;
4277
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use super::block_source::BlockHeight;
2+
3+
/// Canonical height port
4+
/// Should provide the canonical height of the blockchain.
5+
pub trait CanonicalHeight: Send + Sync {
6+
/// Returns the canonical height of the blockchain.
7+
fn get(&self) -> Option<BlockHeight>;
8+
}

crates/services/compression/src/ports/compression_storage.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,26 @@ use fuel_core_storage::{
1212
/// Compressed block type alias
1313
pub type CompressedBlock = fuel_core_compression::VersionedCompressedBlock;
1414

15+
/// Trait for getting the latest height of the compression storage
16+
pub trait LatestHeight {
17+
/// Get the latest height of the compression storage
18+
fn latest_height(&self) -> Option<u32>;
19+
}
20+
1521
/// Trait for interacting with storage that supports compression
1622
pub trait CompressionStorage:
1723
KeyValueInspect<Column = MerkleizedColumn<storage::column::CompressionColumn>>
1824
+ Modifiable
25+
+ Send
26+
+ Sync
1927
{
2028
}
2129

2230
impl<T> CompressionStorage for T where
2331
T: KeyValueInspect<Column = MerkleizedColumn<storage::column::CompressionColumn>>
2432
+ Modifiable
33+
+ Send
34+
+ Sync
2535
{
2636
}
2737

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::config::CompressionConfig;
22

33
/// Configuration for the compression service
4-
pub trait CompressionConfigProvider {
4+
pub trait CompressionConfigProvider: Send + Sync {
55
/// getter for the compression config
66
fn config(&self) -> CompressionConfig;
77
}

0 commit comments

Comments
 (0)