diff --git a/massa-pool-worker/src/tests/operation_pool_tests.rs b/massa-pool-worker/src/tests/operation_pool_tests.rs index e0cb07d8aa0..a6042881883 100644 --- a/massa-pool-worker/src/tests/operation_pool_tests.rs +++ b/massa-pool-worker/src/tests/operation_pool_tests.rs @@ -17,29 +17,19 @@ //! latest period given his own thread. All operation which doesn't fit these //! requirements are "irrelevant" //! -use super::tools::{create_some_operations, operation_pool_test}; -use crate::operation_pool::OperationPool; -use massa_execution_exports::test_exports::{ - MockExecutionController, MockExecutionControllerMessage, -}; -use massa_models::{ - address::Address, - amount::Amount, - operation::{Operation, OperationSerializer, OperationType, SecureShareOperation}, - prehash::PreHashMap, - secure_share::SecureShareContent, - slot::Slot, -}; -use massa_pool_exports::{PoolChannels, PoolConfig}; -use massa_signature::KeyPair; -use massa_storage::Storage; -use std::{str::FromStr, time::Duration}; -use tokio::sync::broadcast; +use crate::tests::tools::OpGenerator; + +use super::tools::{create_some_operations, operation_pool_test, pool_test}; +use massa_execution_exports::test_exports::MockExecutionControllerMessage; +use massa_models::{amount::Amount, slot::Slot}; +use massa_pool_exports::PoolConfig; +use std::time::Duration; #[test] fn test_add_operation() { operation_pool_test(PoolConfig::default(), |mut operation_pool, mut storage| { - storage.store_operations(create_some_operations(10, &KeyPair::generate(), 2)); + let op_gen = OpGenerator::default().expirery(2); + storage.store_operations(create_some_operations(10, &op_gen)); operation_pool.add_operations(storage); assert_eq!(operation_pool.storage.get_op_refs().len(), 10); }); @@ -52,174 +42,164 @@ fn test_add_irrelevant_operation() { let pool_config = PoolConfig::default(); let thread_count = pool_config.thread_count; operation_pool_test(PoolConfig::default(), |mut operation_pool, mut storage| { - storage.store_operations(create_some_operations(10, &KeyPair::generate(), 1)); + let op_gen = OpGenerator::default().expirery(2); + storage.store_operations(create_some_operations(10, &op_gen)); operation_pool.notify_final_cs_periods(&vec![51; thread_count.into()]); operation_pool.add_operations(storage); assert_eq!(operation_pool.storage.get_op_refs().len(), 0); }); } -fn get_transaction(expire_period: u64, fee: u64) -> SecureShareOperation { - let sender_keypair = KeyPair::generate(); - - let recv_keypair = KeyPair::generate(); - - let op = OperationType::Transaction { - recipient_address: Address::from_public_key(&recv_keypair.get_public_key()), - amount: Amount::default(), - }; - let content = Operation { - fee: Amount::from_str(&fee.to_string()).unwrap(), - op, - expire_period, - }; - Operation::new_verifiable(content, OperationSerializer::new(), &sender_keypair).unwrap() -} - /// TODO refactor old tests #[test] fn test_pool() { - let (execution_controller, execution_receiver) = MockExecutionController::new_with_receiver(); let pool_config = PoolConfig::default(); - let storage_base = Storage::create_root(); - let operation_sender = broadcast::channel(pool_config.broadcast_operations_capacity).0; - let mut pool = OperationPool::init( + pool_test( pool_config, - &storage_base, - execution_controller, - PoolChannels { operation_sender }, - ); - // generate (id, transactions, range of validity) by threads - let mut thread_tx_lists = vec![Vec::new(); pool_config.thread_count as usize]; - for i in 0..18 { - let fee = 40 + i; - let expire_period: u64 = 40 + i; - let start_period = expire_period.saturating_sub(pool_config.operation_validity_periods); - let op = get_transaction(expire_period, fee); - let id = op.id; - - let mut ops = PreHashMap::default(); - ops.insert(id, op.clone()); - let mut storage = storage_base.clone_without_refs(); - storage.store_operations(ops.values().cloned().collect()); - pool.add_operations(storage); - //TODO: compare - // assert_eq!(storage.get_op_refs(), &Set::::default()); - - // duplicate - let mut storage = storage_base.clone_without_refs(); - storage.store_operations(ops.values().cloned().collect()); - pool.add_operations(storage); - //TODO: compare - //assert_eq!(storage.get_op_refs(), &ops.keys().copied().collect::>()); - - let op_thread = op - .content_creator_address - .get_thread(pool_config.thread_count); - thread_tx_lists[op_thread as usize].push((op, start_period..=expire_period)); - } - std::thread::spawn(move || loop { - match execution_receiver.recv_timeout(Duration::from_millis(100)) { - // forward on the operations - Ok(MockExecutionControllerMessage::UnexecutedOpsAmong { - ops, response_tx, .. - }) => { - response_tx.send(ops).unwrap(); + |mut pool_manager, mut pool, execution_receiver, storage_base| { + // generate (id, transactions, range of validity) by threads + let mut thread_tx_lists = vec![Vec::new(); pool_config.thread_count as usize]; + + for i in 0..18 { + let expire_period: u64 = 40 + i; + let op = OpGenerator::default() + .expirery(expire_period) + .fee(Amount::from_raw(40 + i)) + .generate(); //get_transaction(expire_period, fee); + + let mut storage = storage_base.clone_without_refs(); + storage.store_operations(vec![op.clone()]); + pool.add_operations(storage); + + //TODO: compare + // assert_eq!(storage.get_op_refs(), &Set::::default()); + + // duplicate + // let mut storage = storage_base.clone_without_refs(); + // storage.store_operations(vec![op.clone()]); + // pool.add_operations(storage); + //TODO: compare + //assert_eq!(storage.get_op_refs(), &ops.keys().copied().collect::>()); + + let op_thread = op + .content_creator_address + .get_thread(pool_config.thread_count); + + let start_period = + expire_period.saturating_sub(pool_config.operation_validity_periods); + + thread_tx_lists[op_thread as usize].push((op, start_period..=expire_period)); } - // we want the operations to be paid for... - Ok(MockExecutionControllerMessage::GetFinalAndCandidateBalance { - response_tx, .. - }) => response_tx - .send(vec![( - Some(Amount::from_raw(60 * 1000000000)), - Some(Amount::from_raw(60 * 1000000000)), - )]) - .unwrap(), - _ => {} - } - }); + std::thread::spawn(move || loop { + match execution_receiver.recv_timeout(Duration::from_millis(100)) { + // forward on the operations + Ok(MockExecutionControllerMessage::UnexecutedOpsAmong { + ops, + response_tx, + .. + }) => { + response_tx.send(ops).unwrap(); + } + // we want the operations to be paid for... + Ok(MockExecutionControllerMessage::GetFinalAndCandidateBalance { + response_tx, + .. + }) => response_tx + .send(vec![( + Some(Amount::from_raw(60 * 1_000_000_000)), + Some(Amount::from_raw(60 * 1_000_000_000)), + )]) + .unwrap(), + _ => {} + } + }); - // sort from bigger fee to smaller and truncate - for lst in thread_tx_lists.iter_mut() { - lst.reverse(); - lst.truncate(pool_config.max_operation_pool_size_per_thread); - } - - // checks ops are the expected ones for thread 0 and 1 and various periods - for thread in 0u8..pool_config.thread_count { - for period in 0u64..70 { - let target_slot = Slot::new(period, thread); - let max_count = 3; - let (ids, storage) = pool.get_block_operations(&target_slot); - - assert!(ids - .iter() - .map(|id| ( - *id, - storage - .read_operations() - .get(id) - .unwrap() - .serialized_data - .clone() - )) - .eq(thread_tx_lists[target_slot.thread as usize] - .iter() - .filter(|(_, r)| r.contains(&target_slot.period)) - .take(max_count) - .map(|(op, _)| (op.id, op.serialized_data.clone())))); - } - } - - // op ending before or at period 45 won't appear in the block due to incompatible validity range - // we don't keep them as expected ops - let final_period = 45u64; - pool.notify_final_cs_periods(&vec![final_period; pool_config.thread_count as usize]); - for lst in thread_tx_lists.iter_mut() { - lst.retain(|(op, _)| op.content.expire_period > final_period); - } - - // checks ops are the expected ones for thread 0 and 1 and various periods - for thread in 0u8..pool_config.thread_count { - for period in 0u64..70 { - let target_slot = Slot::new(period, thread); - let max_count = 4; - let (ids, storage) = pool.get_block_operations(&target_slot); - assert!(ids - .iter() - .map(|id| ( - *id, - storage - .read_operations() - .get(id) - .unwrap() - .serialized_data - .clone() - )) - .eq(thread_tx_lists[target_slot.thread as usize] - .iter() - .filter(|(_, r)| r.contains(&target_slot.period)) - .take(max_count) - .map(|(op, _)| (op.id, op.serialized_data.clone())))); - } - } - - // add transactions with a high fee but too much in the future: should be ignored - { - //TODO: update current slot - //pool.update_current_slot(Slot::new(10, 0)); - let fee = 1000; - let expire_period: u64 = 300; - let op = get_transaction(expire_period, fee); - let mut storage = storage_base.clone_without_refs(); - storage.store_operations(vec![op.clone()]); - pool.add_operations(storage); - //TODO: compare - //assert_eq!(storage.get_op_refs(), &Set::::default()); - let op_thread = op - .content_creator_address - .get_thread(pool_config.thread_count); - let (ids, _) = pool.get_block_operations(&Slot::new(expire_period - 1, op_thread)); - assert!(ids.is_empty()); - } + // sort from bigger fee to smaller and truncate + for lst in thread_tx_lists.iter_mut() { + lst.reverse(); + lst.truncate(pool_config.max_operation_pool_size_per_thread); + } + + // checks ops are the expected ones for thread 0 and 1 and various periods + for thread in 0u8..pool_config.thread_count { + for period in 0u64..70 { + let target_slot = Slot::new(period, thread); + let max_count = 3; + let (ids, storage) = pool.get_block_operations(&target_slot); + + assert!(ids + .iter() + .map(|id| ( + *id, + storage + .read_operations() + .get(id) + .unwrap() + .serialized_data + .clone() + )) + .eq(thread_tx_lists[target_slot.thread as usize] + .iter() + .filter(|(_, r)| r.contains(&target_slot.period)) + .take(max_count) + .map(|(op, _)| (op.id, op.serialized_data.clone())))); + } + } + + // op ending before or at period 45 won't appear in the block due to incompatible validity range + // we don't keep them as expected ops + let final_period = 45u64; + pool.notify_final_cs_periods(&vec![final_period; pool_config.thread_count as usize]); + for lst in thread_tx_lists.iter_mut() { + lst.retain(|(op, _)| op.content.expire_period > final_period); + } + + // checks ops are the expected ones for thread 0 and 1 and various periods + for thread in 0u8..pool_config.thread_count { + for period in 0u64..70 { + let target_slot = Slot::new(period, thread); + let max_count = 4; + let (ids, storage) = pool.get_block_operations(&target_slot); + assert!(ids + .iter() + .map(|id| ( + *id, + storage + .read_operations() + .get(id) + .unwrap() + .serialized_data + .clone() + )) + .eq(thread_tx_lists[target_slot.thread as usize] + .iter() + .filter(|(_, r)| r.contains(&target_slot.period)) + .take(max_count) + .map(|(op, _)| (op.id, op.serialized_data.clone())))); + } + } + + // add transactions with a high fee but too much in the future: should be ignored + { + //TODO: update current slot + //pool.update_current_slot(Slot::new(10, 0)); + let expire_period: u64 = 300; + let op = OpGenerator::default() + .expirery(expire_period) + .fee(Amount::from_raw(1000)) + .generate(); + let mut storage = storage_base.clone_without_refs(); + storage.store_operations(vec![op.clone()]); + pool.add_operations(storage); + //TODO: compare + //assert_eq!(storage.get_op_refs(), &Set::::default()); + let op_thread = op + .content_creator_address + .get_thread(pool_config.thread_count); + let (ids, _) = pool.get_block_operations(&Slot::new(expire_period - 1, op_thread)); + assert!(ids.is_empty()); + } + pool_manager.stop(); + }, + ); } diff --git a/massa-pool-worker/src/tests/scenario.rs b/massa-pool-worker/src/tests/scenario.rs index 977677f3298..6cce5962d4b 100644 --- a/massa-pool-worker/src/tests/scenario.rs +++ b/massa-pool-worker/src/tests/scenario.rs @@ -17,6 +17,7 @@ use std::time::Duration; use crate::tests::tools::create_some_operations; use crate::tests::tools::pool_test; +use crate::tests::tools::OpGenerator; use massa_execution_exports::test_exports::MockExecutionControllerMessage as ControllerMsg; use massa_models::address::Address; use massa_models::amount::Amount; @@ -50,55 +51,26 @@ fn test_simple_get_operations() { pool_test( config, |mut pool_manager, mut pool_controller, execution_receiver, mut storage| { + //setup meta-data let keypair = KeyPair::generate(); - storage.store_operations(create_some_operations(10, &keypair, 1)); - + let op_gen = OpGenerator::default().creator(keypair.clone()).expirery(1); let creator_address = Address::from_public_key(&keypair.get_public_key()); let creator_thread = creator_address.get_thread(config.thread_count); + + // setup storage + storage.store_operations(create_some_operations(10, &op_gen)); let unexecuted_ops = storage.get_op_refs().clone(); pool_controller.add_operations(storage); // Start mock execution thread. // Provides the data for `pool_controller.get_block_operations` - std::thread::spawn(move || { - match execution_receiver.recv_timeout(Duration::from_millis(100)) { - Ok(ControllerMsg::UnexecutedOpsAmong { response_tx, .. }) => { - response_tx.send(unexecuted_ops.clone()).unwrap(); - } - Ok(op) => panic!("Expected `ControllerMsg::UnexecutedOpsAmong`, got {:?}", op), - Err(_) => panic!("execution never called"), - } - match execution_receiver.recv_timeout(Duration::from_millis(100)) { - Ok(ControllerMsg::GetFinalAndCandidateBalance { - addresses, - response_tx, - .. - }) => { - assert_eq!(addresses.len(), 1); - assert_eq!(addresses[0], creator_address); - response_tx - .send(vec![(Some(Amount::from_raw(1)), Some(Amount::from_raw(1)))]) - .unwrap(); - } - Ok(op) => panic!( - "Expected `ControllerMsg::GetFinalAndCandidateBalance`, got {:?}", - op - ), - Err(_) => panic!("execution never called"), - } - - (0..9).for_each(|_| { - match execution_receiver.recv_timeout(Duration::from_millis(100)) { - Ok(ControllerMsg::UnexecutedOpsAmong { response_tx, .. }) => { - response_tx.send(unexecuted_ops.clone()).unwrap(); - } - Ok(op) => { - panic!("Expected `ControllerMsg::UnexecutedOpsAmong`, got {:?}", op) - } - Err(_) => panic!("execution never called"), - } - }) - }); + launch_basic_get_block_operation_execution_mock( + 10, + unexecuted_ops, + execution_receiver, + creator_address, + vec![(Some(Amount::from_raw(1)), Some(Amount::from_raw(1)))], + ); // This is what we are testing.... let block_operations_storage = pool_controller @@ -113,25 +85,32 @@ fn test_simple_get_operations() { } /// Launch a default mock for execution controller on call `get_block_operation` API. -fn launch_basic_get_block_operation_execution_mock( +pub fn launch_basic_get_block_operation_execution_mock( operations_len: usize, unexecuted_ops: PreHashSet, recvr: Receiver, + creator_address: Address, + balance_vec: Vec<(Option, Option)>, ) { let receive = |er: &Receiver| er.recv_timeout(Duration::from_millis(10)); std::thread::spawn(move || { - use ControllerMsg::GetFinalAndCandidateBalance as GetFinal; - use ControllerMsg::UnexecutedOpsAmong as Unexecuted; - match receive(&recvr) { - Ok(Unexecuted { response_tx, .. }) => response_tx.send(unexecuted_ops.clone()).unwrap(), + Ok(ControllerMsg::UnexecutedOpsAmong { response_tx, .. }) => { + response_tx.send(unexecuted_ops.clone()).unwrap() + } Ok(op) => panic!("Expected `ControllerMsg::UnexecutedOpsAmong`, got {:?}", op), Err(_) => panic!("execution never called"), } match receive(&recvr) { - Ok(GetFinal { response_tx, .. }) => response_tx - .send(vec![(Some(Amount::from_raw(1)), Some(Amount::from_raw(1)))]) - .unwrap(), + Ok(ControllerMsg::GetFinalAndCandidateBalance { + addresses, + response_tx, + .. + }) => { + assert_eq!(addresses.len(), 1); + assert_eq!(addresses[0], creator_address); + response_tx.send(balance_vec).unwrap(); + } Ok(op) => panic!( "Expected `ControllerMsg::GetFinalAndCandidateBalance`, got {:?}", op @@ -140,7 +119,7 @@ fn launch_basic_get_block_operation_execution_mock( } (1..operations_len).for_each(|_| { - if let Ok(Unexecuted { response_tx, .. }) = receive(&recvr) { + if let Ok(ControllerMsg::UnexecutedOpsAmong { response_tx, .. }) = receive(&recvr) { response_tx.send(unexecuted_ops.clone()).unwrap(); } }) @@ -163,12 +142,14 @@ fn launch_basic_get_block_operation_execution_mock( /// only 5 operations. #[test] fn test_get_operations_overflow() { + // setup metadata static OP_LEN: usize = 10; static MAX_OP_LEN: usize = 5; let mut max_block_size = 0; let keypair = KeyPair::generate(); let creator_address = Address::from_public_key(&keypair.get_public_key()); - let operations = create_some_operations(OP_LEN, &keypair, 1); + let op_gen = OpGenerator::default().expirery(1).creator(keypair); + let operations = create_some_operations(OP_LEN, &op_gen); operations .iter() .take(MAX_OP_LEN) @@ -181,8 +162,8 @@ fn test_get_operations_overflow() { pool_test( config, |mut pool_manager, mut pool_controller, execution_receiver, mut storage| { + // setup storage storage.store_operations(operations); - let unexecuted_ops = storage.get_op_refs().clone(); pool_controller.add_operations(storage); @@ -191,6 +172,8 @@ fn test_get_operations_overflow() { OP_LEN, unexecuted_ops, execution_receiver, + creator_address, + vec![(Some(Amount::from_raw(1)), Some(Amount::from_raw(1)))], ); let block_operations_storage = pool_controller diff --git a/massa-pool-worker/src/tests/tools.rs b/massa-pool-worker/src/tests/tools.rs index fd54fadb44a..4d6e8d9f81b 100644 --- a/massa-pool-worker/src/tests/tools.rs +++ b/massa-pool-worker/src/tests/tools.rs @@ -15,41 +15,71 @@ use massa_models::{ slot::Slot, }; use massa_pool_exports::{PoolChannels, PoolConfig, PoolController, PoolManager}; -use massa_signature::{KeyPair, PublicKey}; +use massa_signature::KeyPair; use massa_storage::Storage; -use std::str::FromStr; use std::sync::mpsc::Receiver; use tokio::sync::broadcast; -/// Tooling to create a transaction with an expire periods -/// TODO move tooling in a dedicated module -pub fn create_operation_with_expire_period( - keypair: &KeyPair, - expire_period: u64, -) -> SecureShareOperation { - let recv_keypair = KeyPair::generate(); +#[derive(Default)] +pub(crate) struct OpGenerator { + creator: Option, + receiver: Option, + fee: Option, + amount: Option, + expirery: Option, +} - let op = OperationType::Transaction { - recipient_address: Address::from_public_key(&recv_keypair.get_public_key()), - amount: Amount::default(), - }; - let content = Operation { - fee: Amount::default(), - op, - expire_period, - }; - Operation::new_verifiable(content, OperationSerializer::new(), keypair).unwrap() +impl OpGenerator { + pub(crate) fn expirery(mut self, expirery: u64) -> Self { + self.expirery = Some(expirery); + self + } + + #[allow(dead_code)] + pub(crate) fn amount(mut self, amount: Amount) -> Self { + self.amount = Some(amount); + self + } + + pub(crate) fn fee(mut self, fee: Amount) -> Self { + self.fee = Some(fee); + self + } + + #[allow(dead_code)] + pub(crate) fn receiver(mut self, receiver: KeyPair) -> Self { + self.receiver = Some(receiver); + self + } + + pub(crate) fn creator(mut self, creator: KeyPair) -> Self { + self.creator = Some(creator); + self + } + + pub(crate) fn generate(&self) -> SecureShareOperation { + let creator = self.creator.clone().unwrap_or_else(KeyPair::generate); + let receiver = self.receiver.clone().unwrap_or_else(KeyPair::generate); + let fee = self.fee.unwrap_or_default(); + let amount = self.amount.unwrap_or_default(); + let expirery = self.expirery.unwrap_or_default(); + + let op = OperationType::Transaction { + recipient_address: Address::from_public_key(&receiver.get_public_key()), + amount, + }; + let content = Operation { + fee, + op, + expire_period: expirery, + }; + Operation::new_verifiable(content, OperationSerializer::new(), &creator).unwrap() + } } /// Return `n` signed operations -pub fn create_some_operations( - n: usize, - keypair: &KeyPair, - expire_period: u64, -) -> Vec { - (0..n) - .map(|_| create_operation_with_expire_period(keypair, expire_period)) - .collect() +pub(crate) fn create_some_operations(n: usize, op_gen: &OpGenerator) -> Vec { + (0..n).map(|_| op_gen.generate()).collect() } /// Creates module mocks, providing the environment needed to run the provided closure @@ -93,21 +123,6 @@ where ) } -pub fn _get_transaction(expire_period: u64, fee: u64) -> SecureShareOperation { - let sender_keypair = KeyPair::generate(); - - let op = OperationType::Transaction { - recipient_address: Address::from_public_key(&KeyPair::generate().get_public_key()), - amount: Amount::default(), - }; - let content = Operation { - fee: Amount::from_str(&fee.to_string()).unwrap(), - op, - expire_period, - }; - Operation::new_verifiable(content, OperationSerializer::new(), &sender_keypair).unwrap() -} - /// Creates an endorsement for use in pool tests. pub fn _create_endorsement(slot: Slot) -> SecureShareEndorsement { let sender_keypair = KeyPair::generate(); @@ -119,21 +134,3 @@ pub fn _create_endorsement(slot: Slot) -> SecureShareEndorsement { }; Endorsement::new_verifiable(content, EndorsementSerializer::new(), &sender_keypair).unwrap() } - -pub fn _get_transaction_with_addresses( - expire_period: u64, - fee: u64, - sender_keypair: &KeyPair, - recv_pub: PublicKey, -) -> SecureShareOperation { - let op = OperationType::Transaction { - recipient_address: Address::from_public_key(&recv_pub), - amount: Amount::default(), - }; - let content = Operation { - fee: Amount::from_str(&fee.to_string()).unwrap(), - op, - expire_period, - }; - Operation::new_verifiable(content, OperationSerializer::new(), sender_keypair).unwrap() -}