Skip to content
67 changes: 63 additions & 4 deletions crates/fuel-core/src/service/adapters/compression_adapters.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,69 @@
use crate::{
database::{
database_description::compression::CompressionDatabase,
database_description::{
compression::CompressionDatabase,
on_chain::OnChain,
},
Database,
},
service::adapters::BlockImporterAdapter,
};
use fuel_core_compression_service::{
config,
ports::{
block_source,
block_source::{
self,
BlockAt,
},
canonical_height,
compression_storage,
configuration,
},
};
use fuel_core_storage::transactional::HistoricalView;
use fuel_core_types::services::block_importer::SharedImportResult;

impl block_source::BlockSource for BlockImporterAdapter {
use super::import_result_provider::{
self,
ImportResultProvider,
};

/// Provides the necessary functionality for accessing latest and historical block data.
pub struct CompressionBlockImporterAdapter {
block_importer: BlockImporterAdapter,
import_result_provider_adapter: ImportResultProvider,
}

impl CompressionBlockImporterAdapter {
pub fn new(
block_importer: BlockImporterAdapter,
import_result_provider_adapter: ImportResultProvider,
) -> Self {
Self {
block_importer,
import_result_provider_adapter,
}
}
}

impl From<BlockAt> for import_result_provider::BlockAt {
fn from(value: BlockAt) -> Self {
match value {
BlockAt::Genesis => Self::Genesis,
BlockAt::Specific(h) => Self::Specific(h.into()),
}
}
}

impl block_source::BlockSource for CompressionBlockImporterAdapter {
fn subscribe(&self) -> fuel_core_services::stream::BoxStream<SharedImportResult> {
self.events_shared_result()
self.block_importer.events_shared_result()
}

fn get_block(&self, height: BlockAt) -> Option<SharedImportResult> {
self.import_result_provider_adapter
.result_at_height(height.into())
.ok()
}
}

Expand All @@ -28,6 +75,18 @@ impl configuration::CompressionConfigProvider
}
}

impl compression_storage::LatestHeight for Database<CompressionDatabase> {
fn latest_height(&self) -> Option<u32> {
HistoricalView::latest_height(self).map(Into::into)
}
}

impl canonical_height::CanonicalHeight for Database<OnChain> {
fn get(&self) -> Option<u32> {
HistoricalView::latest_height(self).map(Into::into)
}
}

pub struct CompressionServiceAdapter {
db: Database<CompressionDatabase>,
}
Expand Down
13 changes: 12 additions & 1 deletion crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{
compression_adapters::CompressionServiceAdapter,
import_result_provider,
BlockImporterAdapter,
BlockProducerAdapter,
ChainStateInfoProvider,
Expand Down Expand Up @@ -248,6 +249,15 @@ impl GraphQLBlockImporter {
}
}

impl From<BlockAt> for import_result_provider::BlockAt {
fn from(value: BlockAt) -> Self {
match value {
BlockAt::Genesis => Self::Genesis,
BlockAt::Specific(h) => Self::Specific(h),
}
}
}

impl worker::BlockImporter for GraphQLBlockImporter {
fn block_events(&self) -> BoxStream<SharedImportResult> {
self.block_importer_adapter.events_shared_result()
Expand All @@ -257,7 +267,8 @@ impl worker::BlockImporter for GraphQLBlockImporter {
&self,
height: BlockAt,
) -> anyhow::Result<SharedImportResult> {
self.import_result_provider_adapter.result_at_height(height)
self.import_result_provider_adapter
.result_at_height(height.into())
}
}

Expand Down
23 changes: 17 additions & 6 deletions crates/fuel-core/src/service/adapters/import_result_provider.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use crate::{
database::Database,
fuel_core_graphql_api::ports::worker::BlockAt,
service::adapters::ExecutorAdapter,
};
use fuel_core_importer::ports::Validator;
use fuel_core_storage::{
not_found,
transactional::AtomicView,
};
use fuel_core_types::services::{
block_importer::{
ImportResult,
SharedImportResult,
use fuel_core_types::{
fuel_types::BlockHeight,
services::{
block_importer::{
ImportResult,
SharedImportResult,
},
executor::ValidationResult,
},
executor::ValidationResult,
};
use std::sync::Arc;

Expand All @@ -32,6 +34,15 @@ impl ImportResultProvider {
}
}

/// Represents either the Genesis Block or a block at a specific height
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub enum BlockAt {
/// Block at a specific height
Specific(BlockHeight),
/// Genesis block
Genesis,
}

impl ImportResultProvider {
pub fn result_at_height(
&self,
Expand Down
13 changes: 11 additions & 2 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use crate::service::adapters::consensus_module::poa::pre_confirmation_signature:

use super::{
adapters::{
compression_adapters::CompressionServiceAdapter,
compression_adapters::{
CompressionBlockImporterAdapter,
CompressionServiceAdapter,
},
FuelBlockSigner,
P2PAdapter,
TxStatusManagerAdapter,
Expand Down Expand Up @@ -398,13 +401,19 @@ pub fn init_sub_services(
let compression_service_adapter =
CompressionServiceAdapter::new(database.compression().clone());

let compression_importer_adapter = CompressionBlockImporterAdapter::new(
importer_adapter.clone(),
import_result_provider.clone(),
);

let compression_service = match &config.da_compression {
DaCompressionMode::Disabled => None,
DaCompressionMode::Enabled(cfg) => Some(
new_compression_service(
importer_adapter.clone(),
compression_importer_adapter,
database.compression().clone(),
cfg.clone(),
database.on_chain().clone(),
)
.map_err(|e| anyhow::anyhow!(e))?,
),
Expand Down
3 changes: 3 additions & 0 deletions crates/services/compression/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ pub mod compression_storage;

/// Configuration port
pub mod configuration;

/// Canonical height port
pub mod canonical_height;
37 changes: 36 additions & 1 deletion crates/services/compression/src/ports/block_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub(crate) trait BlockWithMetadataExt {
fn block(&self) -> &fuel_core_types::blockchain::block::Block;
#[cfg(test)]
fn default() -> Self;
#[cfg(test)]
fn test_block_with_height(height: BlockHeight) -> Self;
}

impl BlockWithMetadataExt for BlockWithMetadata {
Expand All @@ -30,13 +32,46 @@ impl BlockWithMetadataExt for BlockWithMetadata {

std::sync::Arc::new(ImportResult::default().wrap())
}

#[cfg(test)]
fn test_block_with_height(height: BlockHeight) -> Self {
use fuel_core_types::services::block_importer::ImportResult;

let mut import_result = ImportResult::default();
import_result
.sealed_block
.entity
.header_mut()
.set_block_height(height.into());
std::sync::Arc::new(import_result.wrap())
}
}

/// Type alias for returned value by .subscribe() method on `BlockSource`
pub type BlockStream = BoxStream<BlockWithMetadata>;

/// Represents either the Genesis Block or a block at a specific height
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub enum BlockAt {
/// Block at a specific height
Specific(BlockHeight),
/// Genesis block
Genesis,
}

impl PartialEq<BlockHeight> for BlockAt {
fn eq(&self, other: &BlockHeight) -> bool {
match self {
Self::Genesis => 0 == *other,
Self::Specific(h) => h == other,
}
}
}

/// Port for L2 blocks source
pub trait BlockSource {
pub trait BlockSource: Send + Sync {
/// Should provide a stream of blocks with metadata
fn subscribe(&self) -> BlockStream;
/// Should provide the block at a given height
fn get_block(&self, height: BlockAt) -> Option<BlockWithMetadata>;
}
8 changes: 8 additions & 0 deletions crates/services/compression/src/ports/canonical_height.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use super::block_source::BlockHeight;

/// Canonical height port
/// Should provide the canonical height of the blockchain.
pub trait CanonicalHeight: Send + Sync {
/// Returns the canonical height of the blockchain.
fn get(&self) -> Option<BlockHeight>;
}
10 changes: 10 additions & 0 deletions crates/services/compression/src/ports/compression_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,26 @@ use fuel_core_storage::{
/// Compressed block type alias
pub type CompressedBlock = fuel_core_compression::VersionedCompressedBlock;

/// Trait for getting the latest height of the compression storage
pub trait LatestHeight {
/// Get the latest height of the compression storage
fn latest_height(&self) -> Option<u32>;
}

/// Trait for interacting with storage that supports compression
pub trait CompressionStorage:
KeyValueInspect<Column = MerkleizedColumn<storage::column::CompressionColumn>>
+ Modifiable
+ Send
+ Sync
{
}

impl<T> CompressionStorage for T where
T: KeyValueInspect<Column = MerkleizedColumn<storage::column::CompressionColumn>>
+ Modifiable
+ Send
+ Sync
{
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/compression/src/ports/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::CompressionConfig;

/// Configuration for the compression service
pub trait CompressionConfigProvider {
pub trait CompressionConfigProvider: Send + Sync {
/// getter for the compression config
fn config(&self) -> CompressionConfig;
}
Loading
Loading