-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[Runtime] Bound XCMP queue #3952
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0f6c0e7
8729ffc
186d245
63bb333
3332090
b267b2b
49d70ba
b76c840
7433d3e
f088bfa
f0034ee
bae2564
2cdf1ad
6985828
bed5164
901ec1d
7fe2c67
c326770
9a82b09
723eb54
2e8566b
757ee3c
26a2c10
9aad0fd
176b4fa
2999453
c73c3f9
22d7bc0
b179452
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,15 +51,15 @@ pub mod weights; | |
| pub use weights::WeightInfo; | ||
|
|
||
| use bounded_collections::BoundedBTreeSet; | ||
| use codec::{Decode, DecodeLimit, Encode}; | ||
| use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen}; | ||
| use cumulus_primitives_core::{ | ||
| relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError, | ||
| ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource, | ||
| }; | ||
|
|
||
| use frame_support::{ | ||
| defensive, defensive_assert, | ||
| traits::{EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueuePausedQuery}, | ||
| traits::{Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueuePausedQuery}, | ||
| weights::{Weight, WeightMeter}, | ||
| BoundedVec, | ||
| }; | ||
|
|
@@ -68,7 +68,7 @@ use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery; | |
| use polkadot_runtime_parachains::FeeTracker; | ||
| use scale_info::TypeInfo; | ||
| use sp_core::MAX_POSSIBLE_ALLOCATION; | ||
| use sp_runtime::{FixedU128, RuntimeDebug, Saturating}; | ||
| use sp_runtime::{FixedU128, RuntimeDebug, Saturating, WeakBoundedVec}; | ||
| use sp_std::prelude::*; | ||
| use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH}; | ||
| use xcm_builder::InspectMessageQueues; | ||
|
|
@@ -106,7 +106,6 @@ pub mod pallet { | |
|
|
||
| #[pallet::pallet] | ||
| #[pallet::storage_version(migration::STORAGE_VERSION)] | ||
| #[pallet::without_storage_info] | ||
| pub struct Pallet<T>(_); | ||
|
|
||
| #[pallet::config] | ||
|
|
@@ -133,6 +132,25 @@ pub mod pallet { | |
| #[pallet::constant] | ||
| type MaxInboundSuspended: Get<u32>; | ||
|
|
||
| /// Maximal number of outbound XCMP channels that can have messages queued at the same time. | ||
| /// | ||
| /// If this is reached, then no further messages can be sent to channels that do not yet | ||
| /// have a message queued. This should be set to the expected maximum of outbound channels | ||
| /// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough, | ||
| /// since otherwise the congestion control protocol will not work as intended and messages | ||
| /// may be dropped. This value increases the PoV and should therefore not be picked too | ||
| /// high. Governance needs to pay attention to not open more channels than this value. | ||
| #[pallet::constant] | ||
| type MaxActiveOutboundChannels: Get<u32>; | ||
|
|
||
| /// The maximal page size for HRMP message pages. | ||
| /// | ||
| /// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case | ||
| /// benchmarking. The limit for the size of a message is slightly below this, since some | ||
| /// overhead is incurred for encoding the format. | ||
| #[pallet::constant] | ||
| type MaxPageSize: Get<u32>; | ||
|
|
||
| /// The origin that is allowed to resume or suspend the XCMP queue. | ||
| type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>; | ||
|
|
||
|
|
@@ -277,6 +295,10 @@ pub mod pallet { | |
| AlreadySuspended, | ||
| /// The execution is already resumed. | ||
| AlreadyResumed, | ||
| /// There are too many active outbound channels. | ||
| TooManyActiveOutboundChannels, | ||
| /// The message is too big. | ||
| TooBig, | ||
| } | ||
|
|
||
| /// The suspended inbound XCMP channels. All others are not suspended. | ||
|
|
@@ -298,19 +320,28 @@ pub mod pallet { | |
| /// case of the need to send a high-priority signal message this block. | ||
| /// The bool is true if there is a signal message waiting to be sent. | ||
| #[pallet::storage] | ||
| pub(super) type OutboundXcmpStatus<T: Config> = | ||
| StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>; | ||
| pub(super) type OutboundXcmpStatus<T: Config> = StorageValue< | ||
| _, | ||
| BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>, | ||
| ValueQuery, | ||
| >; | ||
|
|
||
| // The new way of doing it: | ||
| /// The messages outbound in a given XCMP channel. | ||
| #[pallet::storage] | ||
| pub(super) type OutboundXcmpMessages<T: Config> = | ||
| StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>; | ||
| pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap< | ||
| _, | ||
| Blake2_128Concat, | ||
| ParaId, | ||
| Twox64Concat, | ||
| u16, | ||
| WeakBoundedVec<u8, T::MaxPageSize>, | ||
ggwpez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ValueQuery, | ||
| >; | ||
|
|
||
| /// Any signal messages waiting to be sent. | ||
| #[pallet::storage] | ||
| pub(super) type SignalMessages<T: Config> = | ||
| StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>; | ||
| StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>; | ||
ggwpez marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea true. In the past, there was the possibility for multiple signals to be enqueued, but i removed that, since later signals completely overwrite any previous ones. |
||
|
|
||
| /// The configuration which controls the dynamics of the outbound queue. | ||
| #[pallet::storage] | ||
|
|
@@ -332,15 +363,14 @@ pub mod pallet { | |
| StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>; | ||
| } | ||
|
|
||
| #[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)] | ||
| #[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)] | ||
| pub enum OutboundState { | ||
| Ok, | ||
| Suspended, | ||
| } | ||
|
|
||
| /// Struct containing detailed information about the outbound channel. | ||
| #[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)] | ||
| #[cfg_attr(feature = "std", derive(Debug))] | ||
| #[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)] | ||
| pub struct OutboundChannelDetails { | ||
| /// The `ParaId` of the parachain that this channel is connected with. | ||
| recipient: ParaId, | ||
|
|
@@ -376,7 +406,7 @@ impl OutboundChannelDetails { | |
| } | ||
| } | ||
|
|
||
| #[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)] | ||
| #[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)] | ||
| pub struct QueueConfigData { | ||
| /// The number of pages which must be in the queue for the other side to be told to suspend | ||
| /// their sending. | ||
|
|
@@ -479,7 +509,10 @@ impl<T: Config> Pallet<T> { | |
| { | ||
| details | ||
| } else { | ||
| all_channels.push(OutboundChannelDetails::new(recipient)); | ||
| all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| { | ||
| log::error!("Failed to activate HRMP channel: {:?}", e); | ||
| MessageSendError::TooManyChannels | ||
| })?; | ||
| all_channels | ||
| .last_mut() | ||
| .expect("can't be empty; a new element was just pushed; qed") | ||
|
|
@@ -504,7 +537,9 @@ impl<T: Config> Pallet<T> { | |
| if page.len() + encoded_fragment.len() > max_message_size { | ||
| return None | ||
| } | ||
| page.extend_from_slice(&encoded_fragment[..]); | ||
| for frag in encoded_fragment.iter() { | ||
| page.try_push(*frag).ok()?; | ||
ggwpez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| Some(page.len()) | ||
| }, | ||
| ) | ||
|
|
@@ -522,7 +557,10 @@ impl<T: Config> Pallet<T> { | |
| new_page.extend_from_slice(&encoded_fragment[..]); | ||
| let last_page_size = new_page.len(); | ||
| let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32; | ||
| <OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page); | ||
| let bounded_page = BoundedVec::<u8, T::MaxPageSize>::try_from(new_page) | ||
| .map_err(|_| MessageSendError::TooBig)?; | ||
| let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None); | ||
| <OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page); | ||
| <OutboundXcmpStatus<T>>::put(all_channels); | ||
| (number_of_pages, last_page_size) | ||
| }; | ||
|
|
@@ -544,17 +582,24 @@ impl<T: Config> Pallet<T> { | |
|
|
||
| /// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this | ||
| /// block. | ||
| fn send_signal(dest: ParaId, signal: ChannelSignal) { | ||
| fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> { | ||
| let mut s = <OutboundXcmpStatus<T>>::get(); | ||
| if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) { | ||
| details.signals_exist = true; | ||
| } else { | ||
| s.push(OutboundChannelDetails::new(dest).with_signals()); | ||
| s.try_push(OutboundChannelDetails::new(dest).with_signals()) | ||
| .map_err(|_| Error::<T>::TooManyActiveOutboundChannels)?; | ||
| } | ||
| <SignalMessages<T>>::mutate(dest, |page| { | ||
| *page = (XcmpMessageFormat::Signals, signal).encode(); | ||
| }); | ||
|
|
||
| let page = BoundedVec::<u8, T::MaxPageSize>::try_from( | ||
| (XcmpMessageFormat::Signals, signal).encode(), | ||
| ) | ||
| .map_err(|_| Error::<T>::TooBig)?; | ||
| let page = WeakBoundedVec::force_from(page.into_inner(), None); | ||
|
|
||
| <SignalMessages<T>>::insert(dest, page); | ||
| <OutboundXcmpStatus<T>>::put(s); | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn suspend_channel(target: ParaId) { | ||
|
|
@@ -564,7 +609,9 @@ impl<T: Config> Pallet<T> { | |
| defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok."); | ||
| details.state = OutboundState::Suspended; | ||
| } else { | ||
| s.push(OutboundChannelDetails::new(target).with_suspended_state()); | ||
| if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() { | ||
| defensive!("Cannot pause channel; too many outbound channels"); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
@@ -665,18 +712,25 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> { | |
| let suspended = suspended_channels.contains(¶); | ||
|
|
||
| if suspended && fp.ready_pages <= resume_threshold { | ||
| Self::send_signal(para, ChannelSignal::Resume); | ||
|
|
||
| suspended_channels.remove(¶); | ||
| <InboundXcmpSuspended<T>>::put(suspended_channels); | ||
| if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) { | ||
| log::error!("defensive: Could not send resumption signal to inbound channel of sibling {:?}: {:?}; channel remains suspended.", para, err); | ||
| } else { | ||
| suspended_channels.remove(¶); | ||
| <InboundXcmpSuspended<T>>::put(suspended_channels); | ||
| } | ||
| } else if !suspended && fp.ready_pages >= suspend_threshold { | ||
| log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para); | ||
| Self::send_signal(para, ChannelSignal::Suspend); | ||
|
|
||
| if let Err(err) = suspended_channels.try_insert(para) { | ||
| if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) { | ||
| // It will retry if `drop_threshold` is not reached, but it could be too late. | ||
| log::error!( | ||
| "defensive: Could not send suspension signal; future messages may be dropped: {:?}", err | ||
| ); | ||
| } else if let Err(err) = suspended_channels.try_insert(para) { | ||
| log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err); | ||
| } else { | ||
| <InboundXcmpSuspended<T>>::put(suspended_channels); | ||
| } | ||
| <InboundXcmpSuspended<T>>::put(suspended_channels); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -843,7 +897,7 @@ impl<T: Config> XcmpMessageSource for Pallet<T> { | |
| // since it's so unlikely then for now we just drop it. | ||
| defensive!("WARNING: oversize message in queue - dropping"); | ||
| } else { | ||
| result.push((para_id, page)); | ||
| result.push((para_id, page.into_inner())); | ||
| } | ||
|
|
||
| let max_total_size = match T::ChannelInfo::get_channel_info(para_id) { | ||
|
|
@@ -891,7 +945,9 @@ impl<T: Config> XcmpMessageSource for Pallet<T> { | |
| let pruned = old_statuses_len - statuses.len(); | ||
| // removing an item from status implies a message being sent, so the result messages must | ||
| // be no less than the pruned channels. | ||
| statuses.rotate_left(result.len().saturating_sub(pruned)); | ||
| let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof( | ||
| "Could not store HRMP channels config. Some HRMP channels may be broken.", | ||
| ); | ||
|
|
||
| <OutboundXcmpStatus<T>>::put(statuses); | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.