diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index b8acc07a860620..8db3f999bcda06 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -8,7 +8,11 @@ use { base64::{prelude::BASE64_STANDARD, Engine}, bincode::{config::Options, serialize}, crossbeam_channel::{unbounded, Receiver, Sender}, - jsonrpc_core::{futures::future, types::error, BoxFuture, Error, Metadata, Result}, + jsonrpc_core::{ + futures::future::{self, FutureExt, OptionFuture}, + types::error, + BoxFuture, Error, Metadata, Result, + }, jsonrpc_derive::rpc, solana_account_decoder::{ encode_ui_account, @@ -18,7 +22,7 @@ use { }, solana_accounts_db::{ accounts::AccountAddressFilter, - accounts_index::{AccountIndex, AccountSecondaryIndexes, IndexKey, ScanConfig}, + accounts_index::{AccountIndex, AccountSecondaryIndexes, IndexKey, ScanConfig, ScanResult}, }, solana_client::connection_cache::Protocol, solana_entry::entry::Entry, @@ -55,7 +59,7 @@ use { bank_forks::BankForks, commitment::{BlockCommitmentArray, BlockCommitmentCache}, installed_scheduler_pool::BankWithScheduler, - non_circulating_supply::calculate_non_circulating_supply, + non_circulating_supply::{calculate_non_circulating_supply, NonCirculatingSupply}, prioritization_fee_cache::PrioritizationFeeCache, snapshot_config::SnapshotConfig, snapshot_utils, @@ -78,6 +82,7 @@ use { self, AddressLoader, MessageHash, SanitizedTransaction, TransactionError, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS, }, + transaction_context::TransactionAccount, }, solana_send_transaction_service::send_transaction_service::TransactionInfo, solana_stake_program, @@ -111,6 +116,7 @@ use { }, time::Duration, }, + tokio::runtime::Runtime, }; #[cfg(test)] use { @@ -148,7 +154,7 @@ fn is_finalized( && (blockstore.is_root(slot) || bank.status_cache_ancestors().contains(&slot)) } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct JsonRpcConfig { pub enable_rpc_transaction_history: bool, pub enable_extended_tx_metadata_storage: bool, @@ -159,6 +165,7 @@ pub struct JsonRpcConfig { pub max_multiple_accounts: Option, pub account_indexes: AccountSecondaryIndexes, pub rpc_threads: usize, + pub rpc_blocking_threads: usize, pub rpc_niceness_adj: i8, pub full_api: bool, pub rpc_scan_and_fix_roots: bool, @@ -167,6 +174,28 @@ pub struct JsonRpcConfig { pub disable_health_check: bool, } +impl Default for JsonRpcConfig { + fn default() -> Self { + Self { + enable_rpc_transaction_history: Default::default(), + enable_extended_tx_metadata_storage: Default::default(), + faucet_addr: Option::default(), + health_check_slot_distance: Default::default(), + skip_preflight_health_check: bool::default(), + rpc_bigtable_config: Option::default(), + max_multiple_accounts: Option::default(), + account_indexes: AccountSecondaryIndexes::default(), + rpc_threads: 1, + rpc_blocking_threads: 1, + rpc_niceness_adj: Default::default(), + full_api: Default::default(), + rpc_scan_and_fix_roots: Default::default(), + max_request_body_size: Option::default(), + disable_health_check: Default::default(), + } + } +} + impl JsonRpcConfig { pub fn default_for_test() -> Self { Self { @@ -221,6 +250,7 @@ pub struct JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, + runtime: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -251,6 +281,51 @@ impl JsonRpcRequestProcessor { Ok(bank) } + async fn calculate_non_circulating_supply( + &self, + bank: &Arc, + ) -> ScanResult { + let bank = Arc::clone(bank); + self.runtime + .spawn_blocking(move || calculate_non_circulating_supply(&bank)) + .await + .expect("Failed to spawn blocking task") + } + + pub async fn get_filtered_indexed_accounts( + &self, + bank: &Arc, + index_key: &IndexKey, + program_id: &Pubkey, + filters: Vec, + sort_results: bool, + ) -> ScanResult> { + let bank = Arc::clone(bank); + let index_key = index_key.to_owned(); + let program_id = program_id.to_owned(); + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &index_key, + |account| { + // The program-id account index checks for Account owner on inclusion. + // However, due to the current AccountsDb implementation, an account may + // remain in storage as a zero-lamport AccountSharedData::Default() after + // being wiped and reinitialized in later updates. We include the redundant + // filters here to avoid returning these accounts. + account.owner().eq(&program_id) + && filters + .iter() + .all(|filter_type| filter_allows(filter_type, account)) + }, + &ScanConfig::new(!sort_results), + bank.byte_limit_for_scans(), + ) + }) + .await + .expect("Failed to spawn blocking task") + } + #[allow(deprecated)] fn bank(&self, commitment: Option) -> Arc { debug!("RPC commitment_config: {:?}", commitment); @@ -327,6 +402,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, + runtime: Arc, ) -> (Self, Receiver) { let (sender, receiver) = unbounded(); ( @@ -349,6 +425,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache, + runtime, }, receiver, ) @@ -360,6 +437,8 @@ impl JsonRpcRequestProcessor { socket_addr_space: SocketAddrSpace, connection_cache: Arc, ) -> Self { + use crate::rpc_service::service_runtime; + let genesis_hash = bank.hash(); let bank_forks = BankForks::new_rw_arc(bank); let bank = bank_forks.read().unwrap().root_bank(); @@ -394,8 +473,15 @@ impl JsonRpcRequestProcessor { let slot = bank.slot(); let optimistically_confirmed_bank = Arc::new(RwLock::new(OptimisticallyConfirmedBank { bank })); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; Self { - config: JsonRpcConfig::default(), + config, snapshot_config: None, bank_forks, block_commitment_cache: Arc::new(RwLock::new(BlockCommitmentCache::new( @@ -423,12 +509,13 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc::new(AtomicU64::default()), max_complete_rewards_slot: Arc::new(AtomicU64::default()), prioritization_fee_cache: Arc::new(PrioritizationFeeCache::default()), + runtime: service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), } } - pub fn get_account_info( + pub async fn get_account_info( &self, - pubkey: &Pubkey, + pubkey: Pubkey, config: Option, ) -> Result>> { let RpcAccountInfoConfig { @@ -443,11 +530,18 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); - let response = get_encoded_account(&bank, pubkey, encoding, data_slice, None)?; + let response = self + .runtime + .spawn_blocking({ + let bank = Arc::clone(&bank); + move || get_encoded_account(&bank, &pubkey, encoding, data_slice, None) + }) + .await + .expect("rpc: get_encoded_account panicked")?; Ok(new_response(&bank, response)) } - pub fn get_multiple_accounts( + pub async fn get_multiple_accounts( &self, pubkeys: Vec, config: Option, @@ -464,10 +558,18 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Base64); - let accounts = pubkeys - .into_iter() - .map(|pubkey| get_encoded_account(&bank, &pubkey, encoding, data_slice, None)) - .collect::>>()?; + let mut accounts = Vec::with_capacity(pubkeys.len()); + for pubkey in pubkeys { + let bank = Arc::clone(&bank); + accounts.push( + self.runtime + .spawn_blocking(move || { + get_encoded_account(&bank, &pubkey, encoding, data_slice, None) + }) + .await + .expect("rpc: get_encoded_account panicked")?, + ); + } Ok(new_response(&bank, accounts)) } @@ -480,9 +582,9 @@ impl JsonRpcRequestProcessor { .get_minimum_balance_for_rent_exemption(data_len) } - pub fn get_program_accounts( + pub async fn get_program_accounts( &self, - program_id: &Pubkey, + program_id: Pubkey, config: Option, mut filters: Vec, with_context: bool, @@ -501,30 +603,38 @@ impl JsonRpcRequestProcessor { let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); optimize_filters(&mut filters); let keyed_accounts = { - if let Some(owner) = get_spl_token_owner_filter(program_id, &filters) { + if let Some(owner) = get_spl_token_owner_filter(&program_id, &filters) { self.get_filtered_spl_token_accounts_by_owner( - &bank, + Arc::clone(&bank), program_id, - &owner, + owner, filters, sort_results, - )? - } else if let Some(mint) = get_spl_token_mint_filter(program_id, &filters) { + ) + .await? + } else if let Some(mint) = get_spl_token_mint_filter(&program_id, &filters) { self.get_filtered_spl_token_accounts_by_mint( - &bank, + Arc::clone(&bank), program_id, - &mint, + mint, filters, sort_results, - )? + ) + .await? } else { - self.get_filtered_program_accounts(&bank, program_id, filters, sort_results)? + self.get_filtered_program_accounts( + Arc::clone(&bank), + program_id, + filters, + sort_results, + ) + .await? } }; - let accounts = if is_known_spl_token_id(program_id) + let accounts = if is_known_spl_token_id(&program_id) && encoding == UiAccountEncoding::JsonParsed { - get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() + get_parsed_token_accounts(Arc::clone(&bank), keyed_accounts.into_iter()).collect() } else { keyed_accounts .into_iter() @@ -918,7 +1028,7 @@ impl JsonRpcRequestProcessor { largest_accounts_cache.set_largest_accounts(filter, slot, accounts) } - fn get_largest_accounts( + async fn get_largest_accounts( &self, config: Option, ) -> RpcCustomResult>> { @@ -933,11 +1043,11 @@ impl JsonRpcRequestProcessor { }) } else { let (addresses, address_filter) = if let Some(filter) = config.clone().filter { - let non_circulating_supply = - calculate_non_circulating_supply(&bank).map_err(|e| { - RpcCustomError::ScanError { - message: e.to_string(), - } + let non_circulating_supply = self + .calculate_non_circulating_supply(&bank) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), })?; let addresses = non_circulating_supply.accounts.into_iter().collect(); let address_filter = match filter { @@ -948,13 +1058,21 @@ impl JsonRpcRequestProcessor { } else { (HashSet::new(), AccountAddressFilter::Exclude) }; - let accounts = bank - .get_largest_accounts( - NUM_LARGEST_ACCOUNTS, - &addresses, - address_filter, - sort_results, - ) + let accounts = self + .runtime + .spawn_blocking({ + let bank = Arc::clone(&bank); + move || { + bank.get_largest_accounts( + NUM_LARGEST_ACCOUNTS, + &addresses, + address_filter, + sort_results, + ) + } + }) + .await + .expect("Failed to spawn blocking task") .map_err(|e| RpcCustomError::ScanError { message: e.to_string(), })? @@ -970,16 +1088,18 @@ impl JsonRpcRequestProcessor { } } - fn get_supply( + async fn get_supply( &self, config: Option, ) -> RpcCustomResult> { let config = config.unwrap_or_default(); let bank = self.bank(config.commitment); let non_circulating_supply = - calculate_non_circulating_supply(&bank).map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?; + self.calculate_non_circulating_supply(&bank) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + })?; let total_supply = bank.capitalization(); let non_circulating_accounts = if config.exclude_non_circulating_accounts_list { vec![] @@ -1184,42 +1304,65 @@ impl JsonRpcRequestProcessor { .highest_super_majority_root() { self.check_blockstore_writes_complete(slot)?; - let result = self.blockstore.get_rooted_block(slot, true); + let result = self + .runtime + .spawn_blocking({ + let blockstore = Arc::clone(&self.blockstore); + move || blockstore.get_rooted_block(slot, true) + }) + .await + .expect("Failed to spawn blocking task"); self.check_blockstore_root(&result, slot)?; - let encode_block = |confirmed_block: ConfirmedBlock| -> Result { - let mut encoded_block = confirmed_block - .encode_with_options(encoding, encoding_options) - .map_err(RpcCustomError::from)?; + let encode_block = |confirmed_block: ConfirmedBlock| async move { + let mut encoded_block = self + .runtime + .spawn_blocking(move || { + confirmed_block + .encode_with_options(encoding, encoding_options) + .map_err(RpcCustomError::from) + }) + .await + .expect("Failed to spawn blocking task")?; if slot == 0 { encoded_block.block_time = Some(self.genesis_creation_time()); encoded_block.block_height = Some(0); } - Ok(encoded_block) + Ok::(encoded_block) }; if result.is_err() { if let Some(bigtable_ledger_storage) = &self.bigtable_ledger_storage { let bigtable_result = bigtable_ledger_storage.get_confirmed_block(slot).await; self.check_bigtable_result(&bigtable_result)?; - return bigtable_result.ok().map(encode_block).transpose(); + let encoded_block_future: OptionFuture<_> = + bigtable_result.ok().map(encode_block).into(); + return encoded_block_future.await.transpose(); } } self.check_slot_cleaned_up(&result, slot)?; - return result + let encoded_block_future: OptionFuture<_> = result .ok() .map(ConfirmedBlock::from) .map(encode_block) - .transpose(); + .into(); + return encoded_block_future.await.transpose(); } else if commitment.is_confirmed() { // Check if block is confirmed let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed())); if confirmed_bank.status_cache_ancestors().contains(&slot) { self.check_blockstore_writes_complete(slot)?; - let result = self.blockstore.get_complete_block(slot, true); - return result + let result = self + .runtime + .spawn_blocking({ + let blockstore = Arc::clone(&self.blockstore); + move || blockstore.get_complete_block(slot, true) + }) + .await + .expect("Failed to spawn blocking task"); + let encoded_block_future: OptionFuture<_> = result .ok() .map(ConfirmedBlock::from) - .map(|mut confirmed_block| -> Result { + .map(|mut confirmed_block| async move { if confirmed_block.block_time.is_none() || confirmed_block.block_height.is_none() { @@ -1234,12 +1377,20 @@ impl JsonRpcRequestProcessor { } } } - - Ok(confirmed_block - .encode_with_options(encoding, encoding_options) - .map_err(RpcCustomError::from)?) + let encoded_block = self + .runtime + .spawn_blocking(move || { + confirmed_block + .encode_with_options(encoding, encoding_options) + .map_err(RpcCustomError::from) + }) + .await + .expect("Failed to spawn blocking task")?; + + Ok(encoded_block) }) - .transpose(); + .into(); + return encoded_block_future.await.transpose(); } } } else { @@ -1597,13 +1748,22 @@ impl JsonRpcRequestProcessor { if self.config.enable_rpc_transaction_history { let confirmed_bank = self.bank(Some(CommitmentConfig::confirmed())); - let confirmed_transaction = if commitment.is_confirmed() { - let highest_confirmed_slot = confirmed_bank.slot(); - self.blockstore - .get_complete_transaction(signature, highest_confirmed_slot) - } else { - self.blockstore.get_rooted_transaction(signature) - }; + let confirmed_transaction = self + .runtime + .spawn_blocking({ + let blockstore = Arc::clone(&self.blockstore); + let confirmed_bank = Arc::clone(&confirmed_bank); + move || { + if commitment.is_confirmed() { + let highest_confirmed_slot = confirmed_bank.slot(); + blockstore.get_complete_transaction(signature, highest_confirmed_slot) + } else { + blockstore.get_rooted_transaction(signature) + } + } + }) + .await + .expect("Failed to spawn blocking task"); let encode_transaction = |confirmed_tx_with_meta: ConfirmedTransactionWithStatusMeta| -> Result { @@ -1879,13 +2039,13 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, supply)) } - pub fn get_token_largest_accounts( + pub async fn get_token_largest_accounts( &self, - mint: &Pubkey, + mint: Pubkey, commitment: Option, ) -> Result>> { let bank = self.bank(commitment); - let (mint_owner, data) = get_mint_owner_and_additional_data(&bank, mint)?; + let (mint_owner, data) = get_mint_owner_and_additional_data(&bank, &mint)?; if !is_known_spl_token_id(&mint_owner) { return Err(Error::invalid_params( "Invalid param: not a Token mint".to_string(), @@ -1894,8 +2054,15 @@ impl JsonRpcRequestProcessor { let mut token_balances = BinaryHeap::>::with_capacity(NUM_LARGEST_ACCOUNTS); - for (address, account) in - self.get_filtered_spl_token_accounts_by_mint(&bank, &mint_owner, mint, vec![], true)? + for (address, account) in self + .get_filtered_spl_token_accounts_by_mint( + Arc::clone(&bank), + mint_owner, + mint, + vec![], + true, + ) + .await? { let amount = StateWithExtensions::::unpack(account.data()) .map(|account| account.base.amount) @@ -1928,9 +2095,9 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, token_balances)) } - pub fn get_token_accounts_by_owner( + pub async fn get_token_accounts_by_owner( &self, - owner: &Pubkey, + owner: Pubkey, token_account_filter: TokenAccountsFilter, config: Option, sort_results: bool, @@ -1957,13 +2124,15 @@ impl JsonRpcRequestProcessor { ))); } - let keyed_accounts = self.get_filtered_spl_token_accounts_by_owner( - &bank, - &token_program_id, - owner, - filters, - sort_results, - )?; + let keyed_accounts = self + .get_filtered_spl_token_accounts_by_owner( + Arc::clone(&bank), + token_program_id, + owner, + filters, + sort_results, + ) + .await?; let accounts = if encoding == UiAccountEncoding::JsonParsed { get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() } else { @@ -1980,9 +2149,9 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, accounts)) } - pub fn get_token_accounts_by_delegate( + pub async fn get_token_accounts_by_delegate( &self, - delegate: &Pubkey, + delegate: Pubkey, token_account_filter: TokenAccountsFilter, config: Option, sort_results: bool, @@ -2012,16 +2181,23 @@ impl JsonRpcRequestProcessor { // Optional filter on Mint address, uses mint account index for scan let keyed_accounts = if let Some(mint) = mint { self.get_filtered_spl_token_accounts_by_mint( - &bank, - &token_program_id, - &mint, + Arc::clone(&bank), + token_program_id, + mint, filters, sort_results, - )? + ) + .await? } else { // Filter on Token Account state filters.push(RpcFilterType::TokenAccountState); - self.get_filtered_program_accounts(&bank, &token_program_id, filters, sort_results)? + self.get_filtered_program_accounts( + Arc::clone(&bank), + token_program_id, + filters, + sort_results, + ) + .await? }; let accounts = if encoding == UiAccountEncoding::JsonParsed { get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() @@ -2040,66 +2216,63 @@ impl JsonRpcRequestProcessor { } /// Use a set of filters to get an iterator of keyed program accounts from a bank - fn get_filtered_program_accounts( + async fn get_filtered_program_accounts( &self, - bank: &Bank, - program_id: &Pubkey, + bank: Arc, + program_id: Pubkey, mut filters: Vec, sort_results: bool, ) -> RpcCustomResult> { optimize_filters(&mut filters); - let filter_closure = |account: &AccountSharedData| { - filters - .iter() - .all(|filter_type| filter_allows(filter_type, account)) - }; if self .config .account_indexes .contains(&AccountIndex::ProgramId) { - if !self.config.account_indexes.include_key(program_id) { + if !self.config.account_indexes.include_key(&program_id) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: program_id.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::ProgramId(*program_id), - |account| { - // The program-id account index checks for Account owner on inclusion. However, due - // to the current AccountsDb implementation, an account may remain in storage as a - // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later - // updates. We include the redundant filters here to avoid returning these - // accounts. - account.owner() == program_id && filter_closure(account) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.get_filtered_indexed_accounts( + &bank, + &IndexKey::ProgramId(program_id), + &program_id, + filters, + sort_results, + ) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) } else { // this path does not need to provide a mb limit because we only want to support secondary indexes - Ok(bank - .get_filtered_program_accounts( - program_id, - filter_closure, - &ScanConfig::new(!sort_results), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_program_accounts( + &program_id, + |account: &AccountSharedData| { + filters + .iter() + .all(|filter_type| filter_allows(filter_type, account)) + }, + &ScanConfig::new(!sort_results), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("Failed to spawn blocking task") } } /// Get an iterator of spl-token accounts by owner address - fn get_filtered_spl_token_accounts_by_owner( + async fn get_filtered_spl_token_accounts_by_owner( &self, - bank: &Bank, - program_id: &Pubkey, - owner_key: &Pubkey, + bank: Arc, + program_id: Pubkey, + owner_key: Pubkey, mut filters: Vec, sort_results: bool, ) -> RpcCustomResult> { @@ -2121,37 +2294,34 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::SplTokenOwner) { - if !self.config.account_indexes.include_key(owner_key) { + if !self.config.account_indexes.include_key(&owner_key) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: owner_key.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::SplTokenOwner(*owner_key), - |account| { - account.owner() == program_id - && filters - .iter() - .all(|filter_type| filter_allows(filter_type, account)) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.get_filtered_indexed_accounts( + &bank, + &IndexKey::SplTokenOwner(owner_key), + &program_id, + filters, + sort_results, + ) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) } else { self.get_filtered_program_accounts(bank, program_id, filters, sort_results) + .await } } /// Get an iterator of spl-token accounts by mint address - fn get_filtered_spl_token_accounts_by_mint( + async fn get_filtered_spl_token_accounts_by_mint( &self, - bank: &Bank, - program_id: &Pubkey, - mint_key: &Pubkey, + bank: Arc, + program_id: Pubkey, + mint_key: Pubkey, mut filters: Vec, sort_results: bool, ) -> RpcCustomResult> { @@ -2172,28 +2342,25 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::SplTokenMint) { - if !self.config.account_indexes.include_key(mint_key) { + if !self.config.account_indexes.include_key(&mint_key) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: mint_key.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::SplTokenMint(*mint_key), - |account| { - account.owner() == program_id - && filters - .iter() - .all(|filter_type| filter_allows(filter_type, account)) - }, - &ScanConfig::new(!sort_results), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.get_filtered_indexed_accounts( + &bank, + &IndexKey::SplTokenMint(mint_key), + &program_id, + filters, + sort_results, + ) + .await + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) } else { self.get_filtered_program_accounts(bank, program_id, filters, sort_results) + .await } } @@ -3015,7 +3182,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_str: String, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getMultipleAccounts")] fn get_multiple_accounts( @@ -3023,7 +3190,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_strs: Vec, config: Option, - ) -> Result>>>; + ) -> BoxFuture>>>>; #[rpc(meta, name = "getBlockCommitment")] fn get_block_commitment( @@ -3062,10 +3229,13 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_str: String, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!("get_account_info rpc request received: {:?}", pubkey_str); - let pubkey = verify_pubkey(&pubkey_str)?; - meta.get_account_info(&pubkey, config) + async move { + let pubkey = verify_pubkey(&pubkey_str)?; + meta.get_account_info(pubkey, config).await + } + .boxed() } fn get_multiple_accounts( @@ -3073,26 +3243,28 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_strs: Vec, config: Option, - ) -> Result>>> { + ) -> BoxFuture>>>> { debug!( "get_multiple_accounts rpc request received: {:?}", pubkey_strs.len() ); - - let max_multiple_accounts = meta - .config - .max_multiple_accounts - .unwrap_or(MAX_MULTIPLE_ACCOUNTS); - if pubkey_strs.len() > max_multiple_accounts { - return Err(Error::invalid_params(format!( - "Too many inputs provided; max {max_multiple_accounts}" - ))); + async move { + let max_multiple_accounts = meta + .config + .max_multiple_accounts + .unwrap_or(MAX_MULTIPLE_ACCOUNTS); + if pubkey_strs.len() > max_multiple_accounts { + return Err(Error::invalid_params(format!( + "Too many inputs provided; max {max_multiple_accounts}" + ))); + } + let pubkeys = pubkey_strs + .into_iter() + .map(|pubkey_str| verify_pubkey(&pubkey_str)) + .collect::>>()?; + meta.get_multiple_accounts(pubkeys, config).await } - let pubkeys = pubkey_strs - .into_iter() - .map(|pubkey_str| verify_pubkey(&pubkey_str)) - .collect::>>()?; - meta.get_multiple_accounts(pubkeys, config) + .boxed() } fn get_block_commitment( @@ -3146,21 +3318,21 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, program_id_str: String, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getLargestAccounts")] fn get_largest_accounts( &self, meta: Self::Metadata, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getSupply")] fn get_supply( &self, meta: Self::Metadata, config: Option, - ) -> Result>; + ) -> BoxFuture>>; // SPL Token-specific RPC endpoints // See https://github.com/solana-labs/solana-program-library/releases/tag/token-v2.0.0 for @@ -3172,7 +3344,7 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, mint_str: String, commitment: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getTokenAccountsByOwner")] fn get_token_accounts_by_owner( @@ -3181,7 +3353,7 @@ pub mod rpc_accounts_scan { owner_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getTokenAccountsByDelegate")] fn get_token_accounts_by_delegate( @@ -3190,7 +3362,7 @@ pub mod rpc_accounts_scan { delegate_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; } pub struct AccountsScanImpl; @@ -3202,49 +3374,53 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, program_id_str: String, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_program_accounts rpc request received: {:?}", program_id_str ); - let program_id = verify_pubkey(&program_id_str)?; - let (config, filters, with_context, sort_results) = if let Some(config) = config { - ( - Some(config.account_config), - config.filters.unwrap_or_default(), - config.with_context.unwrap_or_default(), - config.sort_results.unwrap_or(true), - ) - } else { - (None, vec![], false, true) - }; - if filters.len() > MAX_GET_PROGRAM_ACCOUNT_FILTERS { - return Err(Error::invalid_params(format!( - "Too many filters provided; max {MAX_GET_PROGRAM_ACCOUNT_FILTERS}" - ))); - } - for filter in &filters { - verify_filter(filter)?; + async move { + let program_id = verify_pubkey(&program_id_str)?; + let (config, filters, with_context, sort_results) = if let Some(config) = config { + ( + Some(config.account_config), + config.filters.unwrap_or_default(), + config.with_context.unwrap_or_default(), + config.sort_results.unwrap_or(true), + ) + } else { + (None, vec![], false, true) + }; + if filters.len() > MAX_GET_PROGRAM_ACCOUNT_FILTERS { + return Err(Error::invalid_params(format!( + "Too many filters provided; max {MAX_GET_PROGRAM_ACCOUNT_FILTERS}" + ))); + } + for filter in &filters { + verify_filter(filter)?; + } + meta.get_program_accounts(program_id, config, filters, with_context, sort_results) + .await } - meta.get_program_accounts(&program_id, config, filters, with_context, sort_results) + .boxed() } fn get_largest_accounts( &self, meta: Self::Metadata, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!("get_largest_accounts rpc request received"); - Ok(meta.get_largest_accounts(config)?) + async move { Ok(meta.get_largest_accounts(config).await?) }.boxed() } fn get_supply( &self, meta: Self::Metadata, config: Option, - ) -> Result> { + ) -> BoxFuture>> { debug!("get_supply rpc request received"); - Ok(meta.get_supply(config)?) + async move { Ok(meta.get_supply(config).await?) }.boxed() } fn get_token_largest_accounts( @@ -3252,13 +3428,16 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, mint_str: String, commitment: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_largest_accounts rpc request received: {:?}", mint_str ); - let mint = verify_pubkey(&mint_str)?; - meta.get_token_largest_accounts(&mint, commitment) + async move { + let mint = verify_pubkey(&mint_str)?; + meta.get_token_largest_accounts(mint, commitment).await + } + .boxed() } fn get_token_accounts_by_owner( @@ -3267,14 +3446,18 @@ pub mod rpc_accounts_scan { owner_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_accounts_by_owner rpc request received: {:?}", owner_str ); - let owner = verify_pubkey(&owner_str)?; - let token_account_filter = verify_token_account_filter(token_account_filter)?; - meta.get_token_accounts_by_owner(&owner, token_account_filter, config, true) + async move { + let owner = verify_pubkey(&owner_str)?; + let token_account_filter = verify_token_account_filter(token_account_filter)?; + meta.get_token_accounts_by_owner(owner, token_account_filter, config, true) + .await + } + .boxed() } fn get_token_accounts_by_delegate( @@ -3283,14 +3466,18 @@ pub mod rpc_accounts_scan { delegate_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_accounts_by_delegate rpc request received: {:?}", delegate_str ); - let delegate = verify_pubkey(&delegate_str)?; - let token_account_filter = verify_token_account_filter(token_account_filter)?; - meta.get_token_accounts_by_delegate(&delegate, token_account_filter, config, true) + async move { + let delegate = verify_pubkey(&delegate_str)?; + let token_account_filter = verify_token_account_filter(token_account_filter)?; + meta.get_token_accounts_by_delegate(delegate, token_account_filter, config, true) + .await + } + .boxed() } } } @@ -4330,6 +4517,7 @@ pub mod tests { optimistically_confirmed_bank_tracker::{ BankNotification, OptimisticallyConfirmedBankTracker, }, + rpc_service::service_runtime, rpc_subscriptions::RpcSubscriptions, }, bincode::deserialize, @@ -4506,6 +4694,12 @@ pub mod tests { let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let meta = JsonRpcRequestProcessor::new( config, None, @@ -4524,6 +4718,7 @@ pub mod tests { max_complete_transaction_status_slot.clone(), max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ) .0; @@ -6472,8 +6667,15 @@ pub mod tests { .my_contact_info() .tpu(connection_cache.protocol()) .unwrap(); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (meta, receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -6490,6 +6692,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); SendTransactionService::new::( tpu_address, @@ -6746,8 +6949,15 @@ pub mod tests { .unwrap(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (request_processor, receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -6764,6 +6974,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); SendTransactionService::new::( tpu_address, @@ -8396,8 +8607,15 @@ pub mod tests { optimistically_confirmed_bank.clone(), )); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (meta, _receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -8414,6 +8632,7 @@ pub mod tests { max_complete_transaction_status_slot, max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); let mut io = MetaIoHandler::default(); diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index fed748d709472b..28bd96684ee0f4 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -359,6 +359,7 @@ impl JsonRpcService { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); let rpc_threads = 1.max(config.rpc_threads); + let rpc_blocking_threads = 1.max(config.rpc_blocking_threads); let rpc_niceness_adj = config.rpc_niceness_adj; let health = Arc::new(RpcHealth::new( @@ -378,21 +379,7 @@ impl JsonRpcService { .tpu(connection_cache.protocol()) .map_err(|err| format!("{err}"))?; - // sadly, some parts of our current rpc implemention block the jsonrpc's - // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, - // causing no further processing of incoming requests and ultimatily innocent clients timing-out. - // So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1, - // so that we avoid the single-threaded event loops from being created automatically by - // jsonrpc for threads when .threads(N > 1) is given. - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .worker_threads(rpc_threads) - .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap()) - .thread_name("solRpcEl") - .enable_all() - .build() - .expect("Runtime"), - ); + let runtime = service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj); let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); @@ -470,6 +457,7 @@ impl JsonRpcService { max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache, + Arc::clone(&runtime), ); let leader_info = @@ -578,6 +566,40 @@ impl JsonRpcService { } } +pub fn service_runtime( + rpc_threads: usize, + rpc_blocking_threads: usize, + rpc_niceness_adj: i8, +) -> Arc { + // The jsonrpc_http_server crate supports two execution models: + // + // - By default, it spawns a number of threads - configured with .threads(N) - and runs a + // single-threaded futures executor in each thread. + // - Alternatively when configured with .event_loop_executor(executor) and .threads(1), + // it executes all the tasks on the given executor, not spawning any extra internal threads. + // + // We use the latter configuration, using a multi threaded tokio runtime as the executor. We + // do this so we can configure the number of worker threads, the number of blocking threads + // and then use tokio::task::spawn_blocking() to avoid blocking the worker threads on CPU + // bound operations like getMultipleAccounts. This results in reduced latency, since fast + // rpc calls (the majority) are not blocked by slow CPU bound ones. + // + // NB: `rpc_blocking_threads` shouldn't be set too high (defaults to num_cpus / 2). Too many + // (busy) blocking threads could compete with CPU time with other validator threads and + // negatively impact performance. + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .worker_threads(rpc_threads) + .max_blocking_threads(rpc_blocking_threads) + .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap()) + .thread_name("solRpcEl") + .enable_all() + .build() + .expect("Runtime"), + ); + runtime +} + #[cfg(test)] mod tests { use { diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 2b655059c89e1f..f9e39840323b9a 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -965,6 +965,27 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .default_value(&default_args.rpc_threads) .help("Number of threads to use for servicing RPC requests"), ) + .arg( + Arg::with_name("rpc_blocking_threads") + .long("rpc-blocking-threads") + .value_name("NUMBER") + .validator(is_parsable::) + .validator(|value| { + value + .parse::() + .map_err(|err| format!("error parsing '{value}': {err}")) + .and_then(|threads| { + if threads > 0 { + Ok(()) + } else { + Err("value must be >= 1".to_string()) + } + }) + }) + .takes_value(true) + .default_value(&default_args.rpc_blocking_threads) + .help("Number of blocking threads to use for servicing CPU bound RPC requests (eg getMultipleAccounts)"), + ) .arg( Arg::with_name("rpc_niceness_adj") .long("rpc-niceness-adjustment") @@ -2232,6 +2253,7 @@ pub struct DefaultArgs { pub rpc_send_transaction_batch_size: String, pub rpc_send_transaction_retry_pool_max_size: String, pub rpc_threads: String, + pub rpc_blocking_threads: String, pub rpc_niceness_adjustment: String, pub rpc_bigtable_timeout: String, pub rpc_bigtable_instance_name: String, @@ -2323,6 +2345,7 @@ impl DefaultArgs { .retry_pool_max_size .to_string(), rpc_threads: num_cpus::get().to_string(), + rpc_blocking_threads: 1.max(num_cpus::get() / 4).to_string(), rpc_niceness_adjustment: "0".to_string(), rpc_bigtable_timeout: "30".to_string(), rpc_bigtable_instance_name: solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string(), diff --git a/validator/src/main.rs b/validator/src/main.rs index d3686bf3d8438d..02650e25f726ef 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1441,6 +1441,7 @@ pub fn main() { ), disable_health_check: false, rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize), + rpc_blocking_threads: value_t_or_exit!(matches, "rpc_blocking_threads", usize), rpc_niceness_adj: value_t_or_exit!(matches, "rpc_niceness_adj", i8), account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"),