Skip to content
67 changes: 63 additions & 4 deletions crates/fuel-core/src/service/adapters/compression_adapters.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,69 @@
use crate::{
database::{
database_description::compression::CompressionDatabase,
database_description::{
compression::CompressionDatabase,
on_chain::OnChain,
},
Database,
},
service::adapters::BlockImporterAdapter,
};
use fuel_core_compression_service::{
config,
ports::{
block_source,
block_source::{
self,
BlockAt,
},
canonical_height,
compression_storage,
configuration,
},
};
use fuel_core_storage::transactional::HistoricalView;
use fuel_core_types::services::block_importer::SharedImportResult;

impl block_source::BlockSource for BlockImporterAdapter {
use super::import_result_provider::{
self,
ImportResultProvider,
};

/// CompressionBlockImporterAdapter is a wrapper around BlockImporterAdapter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment doesn't seem helpful. What is the extra functionality it brings as a wrapper? If it's "just" a wrapper, why does it also hold an import result provider?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 1f0fc4f

pub struct CompressionBlockImporterAdapter {
block_importer: BlockImporterAdapter,
import_result_provider_adapter: ImportResultProvider,
}

impl CompressionBlockImporterAdapter {
pub fn new(
block_importer: BlockImporterAdapter,
import_result_provider_adapter: ImportResultProvider,
) -> Self {
Self {
block_importer,
import_result_provider_adapter,
}
}
}

impl From<BlockAt> for import_result_provider::BlockAt {
fn from(value: BlockAt) -> Self {
match value {
BlockAt::Genesis => Self::Genesis,
BlockAt::Specific(h) => Self::Specific(h.into()),
}
}
}

impl block_source::BlockSource for CompressionBlockImporterAdapter {
fn subscribe(&self) -> fuel_core_services::stream::BoxStream<SharedImportResult> {
self.events_shared_result()
self.block_importer.events_shared_result()
}

fn get_block(&self, height: BlockAt) -> Option<SharedImportResult> {
self.import_result_provider_adapter
.result_at_height(height.into())
.ok()
}
}

Expand All @@ -28,6 +75,18 @@ impl configuration::CompressionConfigProvider
}
}

impl compression_storage::LatestHeight for Database<CompressionDatabase> {
fn latest_height(&self) -> Option<u32> {
HistoricalView::latest_height(self).map(Into::into)
}
}

impl canonical_height::CanonicalHeight for Database<OnChain> {
fn get(&self) -> Option<u32> {
HistoricalView::latest_height(self).map(Into::into)
}
}

pub struct CompressionServiceAdapter {
db: Database<CompressionDatabase>,
}
Expand Down
13 changes: 12 additions & 1 deletion crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{
compression_adapters::CompressionServiceAdapter,
import_result_provider,
BlockImporterAdapter,
BlockProducerAdapter,
ChainStateInfoProvider,
Expand Down Expand Up @@ -248,6 +249,15 @@ impl GraphQLBlockImporter {
}
}

impl From<BlockAt> for import_result_provider::BlockAt {
fn from(value: BlockAt) -> Self {
match value {
BlockAt::Genesis => Self::Genesis,
BlockAt::Specific(h) => Self::Specific(h),
}
}
}

impl worker::BlockImporter for GraphQLBlockImporter {
fn block_events(&self) -> BoxStream<SharedImportResult> {
self.block_importer_adapter.events_shared_result()
Expand All @@ -257,7 +267,8 @@ impl worker::BlockImporter for GraphQLBlockImporter {
&self,
height: BlockAt,
) -> anyhow::Result<SharedImportResult> {
self.import_result_provider_adapter.result_at_height(height)
self.import_result_provider_adapter
.result_at_height(height.into())
}
}

Expand Down
23 changes: 17 additions & 6 deletions crates/fuel-core/src/service/adapters/import_result_provider.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use crate::{
database::Database,
fuel_core_graphql_api::ports::worker::BlockAt,
service::adapters::ExecutorAdapter,
};
use fuel_core_importer::ports::Validator;
use fuel_core_storage::{
not_found,
transactional::AtomicView,
};
use fuel_core_types::services::{
block_importer::{
ImportResult,
SharedImportResult,
use fuel_core_types::{
fuel_types::BlockHeight,
services::{
block_importer::{
ImportResult,
SharedImportResult,
},
executor::ValidationResult,
},
executor::ValidationResult,
};
use std::sync::Arc;

Expand All @@ -32,6 +34,15 @@ impl ImportResultProvider {
}
}

/// Represents either the Genesis Block or a block at a specific height
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub enum BlockAt {
/// Block at a specific height
Specific(BlockHeight),
/// Genesis block
Genesis,
}

impl ImportResultProvider {
pub fn result_at_height(
&self,
Expand Down
13 changes: 11 additions & 2 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use crate::service::adapters::consensus_module::poa::pre_confirmation_signature:

use super::{
adapters::{
compression_adapters::CompressionServiceAdapter,
compression_adapters::{
CompressionBlockImporterAdapter,
CompressionServiceAdapter,
},
FuelBlockSigner,
P2PAdapter,
TxStatusManagerAdapter,
Expand Down Expand Up @@ -398,13 +401,19 @@ pub fn init_sub_services(
let compression_service_adapter =
CompressionServiceAdapter::new(database.compression().clone());

let compression_importer_adapter = CompressionBlockImporterAdapter::new(
importer_adapter.clone(),
import_result_provider.clone(),
);

let compression_service = match &config.da_compression {
DaCompressionMode::Disabled => None,
DaCompressionMode::Enabled(cfg) => Some(
new_compression_service(
importer_adapter.clone(),
compression_importer_adapter,
database.compression().clone(),
cfg.clone(),
database.on_chain().clone(),
)
.map_err(|e| anyhow::anyhow!(e))?,
),
Expand Down
3 changes: 3 additions & 0 deletions crates/services/compression/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ pub mod compression_storage;

/// Configuration port
pub mod configuration;

/// Canonical height port
pub mod canonical_height;
37 changes: 36 additions & 1 deletion crates/services/compression/src/ports/block_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub(crate) trait BlockWithMetadataExt {
fn block(&self) -> &fuel_core_types::blockchain::block::Block;
#[cfg(test)]
fn default() -> Self;
#[cfg(test)]
fn test_block_with_height(height: BlockHeight) -> Self;
}

impl BlockWithMetadataExt for BlockWithMetadata {
Expand All @@ -30,13 +32,46 @@ impl BlockWithMetadataExt for BlockWithMetadata {

std::sync::Arc::new(ImportResult::default().wrap())
}

#[cfg(test)]
fn test_block_with_height(height: BlockHeight) -> Self {
use fuel_core_types::services::block_importer::ImportResult;

let mut import_result = ImportResult::default();
import_result
.sealed_block
.entity
.header_mut()
.set_block_height(height.into());
std::sync::Arc::new(import_result.wrap())
}
}

/// Type alias for returned value by .subscribe() method on `BlockSource`
pub type BlockStream = BoxStream<BlockWithMetadata>;

/// Represents either the Genesis Block or a block at a specific height
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub enum BlockAt {
/// Block at a specific height
Specific(BlockHeight),
/// Genesis block
Genesis,
}

impl PartialEq<BlockHeight> for BlockAt {
fn eq(&self, other: &BlockHeight) -> bool {
match self {
Self::Genesis => 0 == *other,
Self::Specific(h) => h == other,
}
}
}

/// Port for L2 blocks source
pub trait BlockSource {
pub trait BlockSource: Send + Sync {
/// Should provide a stream of blocks with metadata
fn subscribe(&self) -> BlockStream;
/// Should provide the block at a given height
fn get_block(&self, height: BlockAt) -> Option<BlockWithMetadata>;
}
8 changes: 8 additions & 0 deletions crates/services/compression/src/ports/canonical_height.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use super::block_source::BlockHeight;

/// Canonical height port
/// Should provide the canonical height of the blockchain.
pub trait CanonicalHeight: Send + Sync {
/// Returns the canonical height of the blockchain.
fn get(&self) -> Option<BlockHeight>;
}
10 changes: 10 additions & 0 deletions crates/services/compression/src/ports/compression_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,26 @@ use fuel_core_storage::{
/// Compressed block type alias
pub type CompressedBlock = fuel_core_compression::VersionedCompressedBlock;

/// Trait for getting the latest height of the compression storage
pub trait LatestHeight {
/// Get the latest height of the compression storage
fn latest_height(&self) -> Option<u32>;
}

/// Trait for interacting with storage that supports compression
pub trait CompressionStorage:
KeyValueInspect<Column = MerkleizedColumn<storage::column::CompressionColumn>>
+ Modifiable
+ Send
+ Sync
{
}

impl<T> CompressionStorage for T where
T: KeyValueInspect<Column = MerkleizedColumn<storage::column::CompressionColumn>>
+ Modifiable
+ Send
+ Sync
{
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/compression/src/ports/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::CompressionConfig;

/// Configuration for the compression service
pub trait CompressionConfigProvider {
pub trait CompressionConfigProvider: Send + Sync {
/// getter for the compression config
fn config(&self) -> CompressionConfig;
}
Loading
Loading