Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use {
blockstore::{entries_to_test_shreds, Blockstore},
blockstore_processor::ProcessOptions,
leader_schedule::FixedSchedule,
leader_schedule_utils::first_of_consecutive_leader_slots,
shred::{ProcessShredsStats, ReedSolomonCache, Shred, Shredder},
use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
},
Expand Down Expand Up @@ -105,6 +106,7 @@ use {
fs,
io::Read,
iter,
net::SocketAddr,
num::NonZeroU64,
path::Path,
sync::{
Expand Down Expand Up @@ -7340,3 +7342,280 @@ fn test_alpenglow_ensure_liveness_after_second_notar_fallback_condition() {

vote_listener_thread.join().unwrap();
}

#[test]
#[serial]
fn test_alpenglow_add_missing_parent_ready() {
solana_logger::setup_with_default(AG_DEBUG_LOG_FILTER);

// Configure total stake and stake distribution
const TOTAL_STAKE: u64 = 10 * DEFAULT_NODE_STAKE;
const SLOTS_PER_EPOCH: u64 = MINIMUM_SLOTS_PER_EPOCH;

// Node stakes designed to trigger missing parent ready for Node C
let node_stakes = [
TOTAL_STAKE * 4 / 10 - 1, // Node A: 40% - ε (will go offline)
TOTAL_STAKE * 3 / 10 + 1, // Node B: 30% + ε (leader, stays online)
TOTAL_STAKE * 3 / 10, // Node C: 30% (will be partitioned)
];

assert_eq!(TOTAL_STAKE, node_stakes.iter().sum::<u64>());

// Control component for network partition simulation
let node_c_turbine_disabled = Arc::new(AtomicBool::new(false));

// Create leader schedule with Node B as primary leader (Node A will go offline)
let (leader_schedule, validator_keys) =
create_custom_leader_schedule_with_random_keys(&[0, 4, 0]);

let leader_schedule = FixedSchedule {
leader_schedule: Arc::new(leader_schedule),
};

// Create UDP socket to listen to votes for experiment control
let vote_listener_socket = solana_net_utils::bind_to_localhost().unwrap();

// Create validator configs
let mut validator_config = ValidatorConfig::default_for_test();
let alpenglow_port_override = AlpenglowPortOverride::default();
validator_config.fixed_leader_schedule = Some(leader_schedule);
validator_config.voting_service_test_override = Some(VotingServiceOverride {
additional_listeners: vec![vote_listener_socket.local_addr().unwrap()],
alpenglow_port_override: alpenglow_port_override.clone(),
});

let mut validator_configs =
make_identical_validator_configs(&validator_config, node_stakes.len());

// Node C will have its turbine disabled during the experiment
validator_configs[2].turbine_disabled = node_c_turbine_disabled.clone();

// Cluster configuration
let mut cluster_config = ClusterConfig {
mint_lamports: TOTAL_STAKE,
node_stakes: node_stakes.to_vec(),
validator_configs,
validator_keys: Some(
validator_keys
.iter()
.cloned()
.zip(std::iter::repeat(true))
.collect(),
),
slots_per_epoch: SLOTS_PER_EPOCH,
stakers_slot_offset: SLOTS_PER_EPOCH,
ticks_per_slot: DEFAULT_TICKS_PER_SLOT,
..ClusterConfig::default()
};

// Create local cluster
let mut cluster =
LocalCluster::new_alpenglow(&mut cluster_config, SocketAddrSpace::Unspecified);

// Vote listener state management
#[derive(Debug, PartialEq, Eq)]
enum Stage {
WaitForReady,
Stability,
ClusterStuck,
ObserveLiveness,
}

impl Stage {
fn timeout(&self) -> Duration {
match self {
Stage::WaitForReady => Duration::from_secs(60),
Stage::Stability => Duration::from_secs(60),
Stage::ClusterStuck => Duration::from_secs(120),
Stage::ObserveLiveness => Duration::from_secs(180),
}
}

fn all() -> Vec<Stage> {
vec![
Stage::WaitForReady,
Stage::Stability,
Stage::ClusterStuck,
Stage::ObserveLiveness,
]
}
}

struct ExperimentState {
stage: Stage,
number_of_nodes: usize,
initial_notar_votes: HashSet<usize>,
initial_exit_slot: Option<Slot>,
stuck_at: Option<Instant>,
post_experiment_roots: HashSet<Slot>,
alpenglow_port_override: AlpenglowPortOverride,
}

impl ExperimentState {
fn new(number_of_nodes: usize, alpenglow_port_override: AlpenglowPortOverride) -> Self {
Self {
stage: Stage::WaitForReady,
number_of_nodes,
initial_notar_votes: HashSet::new(),
initial_exit_slot: None,
stuck_at: None,
post_experiment_roots: HashSet::new(),
alpenglow_port_override,
}
}

fn wait_for_nodes_ready(
&mut self,
vote: &Vote,
node_name: usize,
node_c_turbine_disabled: &Arc<AtomicBool>,
node_c_pubkey: &Pubkey,
) {
if self.stage != Stage::WaitForReady || !vote.is_notarization() {
return;
}

self.initial_notar_votes.insert(node_name);

// Wait until we have observed a notarization vote from all nodes.
if self.initial_notar_votes.len() >= self.number_of_nodes {
// Phase 1: Take Node C offline, A and B should happily finalize blocks
info!("Phase 1: Put Node C in partition. Transitioning to stability phase. current slot {}", vote.slot());
node_c_turbine_disabled.store(true, Ordering::Relaxed);
let blackhole_addr: SocketAddr = solana_net_utils::bind_to_localhost()
.unwrap()
.local_addr()
.unwrap();
let new_override = HashMap::from_iter(vec![(*node_c_pubkey, blackhole_addr)]);
self.alpenglow_port_override.update_override(new_override);
self.stage = Stage::Stability;
}
}

fn handle_experiment_start(
&mut self,
vote: &Vote,
cluster: &mut LocalCluster,
node_a_pubkey: &Pubkey,
) {
if self.stage != Stage::Stability {
return;
}

// Phase 2: Make A exit in the middle of a leader window
let slot = vote.slot();
if self.initial_exit_slot.is_none() {
if vote.is_finalize() {
let next_window_start_slot = first_of_consecutive_leader_slots(slot) + 4;
self.initial_exit_slot = Some(next_window_start_slot + 2);
info!(
"Phase 1: Node A will exit at slot {} (next window start: {}).",
self.initial_exit_slot.unwrap(),
next_window_start_slot
);
}
} else if Some(slot) >= self.initial_exit_slot && vote.is_finalize() {
// Phase 1: Take Node A offline to simulate Byzantine + offline stake
// This represents 40% - ε of total stake going offline
info!("Phase 1: Exiting Node A. B should be stuck.");
cluster.exit_node(node_a_pubkey);
self.stage = Stage::ClusterStuck;
}
}

fn handle_cluster_stuck(&mut self, node_c_turbine_disabled: &Arc<AtomicBool>) {
if self.stage == Stage::ClusterStuck {
if let Some(start) = self.stuck_at {
if start.elapsed() < Duration::from_secs(10) {
return; // wait 10 seconds for standstill to kick in
}
node_c_turbine_disabled.store(false, Ordering::Relaxed);
self.alpenglow_port_override.clear();
self.stage = Stage::ObserveLiveness;
info!("Phase 2: Node C turbine re-enabled. Transitioning to ObserveLiveness stage.");
} else {
self.stuck_at = Some(Instant::now());
info!("Phase 2 started, count 10 seconds for standstill.");
}
}
}

fn record_certificate(&mut self, slot: u64) {
self.post_experiment_roots.insert(slot);
}

fn sufficient_roots_created(&self) -> bool {
self.post_experiment_roots.len() >= 8
}
}

// Start vote listener thread to monitor and control the experiment
let vote_listener_thread = std::thread::spawn({
let mut buf = [0u8; 65_535];
let node_c_turbine_disabled = node_c_turbine_disabled.clone();
let mut experiment_state = ExperimentState::new(node_stakes.len(), alpenglow_port_override);
let timer = std::time::Instant::now();

move || {
loop {
let n_bytes = vote_listener_socket.recv(&mut buf).unwrap();

let bls_message = bincode::deserialize::<BLSMessage>(&buf[0..n_bytes]).unwrap();

match bls_message {
BLSMessage::Vote(vote_message) => {
let vote = &vote_message.vote;
let node_name = vote_message.rank as usize;

// Stage timeouts
let elapsed_time = timer.elapsed();

for stage in Stage::all() {
if elapsed_time > stage.timeout() {
panic!(
"Timeout during {:?}. node_c_turbine_disabled: {:#?}. Latest vote: {:#?}.",
stage,
node_c_turbine_disabled.load(Ordering::Acquire),
vote,
);
}
}

// Handle experiment phase transitions
experiment_state.wait_for_nodes_ready(
vote,
node_name,
&node_c_turbine_disabled,
&validator_keys[2].pubkey(),
);
experiment_state.handle_experiment_start(
vote,
&mut cluster,
&validator_keys[0].pubkey(),
);
experiment_state.handle_cluster_stuck(&node_c_turbine_disabled);
}

BLSMessage::Certificate(cert_message) => {
// Wait until the final stage before looking for finalization certificates.
if experiment_state.stage != Stage::ObserveLiveness {
continue;
}
// Observing finalization certificates to ensure liveness.
if [CertificateType::Finalize, CertificateType::FinalizeFast]
.contains(&cert_message.certificate.certificate_type())
{
experiment_state.record_certificate(cert_message.certificate.slot());

if experiment_state.sufficient_roots_created() {
break;
}
}
}
}
}
}
});

vote_listener_thread.join().unwrap();
}
Loading
Loading