Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,752 changes: 813 additions & 939 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 14 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ paste = "1"
pin-project = "1.1.2"
prometheus = "0.13"
prometheus_exporter = "0.8"
prost = { version = "0.11" }
prost = { version = "0.12" }
quickcheck = "1"
quickcheck_async = "0.1"
quickcheck_macros = "1"
Expand Down Expand Up @@ -216,20 +216,28 @@ cid = { version = "0.10.1", default-features = false, features = [
frc42_dispatch = "6.0.0"

# Using the same tendermint-rs dependency as tower-abci. From both we are interested in v037 modules.
tower-abci = { version = "0.7" }
tower-abci = { version = "0.14" }
tower = { version = "0.4" }
tendermint = { version = "0.31", features = ["secp256k1"] }
tendermint-config = "0.33.0"
tendermint-rpc = { version = "0.31", features = [
tendermint = { version = "0.36", features = ["secp256k1"] }
tendermint-config = "0.36.0"
tendermint-rpc = { version = "0.36", features = [
"secp256k1",
"http-client",
"websocket-client",
] }
tendermint-proto = { version = "0.31" }
tendermint-proto = { version = "0.36" }

[patch.crates-io]
# Use stable-only features.
gcra = { git = "https://github.com/consensus-shipyard/gcra-rs.git", branch = "main" }
tendermint = { git = "https://github.com/ec2/tendermint-rs.git", branch = "ec2/ipc"}
tendermint-config = { git = "https://github.com/ec2/tendermint-rs.git", branch = "ec2/ipc"}
tendermint-rpc = { git = "https://github.com/ec2/tendermint-rs.git", branch = "ec2/ipc"}
tendermint-proto = { git = "https://github.com/ec2/tendermint-rs.git", branch = "ec2/ipc"}
# tendermint = { path = "./tendermint-rs/tendermint"}
# tendermint-config = { path = "./tendermint-rs/config"}
# tendermint-rpc = { path = "./tendermint-rs/rpc"}
# tendermint-proto = { path = "./tendermint-rs/proto"}

[profile.wasm]
inherits = "release"
Expand Down
50 changes: 30 additions & 20 deletions fendermint/abci/examples/kvstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use fendermint_abci::{
use structopt::StructOpt;
use tendermint::abci::{request, response, Event, EventAttributeIndexExt};
use tower::ServiceBuilder;
use tower_abci::{split, v037::Server};
use tower_abci::{v038::split, v038::Server};
use tracing::{info, Level};

// For the sake of example, sho the relationship between buffering, concurrency and block size.
Expand Down Expand Up @@ -94,33 +94,43 @@ impl Application for KVStore {
}
}

async fn deliver_tx(&self, request: request::DeliverTx) -> Result<response::DeliverTx> {
let tx = String::from_utf8(request.tx.to_vec()).unwrap();
let (key, value) = match tx.split('=').collect::<Vec<_>>() {
k if k.len() == 1 => (k[0], k[0]),
kv => (kv[0], kv[1]),
};

atomically(|| {
self.store.update(|mut store| {
store.insert(key.into(), value.into());
store
async fn finalize_block(
&self,
request: request::FinalizeBlock,
) -> Result<response::FinalizeBlock> {
let mut events = vec![];
for tx in request.txs.iter() {
let tx = String::from_utf8(tx.to_vec()).unwrap();
let (key, value) = match tx.split('=').collect::<Vec<_>>() {
k if k.len() == 1 => (k[0], k[0]),
kv => (kv[0], kv[1]),
};

atomically(|| {
self.store.update(|mut store| {
store.insert(key.into(), value.into());
store
})
})
})
.await;
.await;

info!(?key, ?value, "update");
info!(?key, ?value, "update");

Ok(response::DeliverTx {
events: vec![Event::new(
events.push(Event::new(
"app",
vec![
("key", key).index(),
("index_key", "index is working").index(),
("noindex_key", "index is working").no_index(),
],
)],
..Default::default()
));
}
Ok(response::FinalizeBlock {
events: events,
tx_results: vec![Default::default(); request.txs.len()],
validator_updates: vec![],
consensus_param_updates: None,
app_hash: Default::default(),
})
}

Expand Down Expand Up @@ -203,7 +213,7 @@ async fn main() {

// Run the ABCI server.
server
.listen(format!("{}:{}", opt.host, opt.port))
.listen_tcp(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
}
45 changes: 32 additions & 13 deletions fendermint/abci/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,34 @@ pub trait Application {
Ok(response::ProcessProposal::Accept)
}

/// Signals the beginning of a new block, prior to any `DeliverTx` calls.
async fn begin_block(&self, request: request::BeginBlock) -> AbciResult<response::BeginBlock> {
Ok(Default::default())
async fn extend_vote(&self, request: request::ExtendVote) -> AbciResult<response::ExtendVote> {
Ok(response::ExtendVote {
vote_extension: Default::default(),
})
}

/// Apply a transaction to the application's state.
async fn deliver_tx(&self, request: request::DeliverTx) -> AbciResult<response::DeliverTx> {
Ok(Default::default())
async fn verify_vote_extension(
&self,
request: request::VerifyVoteExtension,
) -> AbciResult<response::VerifyVoteExtension> {
if request.vote_extension.is_empty() {
Ok(response::VerifyVoteExtension::Accept)
} else {
Ok(response::VerifyVoteExtension::Reject)
}
}

/// Signals the end of a block.
async fn end_block(&self, request: request::EndBlock) -> AbciResult<response::EndBlock> {
Ok(Default::default())
async fn finalize_block(
&self,
request: request::FinalizeBlock,
) -> AbciResult<response::FinalizeBlock> {
Ok(response::FinalizeBlock {
events: Default::default(),
tx_results: Default::default(),
validator_updates: Default::default(),
consensus_param_updates: Default::default(),
app_hash: Default::default(),
})
}

/// Commit the current state at the current height.
Expand Down Expand Up @@ -174,11 +189,15 @@ where
Request::ProcessProposal(r) => {
Response::ProcessProposal(log_error(app.process_proposal(r).await)?)
}
Request::BeginBlock(r) => {
Response::BeginBlock(log_error(app.begin_block(r).await)?)
Request::FinalizeBlock(r) => {
Response::FinalizeBlock(log_error(app.finalize_block(r).await)?)
}
Request::ExtendVote(r) => {
Response::ExtendVote(log_error(app.extend_vote(r).await)?)
}
Request::VerifyVoteExtension(r) => {
Response::VerifyVoteExtension(log_error(app.verify_vote_extension(r).await)?)
}
Request::DeliverTx(r) => Response::DeliverTx(log_error(app.deliver_tx(r).await)?),
Request::EndBlock(r) => Response::EndBlock(log_error(app.end_block(r).await)?),
Request::Commit => Response::Commit(log_error(app.commit().await)?),
Request::ListSnapshots => {
Response::ListSnapshots(log_error(app.list_snapshots().await)?)
Expand Down
136 changes: 75 additions & 61 deletions fendermint/app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,10 +688,13 @@ where
Ok(response::ProcessProposal::Reject)
}
}

/// Signals the beginning of a new block, prior to any `DeliverTx` calls.
async fn begin_block(&self, request: request::BeginBlock) -> AbciResult<response::BeginBlock> {
let block_height = request.header.height.into();
/// The transaction execution step of the application. Combines what used to be `BeginBlock`, `DeliverTx`, and `EndBlock`.
async fn finalize_block(
&self,
request: request::FinalizeBlock,
) -> AbciResult<response::FinalizeBlock> {
// BeginBlock
let block_height = request.height.into();
let block_hash = match request.hash {
tendermint::Hash::Sha256(h) => h,
tendermint::Hash::None => return Err(anyhow!("empty block hash").into()),
Expand All @@ -711,18 +714,17 @@ where

tracing::debug!(
height = block_height,
timestamp = request.header.time.unix_timestamp(),
app_hash = request.header.app_hash.to_string(),
//app_state_hash = to_app_hash(&state_params).to_string(), // should be the same as `app_hash`
timestamp = request.time.unix_timestamp(),
hash = request.hash.to_string(),
"begin block"
);

state_params.timestamp = to_timestamp(request.header.time);
state_params.timestamp = to_timestamp(request.time);

let state = FvmExecState::new(db, self.multi_engine.as_ref(), block_height, state_params)
.context("error creating new state")?
.with_block_hash(block_hash)
.with_validator_id(request.header.proposer_address);
.with_validator_id(request.proposer_address);

tracing::debug!("initialized exec state");

Expand All @@ -733,64 +735,58 @@ where
.await
.context("begin failed")?;

Ok(to_begin_block(ret))
}

/// Apply a transaction to the application's state.
async fn deliver_tx(&self, request: request::DeliverTx) -> AbciResult<response::DeliverTx> {
let msg = request.tx.to_vec();
let (result, block_hash) = self
.modify_exec_state(|s| async {
let ((env, state), res) = self.interpreter.deliver(s, msg).await?;
let block_hash = state.block_hash();
Ok(((env, state), (res, block_hash)))
})
.await
.context("deliver failed")?;
// [Deliver Tx]
let mut tx_results = Vec::new();
for tx in request.txs {
let msg = tx.to_vec();
let (result, block_hash) = self
.modify_exec_state(|s| async {
let ((env, state), res) = self.interpreter.deliver(s, msg).await?;
let block_hash = state.block_hash();
Ok(((env, state), (res, block_hash)))
})
.await
.context("deliver failed")?;

let response = match result {
Err(e) => invalid_exec_tx_result(AppError::InvalidEncoding, e.description),
Ok(ret) => match ret {
ChainMessageApplyRet::Signed(Err(InvalidSignature(d))) => {
invalid_exec_tx_result(AppError::InvalidSignature, d)
}
ChainMessageApplyRet::Signed(Ok(ret)) => {
to_exec_tx_result(ret.fvm, ret.domain_hash, block_hash)
}
ChainMessageApplyRet::Ipc(ret) => to_exec_tx_result(ret, None, block_hash),
},
};

let response = match result {
Err(e) => invalid_deliver_tx(AppError::InvalidEncoding, e.description),
Ok(ret) => match ret {
ChainMessageApplyRet::Signed(Err(InvalidSignature(d))) => {
invalid_deliver_tx(AppError::InvalidSignature, d)
}
ChainMessageApplyRet::Signed(Ok(ret)) => {
to_deliver_tx(ret.fvm, ret.domain_hash, block_hash)
}
ChainMessageApplyRet::Ipc(ret) => to_deliver_tx(ret, None, block_hash),
},
};
if response.code != 0.into() {
tracing::info!(
"deliver_tx failed: {:?} - {:?}",
response.code,
response.info
);
}

if response.code != 0.into() {
tracing::info!(
"deliver_tx failed: {:?} - {:?}",
response.code,
response.info
);
tx_results.push(response);
}

Ok(response)
}

/// Signals the end of a block.
async fn end_block(&self, request: request::EndBlock) -> AbciResult<response::EndBlock> {
tracing::debug!(height = request.height, "end block");
// EndBlock

// TODO: Return events from epoch transitions.
let ret = self

let power_table = self
.modify_exec_state(|s| self.interpreter.end(s))
.await
.context("end failed")?;

let r = to_end_block(ret)?;

Ok(r)
}

/// Commit the current state at the current height.
async fn commit(&self) -> AbciResult<response::Commit> {
let exec_state = self.take_exec_state().await;

// TODO: This is technically "right" but I think we actually wanna do all this stuff in `commit`.
// The "issue" is that we need to know the app_hash before `commit`. But we can't actually get that
// without calling commit on exec_state.

// Commit the execution state to the datastore.
let mut state = self.committed_state()?;
state.block_height = exec_state.block_height().try_into()?;
Expand All @@ -816,6 +812,26 @@ where
let app_hash = state.app_hash();
let block_height = state.block_height;

tracing::debug!(
block_height,
state_root = state_root.to_string(),
app_hash = app_hash.to_string(),
"finalize block state to commit"
);

// Commit app state to the datastore.
self.set_committed_state(state)?;

Ok(to_finalize_block(ret, tx_results, power_table, app_hash)
.context("finalize block failed")?)
}

/// Commit the current state at the current height.
async fn commit(&self) -> AbciResult<response::Commit> {
let state = self.committed_state()?;

let block_height = state.block_height;

// Tell CometBFT how much of the block history it can forget.
let retain_height = if self.state_hist_size == 0 {
Default::default()
Expand All @@ -825,8 +841,6 @@ where

tracing::debug!(
block_height,
state_root = state_root.to_string(),
app_hash = app_hash.to_string(),
timestamp = state.state_params.timestamp.0,
"commit state"
);
Expand Down Expand Up @@ -864,17 +878,17 @@ where
atomically(|| snapshots.notify(block_height, state.state_params.clone())).await;
}

// Commit app state to the datastore.
self.set_committed_state(state)?;

emit!(NewBlock { block_height });

// Reset check state.
let mut guard = self.check_state.lock().await;
*guard = None;

Ok(response::Commit {
data: app_hash.into(),
// Note: This used to be the app_hash which is calculated as a result of flushing the state tree.
// It has been moved into FinalizeBlock. And this field is now unused in 0.38.
// See the TODO in `finalize_block` for relevant info.
data: Default::default(),
retain_height: retain_height.try_into().expect("height is valid"),
})
}
Expand Down
Loading