Skip to content

Commit c8c1ca8

Browse files
authored
raftstore: Observe when receive raft message (tikv#14043)
ref tikv#13855 Introduce observers when receive raft message. Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
1 parent 3791560 commit c8c1ca8

8 files changed

Lines changed: 70 additions & 6 deletions

File tree

components/cdc/src/observer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ mod tests {
273273
prev_lead_transferee: raft::INVALID_ID,
274274
vote: raft::INVALID_ID,
275275
initialized: true,
276+
peer_id: raft::INVALID_ID,
276277
},
277278
);
278279
match rx.recv_timeout(Duration::from_millis(10)).unwrap().unwrap() {
@@ -301,6 +302,7 @@ mod tests {
301302
prev_lead_transferee: 3,
302303
vote: 3,
303304
initialized: true,
305+
peer_id: raft::INVALID_ID,
304306
},
305307
);
306308
match rx.recv_timeout(Duration::from_millis(10)).unwrap().unwrap() {

components/raftstore-v2/src/operation/ready/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,7 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
768768
prev_lead_transferee: target,
769769
vote: self.raft_group().raft.vote,
770770
initialized: self.storage().is_initialized(),
771+
peer_id: self.peer().get_id(),
771772
},
772773
);
773774
self.proposal_control_mut().maybe_update_term(term);

components/raftstore/src/coprocessor/dispatcher.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use kvproto::{
88
metapb::{Region, RegionEpoch},
99
pdpb::CheckPolicy,
1010
raft_cmdpb::{ComputeHashRequest, RaftCmdRequest},
11+
raft_serverpb::RaftMessage,
1112
};
1213
use protobuf::Message;
1314
use raft::eraftpb;
@@ -278,6 +279,7 @@ impl_box_observer_g!(
278279
ConsistencyCheckObserver,
279280
WrappedConsistencyCheckObserver
280281
);
282+
impl_box_observer!(BoxMessageObserver, MessageObserver, WrappedMessageObserver);
281283

282284
/// Registry contains all registered coprocessors.
283285
#[derive(Clone)]
@@ -296,6 +298,7 @@ where
296298
read_index_observers: Vec<Entry<BoxReadIndexObserver>>,
297299
pd_task_observers: Vec<Entry<BoxPdTaskObserver>>,
298300
update_safe_ts_observers: Vec<Entry<BoxUpdateSafeTsObserver>>,
301+
message_observers: Vec<Entry<BoxMessageObserver>>,
299302
// TODO: add endpoint
300303
}
301304

@@ -313,6 +316,7 @@ impl<E: KvEngine> Default for Registry<E> {
313316
read_index_observers: Default::default(),
314317
pd_task_observers: Default::default(),
315318
update_safe_ts_observers: Default::default(),
319+
message_observers: Default::default(),
316320
}
317321
}
318322
}
@@ -381,6 +385,10 @@ impl<E: KvEngine> Registry<E> {
381385
pub fn register_update_safe_ts_observer(&mut self, priority: u32, qo: BoxUpdateSafeTsObserver) {
382386
push!(priority, qo, self.update_safe_ts_observers);
383387
}
388+
389+
pub fn register_message_observer(&mut self, priority: u32, qo: BoxMessageObserver) {
390+
push!(priority, qo, self.message_observers);
391+
}
384392
}
385393

386394
/// A macro that loops over all observers and returns early when error is found
@@ -780,6 +788,17 @@ impl<E: KvEngine> CoprocessorHost<E> {
780788
true
781789
}
782790

791+
/// Returns false if the message should not be stepped later.
792+
pub fn on_raft_message(&self, msg: &RaftMessage) -> bool {
793+
for observer in &self.registry.message_observers {
794+
let observer = observer.observer.inner();
795+
if !observer.on_raft_message(msg) {
796+
return false;
797+
}
798+
}
799+
true
800+
}
801+
783802
pub fn on_flush_applied_cmd_batch(
784803
&self,
785804
max_level: ObserveLevel,
@@ -890,6 +909,7 @@ mod tests {
890909
OnUpdateSafeTs = 23,
891910
PrePersist = 24,
892911
PreWriteApplyState = 25,
912+
OnRaftMessage = 26,
893913
}
894914

895915
impl Coprocessor for TestCoprocessor {}
@@ -1132,6 +1152,14 @@ mod tests {
11321152
}
11331153
}
11341154

1155+
impl MessageObserver for TestCoprocessor {
1156+
fn on_raft_message(&self, _: &RaftMessage) -> bool {
1157+
self.called
1158+
.fetch_add(ObserverIndex::OnRaftMessage as usize, Ordering::SeqCst);
1159+
true
1160+
}
1161+
}
1162+
11351163
macro_rules! assert_all {
11361164
($target:expr, $expect:expr) => {{
11371165
for (c, e) in ($target).iter().zip($expect) {
@@ -1168,6 +1196,8 @@ mod tests {
11681196
.register_cmd_observer(1, BoxCmdObserver::new(ob.clone()));
11691197
host.registry
11701198
.register_update_safe_ts_observer(1, BoxUpdateSafeTsObserver::new(ob.clone()));
1199+
host.registry
1200+
.register_message_observer(1, BoxMessageObserver::new(ob.clone()));
11711201

11721202
let mut index: usize = 0;
11731203
let region = Region::default();
@@ -1282,6 +1312,11 @@ mod tests {
12821312
host.pre_write_apply_state(&region);
12831313
index += ObserverIndex::PreWriteApplyState as usize;
12841314
assert_all!([&ob.called], &[index]);
1315+
1316+
let msg = RaftMessage::default();
1317+
host.on_raft_message(&msg);
1318+
index += ObserverIndex::OnRaftMessage as usize;
1319+
assert_all!([&ob.called], &[index]);
12851320
}
12861321

12871322
#[test]

components/raftstore/src/coprocessor/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@ mod metrics;
2626
pub mod region_info_accessor;
2727
mod split_check;
2828
pub mod split_observer;
29+
use kvproto::raft_serverpb::RaftMessage;
2930

3031
pub use self::{
3132
config::{Config, ConsistencyCheckMethod},
3233
consistency_check::{ConsistencyCheckObserver, Raw as RawConsistencyCheckObserver},
3334
dispatcher::{
3435
BoxAdminObserver, BoxApplySnapshotObserver, BoxCmdObserver, BoxConsistencyCheckObserver,
35-
BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver, BoxRoleObserver,
36-
BoxSplitCheckObserver, BoxUpdateSafeTsObserver, CoprocessorHost, Registry, StoreHandle,
36+
BoxMessageObserver, BoxPdTaskObserver, BoxQueryObserver, BoxRegionChangeObserver,
37+
BoxRoleObserver, BoxSplitCheckObserver, BoxUpdateSafeTsObserver, CoprocessorHost, Registry,
38+
StoreHandle,
3739
},
3840
error::{Error, Result},
3941
region_info_accessor::{
@@ -269,6 +271,7 @@ pub struct RoleChange {
269271
/// Which peer is voted by itself.
270272
pub vote: u64,
271273
pub initialized: bool,
274+
pub peer_id: u64,
272275
}
273276

274277
impl RoleChange {
@@ -280,6 +283,7 @@ impl RoleChange {
280283
prev_lead_transferee: raft::INVALID_ID,
281284
vote: raft::INVALID_ID,
282285
initialized: true,
286+
peer_id: raft::INVALID_ID,
283287
}
284288
}
285289
}
@@ -334,6 +338,13 @@ pub trait RegionChangeObserver: Coprocessor {
334338
}
335339
}
336340

341+
pub trait MessageObserver: Coprocessor {
342+
/// Returns false if the message should not be stepped later.
343+
fn on_raft_message(&self, _: &RaftMessage) -> bool {
344+
true
345+
}
346+
}
347+
337348
#[derive(Clone, Debug, Default)]
338349
pub struct Cmd {
339350
pub index: u64,

components/raftstore/src/store/fsm/peer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,9 @@ where
610610
for m in msgs.drain(..) {
611611
match m {
612612
PeerMsg::RaftMessage(msg) => {
613+
if !self.ctx.coprocessor_host.on_raft_message(&msg.msg) {
614+
continue;
615+
}
613616
if let Err(e) = self.on_raft_message(msg) {
614617
error!(%e;
615618
"handle raft message err";

components/raftstore/src/store/fsm/store.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,9 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
753753
match m {
754754
StoreMsg::Tick(tick) => self.on_tick(tick),
755755
StoreMsg::RaftMessage(msg) => {
756+
if !self.ctx.coprocessor_host.on_raft_message(&msg.msg) {
757+
continue;
758+
}
756759
if let Err(e) = self.on_raft_message(msg) {
757760
if matches!(&e, Error::RegionNotRegistered { .. }) {
758761
// This may happen in normal cases when add-peer runs slowly

components/raftstore/src/store/peer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2335,6 +2335,7 @@ where
23352335
prev_lead_transferee: self.lead_transferee,
23362336
vote: self.raft_group.raft.vote,
23372337
initialized: self.is_initialized(),
2338+
peer_id: self.peer.get_id(),
23382339
},
23392340
);
23402341
self.cmd_epoch_checker.maybe_update_term(self.term());

components/raftstore/src/store/snap.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,9 @@ fn retry_delete_snapshot(mgr: &SnapManagerCore, key: &SnapKey, snap: &Snapshot)
207207
false
208208
}
209209

210-
fn gen_snapshot_meta(cf_files: &[CfFile], for_balance: bool) -> RaftStoreResult<SnapshotMeta> {
210+
// Create a SnapshotMeta that can be later put into RaftSnapshotData or written
211+
// into file.
212+
pub fn gen_snapshot_meta(cf_files: &[CfFile], for_balance: bool) -> RaftStoreResult<SnapshotMeta> {
211213
let mut meta = Vec::with_capacity(cf_files.len());
212214
for cf_file in cf_files {
213215
if !SNAPSHOT_CFS.iter().any(|cf| cf_file.cf == *cf) {
@@ -663,7 +665,8 @@ impl Snapshot {
663665
Ok(snapshot_meta)
664666
}
665667

666-
fn set_snapshot_meta(&mut self, snapshot_meta: SnapshotMeta) -> RaftStoreResult<()> {
668+
// Validate and set SnapshotMeta of this Snapshot.
669+
pub fn set_snapshot_meta(&mut self, snapshot_meta: SnapshotMeta) -> RaftStoreResult<()> {
667670
let mut cf_file_count_from_meta: Vec<usize> = vec![];
668671
let mut file_count = 0;
669672
let mut current_cf = "";
@@ -812,8 +815,9 @@ impl Snapshot {
812815
}
813816
}
814817

815-
// Only called in `do_build`.
816-
fn save_meta_file(&mut self) -> RaftStoreResult<()> {
818+
// Save `SnapshotMeta` to file.
819+
// Used in `do_build` and by external crates.
820+
pub fn save_meta_file(&mut self) -> RaftStoreResult<()> {
817821
let v = box_try!(self.meta_file.meta.as_ref().unwrap().write_to_bytes());
818822
if let Some(mut f) = self.meta_file.file.take() {
819823
// `meta_file` could be None for this case: in `init_for_building` the snapshot
@@ -1125,6 +1129,10 @@ impl Snapshot {
11251129
file_system::metadata(&self.meta_file.path)
11261130
}
11271131

1132+
pub fn meta_path(&self) -> &PathBuf {
1133+
&self.meta_file.path
1134+
}
1135+
11281136
pub fn total_size(&self) -> u64 {
11291137
self.cf_files
11301138
.iter()

0 commit comments

Comments
 (0)