From 9526f040b487aa8642ba21450cddfbbcace9e1f2 Mon Sep 17 00:00:00 2001 From: Oliver Tale-Yazdi Date: Sat, 25 Feb 2023 17:13:20 +0100 Subject: [PATCH] Yieldable queues for pallet `MessageQueue` (#13424) * Add Yield message processing error Signed-off-by: Oliver Tale-Yazdi * Add NoopServiceQueues Signed-off-by: Oliver Tale-Yazdi * Implement temporary error aka Yield Signed-off-by: Oliver Tale-Yazdi * Make NoopMessageProcessor generic Signed-off-by: Oliver Tale-Yazdi * Mock pausable message processor Signed-off-by: Oliver Tale-Yazdi * Test paused queues Signed-off-by: Oliver Tale-Yazdi * Integration test paused queues Signed-off-by: Oliver Tale-Yazdi * Use WeightMeter instead of weight return Signed-off-by: Oliver Tale-Yazdi * fix Signed-off-by: Oliver Tale-Yazdi * Make compile Signed-off-by: Oliver Tale-Yazdi * Add tests Signed-off-by: Oliver Tale-Yazdi * ".git/.scripts/commands/bench/bench.sh" pallet dev pallet_message_queue * Fix test Signed-off-by: Oliver Tale-Yazdi --------- Signed-off-by: Oliver Tale-Yazdi Co-authored-by: command-bot <> --- bin/node/runtime/src/lib.rs | 2 +- frame/message-queue/src/integration_test.rs | 129 ++++++++++- frame/message-queue/src/lib.rs | 55 +++-- frame/message-queue/src/mock.rs | 49 ++-- frame/message-queue/src/mock_helpers.rs | 26 ++- frame/message-queue/src/tests.rs | 119 +++++++++- frame/message-queue/src/weights.rs | 241 +++++++++++--------- frame/support/src/traits.rs | 4 +- frame/support/src/traits/messages.rs | 27 ++- 9 files changed, 479 insertions(+), 173 deletions(-) diff --git a/bin/node/runtime/src/lib.rs b/bin/node/runtime/src/lib.rs index a869d309fa479..14b74425f094b 100644 --- a/bin/node/runtime/src/lib.rs +++ b/bin/node/runtime/src/lib.rs @@ -1155,7 +1155,7 @@ impl pallet_message_queue::Config for Runtime { type RuntimeEvent = RuntimeEvent; type WeightInfo = (); /// NOTE: Always set this to `NoopMessageProcessor` for benchmarking. - type MessageProcessor = pallet_message_queue::mock_helpers::NoopMessageProcessor; + type MessageProcessor = pallet_message_queue::mock_helpers::NoopMessageProcessor; type Size = u32; type QueueChangeHandler = (); type HeapSize = ConstU32<{ 64 * 1024 }>; diff --git a/frame/message-queue/src/integration_test.rs b/frame/message-queue/src/integration_test.rs index b89c7808613dd..255098b3b1415 100644 --- a/frame/message-queue/src/integration_test.rs +++ b/frame/message-queue/src/integration_test.rs @@ -23,7 +23,9 @@ use crate::{ mock::{ new_test_ext, CountingMessageProcessor, IntoWeight, MockedWeightInfo, NumMessagesProcessed, + SuspendedQueues, }, + mock_helpers::MessageOrigin, *, }; @@ -39,6 +41,7 @@ use sp_runtime::{ testing::Header, traits::{BlakeTwo256, IdentityLookup}, }; +use std::collections::{BTreeMap, BTreeSet}; type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic; type Block = frame_system::mocking::MockBlock; @@ -100,7 +103,8 @@ impl Config for Test { /// Simulates heavy usage by enqueueing and processing large amounts of messages. /// -/// Best to run with `-r`, `RUST_LOG=info` and `RUSTFLAGS='-Cdebug-assertions=y'`. +/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p +/// pallet-message-queue -- --ignored`. /// /// # Example output /// @@ -130,29 +134,131 @@ fn stress_test_enqueue_and_service() { let mut msgs_remaining = 0; for _ in 0..blocks { // Start by enqueuing a large number of messages. - let (enqueued, _) = + let enqueued = enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng); msgs_remaining += enqueued; // Pick a fraction of all messages currently in queue and process them. let processed = rng.gen_range(1..=msgs_remaining); log::info!("Processing {} of all messages {}", processed, msgs_remaining); - process_messages(processed); // This also advances the block. + process_some_messages(processed); // This also advances the block. msgs_remaining -= processed; } log::info!("Processing all remaining {} messages", msgs_remaining); - process_messages(msgs_remaining); + process_all_messages(msgs_remaining); post_conditions(); }); } +/// Simulates heavy usage of the suspension logic via `Yield`. +/// +/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p +/// pallet-message-queue -- --ignored`. +/// +/// # Example output +/// +/// ```pre +/// Enqueued 11776 messages across 2526 queues. Payload 173.94 KiB +/// Suspended 63 and resumed 7 queues of 2526 in total +/// Processing 593 messages. Resumed msgs: 11599, All msgs: 11776 +/// Enqueued 30104 messages across 5533 queues. Payload 416.62 KiB +/// Suspended 24 and resumed 15 queues of 5533 in total +/// Processing 12841 messages. Resumed msgs: 40857, All msgs: 41287 +/// Processing all 28016 remaining resumed messages +/// Resumed all 64 suspended queues +/// Processing all remaining 430 messages +/// ``` +#[test] +#[ignore] // Only run in the CI. +fn stress_test_queue_suspension() { + let blocks = 20; + let max_queues = 10_000; + let max_messages_per_queue = 10_000; + let (max_suspend_per_block, max_resume_per_block) = (100, 50); + let max_msg_len = MaxMessageLenOf::::get(); + let mut rng = StdRng::seed_from_u64(41); + + new_test_ext::().execute_with(|| { + let mut suspended = BTreeSet::::new(); + let mut msgs_remaining = 0; + + for _ in 0..blocks { + // Start by enqueuing a large number of messages. + let enqueued = + enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng); + msgs_remaining += enqueued; + let per_queue = msgs_per_queue(); + + // Suspend a random subset of queues. + let to_suspend = rng.gen_range(0..max_suspend_per_block).min(per_queue.len()); + for _ in 0..to_suspend { + let q = rng.gen_range(0..per_queue.len()); + suspended.insert(*per_queue.iter().nth(q).map(|(q, _)| q).unwrap()); + } + // Resume a random subst of suspended queues. + let to_resume = rng.gen_range(0..max_resume_per_block).min(suspended.len()); + for _ in 0..to_resume { + let q = rng.gen_range(0..suspended.len()); + suspended.remove(&suspended.iter().nth(q).unwrap().clone()); + } + log::info!( + "Suspended {} and resumed {} queues of {} in total", + to_suspend, + to_resume, + per_queue.len() + ); + SuspendedQueues::set(suspended.iter().map(|q| MessageOrigin::Everywhere(*q)).collect()); + + // Pick a fraction of all messages currently in queue and process them. + let resumed_messages = + per_queue.iter().filter(|(q, _)| !suspended.contains(q)).map(|(_, n)| n).sum(); + let processed = rng.gen_range(1..=resumed_messages); + log::info!( + "Processing {} messages. Resumed msgs: {}, All msgs: {}", + processed, + resumed_messages, + msgs_remaining + ); + process_some_messages(processed); // This also advances the block. + msgs_remaining -= processed; + } + let per_queue = msgs_per_queue(); + let resumed_messages = + per_queue.iter().filter(|(q, _)| !suspended.contains(q)).map(|(_, n)| n).sum(); + log::info!("Processing all {} remaining resumed messages", resumed_messages); + process_all_messages(resumed_messages); + msgs_remaining -= resumed_messages; + + let resumed = SuspendedQueues::take(); + log::info!("Resumed all {} suspended queues", resumed.len()); + log::info!("Processing all remaining {} messages", msgs_remaining); + process_all_messages(msgs_remaining); + post_conditions(); + }); +} + +/// How many messages are in each queue. +fn msgs_per_queue() -> BTreeMap { + let mut per_queue = BTreeMap::new(); + for (o, q) in BookStateFor::::iter() { + let MessageOrigin::Everywhere(o) = o else { + unreachable!(); + }; + per_queue.insert(o, q.message_count as u32); + } + per_queue +} + /// Enqueue a random number of random messages into a random number of queues. +/// +/// Returns the total number of enqueued messages, their combined length and the number of messages +/// per queue. fn enqueue_messages( max_queues: u32, max_per_queue: u32, max_msg_len: u32, rng: &mut StdRng, -) -> (u32, usize) { +) -> u32 { let num_queues = rng.gen_range(1..max_queues); let mut num_messages = 0; let mut total_msg_len = 0; @@ -179,11 +285,11 @@ fn enqueue_messages( num_queues, total_msg_len as f64 / 1024.0 ); - (num_messages, total_msg_len as usize) + num_messages } /// Process the number of messages. -fn process_messages(num_msgs: u32) { +fn process_some_messages(num_msgs: u32) { let weight = (num_msgs as u64).into_weight(); ServiceWeight::set(Some(weight)); let consumed = next_block(); @@ -192,6 +298,15 @@ fn process_messages(num_msgs: u32) { assert_eq!(NumMessagesProcessed::take(), num_msgs as usize); } +/// Process all remaining messages and assert their number. +fn process_all_messages(expected: u32) { + ServiceWeight::set(Some(Weight::MAX)); + let consumed = next_block(); + + assert_eq!(consumed, Weight::from_all(expected as u64)); + assert_eq!(NumMessagesProcessed::take(), expected as usize); +} + /// Returns the weight consumed by `MessageQueue::on_initialize()`. fn next_block() -> Weight { MessageQueue::on_finalize(System::block_number()); diff --git a/frame/message-queue/src/lib.rs b/frame/message-queue/src/lib.rs index 2b64e3fc9b174..6c264be3c834e 100644 --- a/frame/message-queue/src/lib.rs +++ b/frame/message-queue/src/lib.rs @@ -533,6 +533,11 @@ pub mod pallet { Queued, /// There is temporarily not enough weight to continue servicing messages. InsufficientWeight, + /// This message is temporarily unprocessable. + /// + /// Such errors are expected, but not guaranteed, to resolve themselves eventually through + /// retrying. + TemporarilyUnprocessable, } /// The index of the first and last (non-empty) pages. @@ -588,6 +593,9 @@ pub mod pallet { /// Execute an overweight message. /// + /// Temporary processing errors will be propagated whereas permanent errors are treated + /// as success condition. + /// /// - `origin`: Must be `Signed`. /// - `message_origin`: The origin from which the message to be executed arrived. /// - `page`: The page in the queue in which the message to be executed is sitting. @@ -621,6 +629,10 @@ pub mod pallet { enum PageExecutionStatus { /// The execution bailed because there was not enough weight remaining. Bailed, + /// The page did not make any progress on its execution. + /// + /// This is a transient condition and can be handled by retrying - exactly like [Bailed]. + NoProgress, /// No more messages could be loaded. This does _not_ imply `page.is_complete()`. /// /// The reasons for this status are: @@ -634,6 +646,10 @@ enum PageExecutionStatus { enum ItemExecutionStatus { /// The execution bailed because there was not enough weight remaining. Bailed, + /// The item did not make any progress on its execution. + /// + /// This is a transient condition and can be handled by retrying - exactly like [Bailed]. + NoProgress, /// The item was not found. NoItem, /// Whether the execution of an item resulted in it being processed. @@ -651,8 +667,8 @@ enum MessageExecutionStatus { Overweight, /// The message was processed successfully. Processed, - /// The message was processed and resulted in a permanent error. - Unprocessable, + /// The message was processed and resulted in a, possibly permanent, error. + Unprocessable { permanent: bool }, } impl Pallet { @@ -814,7 +830,8 @@ impl Pallet { // additional overweight event being deposited. ) { Overweight | InsufficientWeight => Err(Error::::InsufficientWeight), - Unprocessable | Processed => { + Unprocessable { permanent: false } => Err(Error::::TemporarilyUnprocessable), + Unprocessable { permanent: true } | Processed => { page.note_processed_at_pos(pos); book_state.message_count.saturating_dec(); book_state.size.saturating_reduce(payload_len); @@ -921,6 +938,7 @@ impl Pallet { weight: &mut WeightMeter, overweight_limit: Weight, ) -> (bool, Option>) { + use PageExecutionStatus::*; if !weight.check_accrue( T::WeightInfo::service_queue_base().saturating_add(T::WeightInfo::ready_ring_unknit()), ) { @@ -936,9 +954,9 @@ impl Pallet { total_processed.saturating_accrue(processed); match status { // Store the page progress and do not go to the next one. - PageExecutionStatus::Bailed => break, + Bailed | NoProgress => break, // Go to the next page if this one is at the end. - PageExecutionStatus::NoMore => (), + NoMore => (), }; book_state.begin.saturating_inc(); } @@ -1003,6 +1021,7 @@ impl Pallet { ) { Bailed => break PageExecutionStatus::Bailed, NoItem => break PageExecutionStatus::NoMore, + NoProgress => break PageExecutionStatus::NoProgress, // Keep going as long as we make progress... Executed(true) => total_processed.saturating_inc(), Executed(false) => (), @@ -1053,7 +1072,8 @@ impl Pallet { overweight_limit, ) { InsufficientWeight => return ItemExecutionStatus::Bailed, - Processed | Unprocessable => true, + Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress, + Processed | Unprocessable { permanent: true } => true, Overweight => false, }; @@ -1125,12 +1145,14 @@ impl Pallet { page_index: PageIndex, message_index: T::Size, message: &[u8], - weight: &mut WeightMeter, + meter: &mut WeightMeter, overweight_limit: Weight, ) -> MessageExecutionStatus { let hash = T::Hashing::hash(message); - use ProcessMessageError::Overweight; - match T::MessageProcessor::process_message(message, origin.clone(), weight.remaining()) { + use ProcessMessageError::*; + let prev_consumed = meter.consumed; + + match T::MessageProcessor::process_message(message, origin.clone(), meter) { Err(Overweight(w)) if w.any_gt(overweight_limit) => { // Permanently overweight. Self::deposit_event(Event::::OverweightEnqueued { @@ -1146,16 +1168,19 @@ impl Pallet { // queue. MessageExecutionStatus::InsufficientWeight }, - Err(error) => { + Err(Yield) => { + // Processing should be reattempted later. + MessageExecutionStatus::Unprocessable { permanent: false } + }, + Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => { // Permanent error - drop Self::deposit_event(Event::::ProcessingFailed { hash, origin, error }); - MessageExecutionStatus::Unprocessable + MessageExecutionStatus::Unprocessable { permanent: true } }, - Ok((success, weight_used)) => { + Ok(success) => { // Success - weight.defensive_saturating_accrue(weight_used); - let event = Event::::Processed { hash, origin, weight_used, success }; - Self::deposit_event(event); + let weight_used = meter.consumed.saturating_sub(prev_consumed); + Self::deposit_event(Event::::Processed { hash, origin, weight_used, success }); MessageExecutionStatus::Processed }, } diff --git a/frame/message-queue/src/mock.rs b/frame/message-queue/src/mock.rs index 8817ebc3531ed..28a599bcf83c6 100644 --- a/frame/message-queue/src/mock.rs +++ b/frame/message-queue/src/mock.rs @@ -154,6 +154,7 @@ impl crate::weights::WeightInfo for MockedWeightInfo { parameter_types! { pub static MessagesProcessed: Vec<(Vec, MessageOrigin)> = vec![]; + pub static SuspendedQueues: Vec = vec![]; } /// A message processor which records all processed messages into [`MessagesProcessed`]. @@ -170,9 +171,9 @@ impl ProcessMessage for RecordingMessageProcessor { fn process_message( message: &[u8], origin: Self::Origin, - weight_limit: Weight, - ) -> Result<(bool, Weight), ProcessMessageError> { - processing_message(message)?; + meter: &mut WeightMeter, + ) -> Result { + processing_message(message, &origin)?; let weight = if message.starts_with(&b"weight="[..]) { let mut w: u64 = 0; @@ -187,22 +188,26 @@ impl ProcessMessage for RecordingMessageProcessor { } else { 1 }; - let weight = Weight::from_parts(weight, weight); + let required = Weight::from_parts(weight, weight); - if weight.all_lte(weight_limit) { + if meter.check_accrue(required) { let mut m = MessagesProcessed::get(); m.push((message.to_vec(), origin)); MessagesProcessed::set(m); - Ok((true, weight)) + Ok(true) } else { - Err(ProcessMessageError::Overweight(weight)) + Err(ProcessMessageError::Overweight(required)) } } } -/// Processed a mocked message. Messages that end with `badformat`, `corrupt` or `unsupported` will -/// fail with the respective error. -fn processing_message(msg: &[u8]) -> Result<(), ProcessMessageError> { +/// Processed a mocked message. Messages that end with `badformat`, `corrupt`, `unsupported` or +/// `yield` will fail with an error respectively. +fn processing_message(msg: &[u8], origin: &MessageOrigin) -> Result<(), ProcessMessageError> { + if SuspendedQueues::get().contains(&origin) { + return Err(ProcessMessageError::Yield) + } + let msg = String::from_utf8_lossy(msg); if msg.ends_with("badformat") { Err(ProcessMessageError::BadFormat) @@ -210,6 +215,8 @@ fn processing_message(msg: &[u8]) -> Result<(), ProcessMessageError> { Err(ProcessMessageError::Corrupt) } else if msg.ends_with("unsupported") { Err(ProcessMessageError::Unsupported) + } else if msg.ends_with("yield") { + Err(ProcessMessageError::Yield) } else { Ok(()) } @@ -230,20 +237,20 @@ impl ProcessMessage for CountingMessageProcessor { fn process_message( message: &[u8], - _origin: Self::Origin, - weight_limit: Weight, - ) -> Result<(bool, Weight), ProcessMessageError> { - if let Err(e) = processing_message(message) { + origin: Self::Origin, + meter: &mut WeightMeter, + ) -> Result { + if let Err(e) = processing_message(message, &origin) { NumMessagesErrored::set(NumMessagesErrored::get() + 1); return Err(e) } - let weight = Weight::from_parts(1, 1); + let required = Weight::from_parts(1, 1); - if weight.all_lte(weight_limit) { + if meter.check_accrue(required) { NumMessagesProcessed::set(NumMessagesProcessed::get() + 1); - Ok((true, weight)) + Ok(true) } else { - Err(ProcessMessageError::Overweight(weight)) + Err(ProcessMessageError::Overweight(required)) } } } @@ -285,7 +292,11 @@ pub fn set_weight(name: &str, w: Weight) { /// Assert that exactly these pages are present. Assumes `Here` origin. pub fn assert_pages(indices: &[u32]) { - assert_eq!(Pages::::iter().count(), indices.len()); + assert_eq!( + Pages::::iter_keys().count(), + indices.len(), + "Wrong number of pages in the queue" + ); for i in indices { assert!(Pages::::contains_key(MessageOrigin::Here, i)); } diff --git a/frame/message-queue/src/mock_helpers.rs b/frame/message-queue/src/mock_helpers.rs index 64fc70057cca4..716a60782ec7f 100644 --- a/frame/message-queue/src/mock_helpers.rs +++ b/frame/message-queue/src/mock_helpers.rs @@ -47,22 +47,28 @@ impl From for MessageOrigin { } } -/// Processes any message and consumes (1, 1) weight per message. -pub struct NoopMessageProcessor; -impl ProcessMessage for NoopMessageProcessor { - type Origin = MessageOrigin; +/// Processes any message and consumes `(REQUIRED_WEIGHT, REQUIRED_WEIGHT)` weight. +/// +/// Returns [ProcessMessageError::Overweight] error if the weight limit is not sufficient. +pub struct NoopMessageProcessor(PhantomData); +impl ProcessMessage + for NoopMessageProcessor +where + Origin: codec::FullCodec + MaxEncodedLen + Clone + Eq + PartialEq + TypeInfo + Debug, +{ + type Origin = Origin; fn process_message( _message: &[u8], _origin: Self::Origin, - weight_limit: Weight, - ) -> Result<(bool, Weight), ProcessMessageError> { - let weight = Weight::from_parts(1, 1); + meter: &mut WeightMeter, + ) -> Result { + let required = Weight::from_parts(REQUIRED_WEIGHT, REQUIRED_WEIGHT); - if weight.all_lte(weight_limit) { - Ok((true, weight)) + if meter.check_accrue(required) { + Ok(true) } else { - Err(ProcessMessageError::Overweight(weight)) + Err(ProcessMessageError::Overweight(required)) } } } diff --git a/frame/message-queue/src/tests.rs b/frame/message-queue/src/tests.rs index f4cbd81ac6667..d3b0555f281f7 100644 --- a/frame/message-queue/src/tests.rs +++ b/frame/message-queue/src/tests.rs @@ -171,8 +171,9 @@ fn service_queues_failing_messages_works() { MessageQueue::enqueue_message(msg("badformat"), Here); MessageQueue::enqueue_message(msg("corrupt"), Here); MessageQueue::enqueue_message(msg("unsupported"), Here); - // Starts with three pages. - assert_pages(&[0, 1, 2]); + MessageQueue::enqueue_message(msg("yield"), Here); + // Starts with four pages. + assert_pages(&[0, 1, 2, 3]); assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); assert_last_event::( @@ -201,8 +202,65 @@ fn service_queues_failing_messages_works() { } .into(), ); - // All pages removed. - assert_pages(&[]); + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(System::events().len(), 3); + // Last page with the `yield` stays in. + assert_pages(&[3]); + }); +} + +#[test] +fn service_queues_suspension_works() { + use MessageOrigin::*; + new_test_ext::().execute_with(|| { + MessageQueue::enqueue_messages(vec![msg("a"), msg("b"), msg("c")].into_iter(), Here); + MessageQueue::enqueue_messages(vec![msg("x"), msg("y"), msg("z")].into_iter(), There); + MessageQueue::enqueue_messages( + vec![msg("m"), msg("n"), msg("o")].into_iter(), + Everywhere(0), + ); + assert_eq!(QueueChanges::take(), vec![(Here, 3, 3), (There, 3, 3), (Everywhere(0), 3, 3)]); + + // Service one message from `Here`. + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]); + assert_eq!(QueueChanges::take(), vec![(Here, 2, 2)]); + + // Pause queue `Here` and `Everywhere(0)`. + SuspendedQueues::set(vec![Here, Everywhere(0)]); + + // Service one message from `There`. + assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]); + assert_eq!(QueueChanges::take(), vec![(There, 2, 2)]); + + // Now it would normally swap to `Everywhere(0)` and `Here`, but they are paused so we + // expect `There` again. + assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("y"), There), (vmsg("z"), There)]); + + // Processing with max-weight won't do anything. + assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero()); + assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero()); + + // ... until we resume `Here`: + SuspendedQueues::set(vec![Everywhere(0)]); + assert_eq!(MessageQueue::service_queues(Weight::MAX), 2.into_weight()); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("b"), Here), (vmsg("c"), Here)]); + + // Everywhere still won't move. + assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero()); + SuspendedQueues::take(); + // Resume `Everywhere(0)` makes it work. + assert_eq!(MessageQueue::service_queues(Weight::MAX), 3.into_weight()); + assert_eq!( + MessagesProcessed::take(), + vec![ + (vmsg("m"), Everywhere(0)), + (vmsg("n"), Everywhere(0)), + (vmsg("o"), Everywhere(0)) + ] + ); }); } @@ -379,7 +437,7 @@ fn service_page_works() { assert_eq!(status, Bailed); } } - assert!(!Pages::::contains_key(Here, 0), "The page got removed"); + assert_pages(&[]); }); } @@ -445,6 +503,57 @@ fn service_page_item_bails() { }); } +#[test] +fn service_page_suspension_works() { + use super::integration_test::Test; // Run with larger page size. + use MessageOrigin::*; + use PageExecutionStatus::*; + + new_test_ext::().execute_with(|| { + let (page, mut msgs) = full_page::(); + assert!(msgs >= 10, "pre-condition: need at least 10 msgs per page"); + let mut book = book_for::(&page); + Pages::::insert(Here, 0, page); + + // First we process 5 messages from this page. + let mut meter = WeightMeter::from_limit(5.into_weight()); + let (_, status) = + crate::Pallet::::service_page(&Here, &mut book, &mut meter, Weight::MAX); + + assert_eq!(NumMessagesProcessed::take(), 5); + assert!(meter.remaining().is_zero()); + assert_eq!(status, Bailed); // It bailed since weight is missing. + msgs -= 5; + + // Then we pause the queue. + SuspendedQueues::set(vec![Here]); + // Noting happens... + for _ in 0..5 { + let (_, status) = crate::Pallet::::service_page( + &Here, + &mut book, + &mut WeightMeter::max_limit(), + Weight::MAX, + ); + assert_eq!(status, NoProgress); + assert!(NumMessagesProcessed::take().is_zero()); + } + + // Resume and process all remaining. + SuspendedQueues::take(); + let (_, status) = crate::Pallet::::service_page( + &Here, + &mut book, + &mut WeightMeter::max_limit(), + Weight::MAX, + ); + assert_eq!(status, NoMore); + assert_eq!(NumMessagesProcessed::take(), msgs); + + assert!(Pages::::iter_keys().count().is_zero()); + }); +} + #[test] fn bump_service_head_works() { use MessageOrigin::*; diff --git a/frame/message-queue/src/weights.rs b/frame/message-queue/src/weights.rs index be2f801f195a9..b48cf811a17ef 100644 --- a/frame/message-queue/src/weights.rs +++ b/frame/message-queue/src/weights.rs @@ -18,25 +18,26 @@ //! Autogenerated weights for pallet_message_queue //! //! THIS FILE WAS AUTO-GENERATED USING THE SUBSTRATE BENCHMARK CLI VERSION 4.0.0-dev -//! DATE: 2023-01-24, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` +//! DATE: 2023-02-24, STEPS: `50`, REPEAT: `20`, LOW RANGE: `[]`, HIGH RANGE: `[]` //! WORST CASE MAP SIZE: `1000000` -//! HOSTNAME: `bm2`, CPU: `Intel(R) Core(TM) i7-7700K CPU @ 4.20GHz` +//! HOSTNAME: `bm3`, CPU: `Intel(R) Core(TM) i7-7700K CPU @ 4.20GHz` //! EXECUTION: Some(Wasm), WASM-EXECUTION: Compiled, CHAIN: Some("dev"), DB CACHE: 1024 // Executed Command: -// ./target/production/substrate +// target/production/substrate // benchmark // pallet -// --chain=dev // --steps=50 // --repeat=20 -// --pallet=pallet_message_queue // --extrinsic=* // --execution=wasm // --wasm-execution=compiled // --heap-pages=4096 -// --output=./frame/message-queue/src/weights.rs +// --json-file=/var/lib/gitlab-runner/builds/zyw4fam_/0/parity/mirrors/substrate/.git/.artifacts/bench.json +// --pallet=pallet_message_queue +// --chain=dev // --header=./HEADER-APACHE2 +// --output=./frame/message-queue/src/weights.rs // --template=./.maintain/frame-weight-template.hbs #![cfg_attr(rustfmt, rustfmt_skip)] @@ -64,61 +65,66 @@ pub trait WeightInfo { pub struct SubstrateWeight(PhantomData); impl WeightInfo for SubstrateWeight { /// Storage: MessageQueue ServiceHead (r:1 w:0) - /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(5), added: 500, mode: MaxEncodedLen) + /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(4), added: 499, mode: MaxEncodedLen) /// Storage: MessageQueue BookStateFor (r:2 w:2) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) fn ready_ring_knit() -> Weight { // Proof Size summary in bytes: - // Measured: `837` - // Estimated: `5554` - // Minimum execution time: 12_676 nanoseconds. - Weight::from_parts(13_113_000, 5554) + // Measured: `829` + // Estimated: `5547` + // Minimum execution time: 15_241 nanoseconds. + Weight::from_ref_time(15_603_000) + .saturating_add(Weight::from_proof_size(5547)) .saturating_add(T::DbWeight::get().reads(3_u64)) .saturating_add(T::DbWeight::get().writes(2_u64)) } /// Storage: MessageQueue BookStateFor (r:2 w:2) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) /// Storage: MessageQueue ServiceHead (r:1 w:1) - /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(5), added: 500, mode: MaxEncodedLen) + /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(4), added: 499, mode: MaxEncodedLen) fn ready_ring_unknit() -> Weight { // Proof Size summary in bytes: - // Measured: `837` - // Estimated: `5554` - // Minimum execution time: 12_654 nanoseconds. - Weight::from_parts(12_969_000, 5554) + // Measured: `829` + // Estimated: `5547` + // Minimum execution time: 14_652 nanoseconds. + Weight::from_ref_time(14_983_000) + .saturating_add(Weight::from_proof_size(5547)) .saturating_add(T::DbWeight::get().reads(3_u64)) .saturating_add(T::DbWeight::get().writes(3_u64)) } /// Storage: MessageQueue BookStateFor (r:1 w:1) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) fn service_queue_base() -> Weight { // Proof Size summary in bytes: // Measured: `576` - // Estimated: `2527` - // Minimum execution time: 5_096 nanoseconds. - Weight::from_parts(5_280_000, 2527) + // Estimated: `2524` + // Minimum execution time: 5_750 nanoseconds. + Weight::from_ref_time(6_003_000) + .saturating_add(Weight::from_proof_size(2524)) .saturating_add(T::DbWeight::get().reads(1_u64)) .saturating_add(T::DbWeight::get().writes(1_u64)) } /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn service_page_base_completion() -> Weight { // Proof Size summary in bytes: - // Measured: `648` - // Estimated: `68060` - // Minimum execution time: 7_291 nanoseconds. - Weight::from_parts(7_564_000, 68060) + // Measured: `647` + // Estimated: `68059` + // Minimum execution time: 8_257 nanoseconds. + Weight::from_ref_time(8_506_000) + .saturating_add(Weight::from_proof_size(68059)) .saturating_add(T::DbWeight::get().reads(1_u64)) .saturating_add(T::DbWeight::get().writes(1_u64)) } /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn service_page_base_no_completion() -> Weight { // Proof Size summary in bytes: - // Measured: `648` - // Estimated: `68060` - // Minimum execution time: 7_401 nanoseconds. - Weight::from_parts(7_681_000, 68060) + // Measured: `647` + // Estimated: `68059` + // Minimum execution time: 8_422 nanoseconds. + Weight::from_ref_time(8_589_000) + .saturating_add(Weight::from_proof_size(68059)) .saturating_add(T::DbWeight::get().reads(1_u64)) .saturating_add(T::DbWeight::get().writes(1_u64)) } @@ -126,58 +132,63 @@ impl WeightInfo for SubstrateWeight { // Proof Size summary in bytes: // Measured: `972` // Estimated: `0` - // Minimum execution time: 79_412 nanoseconds. - Weight::from_ref_time(79_816_000) + // Minimum execution time: 81_929 nanoseconds. + Weight::from_ref_time(82_375_000) + .saturating_add(Weight::from_proof_size(0)) } /// Storage: MessageQueue ServiceHead (r:1 w:1) - /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(5), added: 500, mode: MaxEncodedLen) + /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(4), added: 499, mode: MaxEncodedLen) /// Storage: MessageQueue BookStateFor (r:1 w:0) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) fn bump_service_head() -> Weight { // Proof Size summary in bytes: - // Measured: `712` - // Estimated: `3027` - // Minimum execution time: 8_258 nanoseconds. - Weight::from_parts(8_438_000, 3027) + // Measured: `706` + // Estimated: `3023` + // Minimum execution time: 8_992 nanoseconds. + Weight::from_ref_time(9_200_000) + .saturating_add(Weight::from_proof_size(3023)) .saturating_add(T::DbWeight::get().reads(2_u64)) .saturating_add(T::DbWeight::get().writes(1_u64)) } /// Storage: MessageQueue BookStateFor (r:1 w:1) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn reap_page() -> Weight { // Proof Size summary in bytes: - // Measured: `66827` - // Estimated: `70587` - // Minimum execution time: 61_361 nanoseconds. - Weight::from_parts(62_103_000, 70587) + // Measured: `66825` + // Estimated: `70583` + // Minimum execution time: 68_292 nanoseconds. + Weight::from_ref_time(69_108_000) + .saturating_add(Weight::from_proof_size(70583)) .saturating_add(T::DbWeight::get().reads(2_u64)) .saturating_add(T::DbWeight::get().writes(2_u64)) } /// Storage: MessageQueue BookStateFor (r:1 w:1) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn execute_overweight_page_removed() -> Weight { // Proof Size summary in bytes: - // Measured: `66827` - // Estimated: `70587` - // Minimum execution time: 75_153 nanoseconds. - Weight::from_parts(76_093_000, 70587) + // Measured: `66825` + // Estimated: `70583` + // Minimum execution time: 83_855 nanoseconds. + Weight::from_ref_time(84_946_000) + .saturating_add(Weight::from_proof_size(70583)) .saturating_add(T::DbWeight::get().reads(2_u64)) .saturating_add(T::DbWeight::get().writes(2_u64)) } /// Storage: MessageQueue BookStateFor (r:1 w:1) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn execute_overweight_page_updated() -> Weight { // Proof Size summary in bytes: - // Measured: `66827` - // Estimated: `70587` - // Minimum execution time: 88_272 nanoseconds. - Weight::from_parts(89_373_000, 70587) + // Measured: `66825` + // Estimated: `70583` + // Minimum execution time: 96_997 nanoseconds. + Weight::from_ref_time(98_668_000) + .saturating_add(Weight::from_proof_size(70583)) .saturating_add(T::DbWeight::get().reads(2_u64)) .saturating_add(T::DbWeight::get().writes(2_u64)) } @@ -186,61 +197,66 @@ impl WeightInfo for SubstrateWeight { // For backwards compatibility and tests impl WeightInfo for () { /// Storage: MessageQueue ServiceHead (r:1 w:0) - /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(5), added: 500, mode: MaxEncodedLen) + /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(4), added: 499, mode: MaxEncodedLen) /// Storage: MessageQueue BookStateFor (r:2 w:2) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) fn ready_ring_knit() -> Weight { // Proof Size summary in bytes: - // Measured: `837` - // Estimated: `5554` - // Minimum execution time: 12_676 nanoseconds. - Weight::from_parts(13_113_000, 5554) + // Measured: `829` + // Estimated: `5547` + // Minimum execution time: 15_241 nanoseconds. + Weight::from_ref_time(15_603_000) + .saturating_add(Weight::from_proof_size(5547)) .saturating_add(RocksDbWeight::get().reads(3_u64)) .saturating_add(RocksDbWeight::get().writes(2_u64)) } /// Storage: MessageQueue BookStateFor (r:2 w:2) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) /// Storage: MessageQueue ServiceHead (r:1 w:1) - /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(5), added: 500, mode: MaxEncodedLen) + /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(4), added: 499, mode: MaxEncodedLen) fn ready_ring_unknit() -> Weight { // Proof Size summary in bytes: - // Measured: `837` - // Estimated: `5554` - // Minimum execution time: 12_654 nanoseconds. - Weight::from_parts(12_969_000, 5554) + // Measured: `829` + // Estimated: `5547` + // Minimum execution time: 14_652 nanoseconds. + Weight::from_ref_time(14_983_000) + .saturating_add(Weight::from_proof_size(5547)) .saturating_add(RocksDbWeight::get().reads(3_u64)) .saturating_add(RocksDbWeight::get().writes(3_u64)) } /// Storage: MessageQueue BookStateFor (r:1 w:1) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) fn service_queue_base() -> Weight { // Proof Size summary in bytes: // Measured: `576` - // Estimated: `2527` - // Minimum execution time: 5_096 nanoseconds. - Weight::from_parts(5_280_000, 2527) + // Estimated: `2524` + // Minimum execution time: 5_750 nanoseconds. + Weight::from_ref_time(6_003_000) + .saturating_add(Weight::from_proof_size(2524)) .saturating_add(RocksDbWeight::get().reads(1_u64)) .saturating_add(RocksDbWeight::get().writes(1_u64)) } /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn service_page_base_completion() -> Weight { // Proof Size summary in bytes: - // Measured: `648` - // Estimated: `68060` - // Minimum execution time: 7_291 nanoseconds. - Weight::from_parts(7_564_000, 68060) + // Measured: `647` + // Estimated: `68059` + // Minimum execution time: 8_257 nanoseconds. + Weight::from_ref_time(8_506_000) + .saturating_add(Weight::from_proof_size(68059)) .saturating_add(RocksDbWeight::get().reads(1_u64)) .saturating_add(RocksDbWeight::get().writes(1_u64)) } /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn service_page_base_no_completion() -> Weight { // Proof Size summary in bytes: - // Measured: `648` - // Estimated: `68060` - // Minimum execution time: 7_401 nanoseconds. - Weight::from_parts(7_681_000, 68060) + // Measured: `647` + // Estimated: `68059` + // Minimum execution time: 8_422 nanoseconds. + Weight::from_ref_time(8_589_000) + .saturating_add(Weight::from_proof_size(68059)) .saturating_add(RocksDbWeight::get().reads(1_u64)) .saturating_add(RocksDbWeight::get().writes(1_u64)) } @@ -248,58 +264,63 @@ impl WeightInfo for () { // Proof Size summary in bytes: // Measured: `972` // Estimated: `0` - // Minimum execution time: 79_412 nanoseconds. - Weight::from_ref_time(79_816_000) + // Minimum execution time: 81_929 nanoseconds. + Weight::from_ref_time(82_375_000) + .saturating_add(Weight::from_proof_size(0)) } /// Storage: MessageQueue ServiceHead (r:1 w:1) - /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(5), added: 500, mode: MaxEncodedLen) + /// Proof: MessageQueue ServiceHead (max_values: Some(1), max_size: Some(4), added: 499, mode: MaxEncodedLen) /// Storage: MessageQueue BookStateFor (r:1 w:0) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) fn bump_service_head() -> Weight { // Proof Size summary in bytes: - // Measured: `712` - // Estimated: `3027` - // Minimum execution time: 8_258 nanoseconds. - Weight::from_parts(8_438_000, 3027) + // Measured: `706` + // Estimated: `3023` + // Minimum execution time: 8_992 nanoseconds. + Weight::from_ref_time(9_200_000) + .saturating_add(Weight::from_proof_size(3023)) .saturating_add(RocksDbWeight::get().reads(2_u64)) .saturating_add(RocksDbWeight::get().writes(1_u64)) } /// Storage: MessageQueue BookStateFor (r:1 w:1) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn reap_page() -> Weight { // Proof Size summary in bytes: - // Measured: `66827` - // Estimated: `70587` - // Minimum execution time: 61_361 nanoseconds. - Weight::from_parts(62_103_000, 70587) + // Measured: `66825` + // Estimated: `70583` + // Minimum execution time: 68_292 nanoseconds. + Weight::from_ref_time(69_108_000) + .saturating_add(Weight::from_proof_size(70583)) .saturating_add(RocksDbWeight::get().reads(2_u64)) .saturating_add(RocksDbWeight::get().writes(2_u64)) } /// Storage: MessageQueue BookStateFor (r:1 w:1) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn execute_overweight_page_removed() -> Weight { // Proof Size summary in bytes: - // Measured: `66827` - // Estimated: `70587` - // Minimum execution time: 75_153 nanoseconds. - Weight::from_parts(76_093_000, 70587) + // Measured: `66825` + // Estimated: `70583` + // Minimum execution time: 83_855 nanoseconds. + Weight::from_ref_time(84_946_000) + .saturating_add(Weight::from_proof_size(70583)) .saturating_add(RocksDbWeight::get().reads(2_u64)) .saturating_add(RocksDbWeight::get().writes(2_u64)) } /// Storage: MessageQueue BookStateFor (r:1 w:1) - /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(52), added: 2527, mode: MaxEncodedLen) + /// Proof: MessageQueue BookStateFor (max_values: None, max_size: Some(49), added: 2524, mode: MaxEncodedLen) /// Storage: MessageQueue Pages (r:1 w:1) - /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65585), added: 68060, mode: MaxEncodedLen) + /// Proof: MessageQueue Pages (max_values: None, max_size: Some(65584), added: 68059, mode: MaxEncodedLen) fn execute_overweight_page_updated() -> Weight { // Proof Size summary in bytes: - // Measured: `66827` - // Estimated: `70587` - // Minimum execution time: 88_272 nanoseconds. - Weight::from_parts(89_373_000, 70587) + // Measured: `66825` + // Estimated: `70583` + // Minimum execution time: 96_997 nanoseconds. + Weight::from_ref_time(98_668_000) + .saturating_add(Weight::from_proof_size(70583)) .saturating_add(RocksDbWeight::get().reads(2_u64)) .saturating_add(RocksDbWeight::get().writes(2_u64)) } diff --git a/frame/support/src/traits.rs b/frame/support/src/traits.rs index a649dad40078d..da8efe6afc483 100644 --- a/frame/support/src/traits.rs +++ b/frame/support/src/traits.rs @@ -114,8 +114,8 @@ pub use preimages::{Bounded, BoundedInline, FetchResult, Hash, QueryPreimage, St mod messages; pub use messages::{ - EnqueueMessage, ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, - ServiceQueues, + EnqueueMessage, ExecuteOverweightError, Footprint, NoopServiceQueues, ProcessMessage, + ProcessMessageError, ServiceQueues, TransformOrigin, }; #[cfg(feature = "try-runtime")] diff --git a/frame/support/src/traits/messages.rs b/frame/support/src/traits/messages.rs index 637aa7fdc045b..781da3ed6c704 100644 --- a/frame/support/src/traits/messages.rs +++ b/frame/support/src/traits/messages.rs @@ -22,7 +22,7 @@ use scale_info::TypeInfo; use sp_core::{ConstU32, Get, TypedGet}; use sp_runtime::{traits::Convert, BoundedSlice, RuntimeDebug}; use sp_std::{fmt::Debug, marker::PhantomData, prelude::*}; -use sp_weights::Weight; +use sp_weights::{Weight, WeightMeter}; /// Errors that can happen when attempting to process a message with /// [`ProcessMessage::process_message()`]. @@ -38,6 +38,13 @@ pub enum ProcessMessageError { /// would be respected. The parameter gives the maximum weight which the message could take /// to process. Overweight(Weight), + /// The queue wants to give up its current processing slot. + /// + /// Hints the message processor to cease servicing this queue and proceed to the next + /// one. This is seen as a *hint*, not an instruction. Implementations must therefore handle + /// the case that a queue is re-serviced within the same block after *yielding*. A queue is + /// not required to *yield* again when it is being re-serviced withing the same block. + Yield, } /// Can process messages from a specific origin. @@ -45,12 +52,14 @@ pub trait ProcessMessage { /// The transport from where a message originates. type Origin: FullCodec + MaxEncodedLen + Clone + Eq + PartialEq + TypeInfo + Debug; - /// Process the given message, using no more than `weight_limit` in weight to do so. + /// Process the given message, using no more than the remaining `meter` weight to do so. + /// + /// Returns whether the message was processed. fn process_message( message: &[u8], origin: Self::Origin, - weight_limit: Weight, - ) -> Result<(bool, Weight), ProcessMessageError>; + meter: &mut WeightMeter, + ) -> Result; } /// Errors that can happen when attempting to execute an overweight message with @@ -85,6 +94,16 @@ pub trait ServiceQueues { } } +/// Services queues by doing nothing. +pub struct NoopServiceQueues(PhantomData); +impl ServiceQueues for NoopServiceQueues { + type OverweightMessageAddress = OverweightAddr; + + fn service_queues(_: Weight) -> Weight { + Weight::zero() + } +} + /// The resource footprint of a queue. #[derive(Default, Copy, Clone, Eq, PartialEq, RuntimeDebug)] pub struct Footprint {