Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the feature flag for storing account history instead of current account state #8

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ serde_json = "1.0.74"
smpl_jwt = "0.6.1"
solana-account-decoder = { version = "=1.10.31" }
solana-bigtable-connection = { version = "=1.10.31" }
solana-bigtable-geyser-models = { version = "=1.10.31" }
solana-bigtable-geyser-models = { version = "=1.10.31", git = "https://github.com/lijunwangs/solana-bigtable-geyser-models" }
solana-geyser-plugin-interface = { version = "=1.10.31" }
solana-logger = { version = "=1.10.31" }
solana-measure = { version = "=1.10.31" }
Expand Down
6 changes: 3 additions & 3 deletions scripts/init-bigtable.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ cbt=(
-instance
"$instance"
)
if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then
if [[ -n "$BIGTABLE_EMULATOR_HOST" ]]; then
cbt+=(-project emulator)
fi

for table in account slot block transaction; do
for table in account account_history slot block transaction; do
(
set -x
"${cbt[@]}" createtable $table
"${cbt[@]}" createfamily $table x
"${cbt[@]}" setgcpolicy $table x maxversions=1
"${cbt[@]}" setgcpolicy $table x maxage=360d
)
done
"${cbt[@]}" setgcpolicy account_history x "maxversions=1 && maxage=365d"
132 changes: 96 additions & 36 deletions src/parallel_bigtable_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod account;
pub mod account_index;
pub mod accounts_history;
pub mod block_metadata;
pub mod slot;
pub mod transaction;
Expand All @@ -10,6 +11,7 @@ use {
parallel_bigtable_client::{
account::{DbAccountInfo, ReadableAccountInfo, UpdateAccountRequest},
account_index::TokenSecondaryIndexEntry,
accounts_history::AccountsHistoryBatcher,
block_metadata::{DbBlockInfo, UpdateBlockMetadataRequest},
transaction::{build_db_transaction, LogTransactionRequest},
},
Expand Down Expand Up @@ -66,6 +68,7 @@ pub const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false;
#[warn(clippy::large_enum_variant)]
enum DbWorkItem {
UpdateAccount(Box<UpdateAccountRequest>),
UpdateAccountsBatch(Box<Vec<DbAccountInfo>>),
UpdateSlot(Box<UpdateSlotRequest>),
LogTransaction(Box<LogTransactionRequest>),
UpdateBlockMetadata(Box<UpdateBlockMetadataRequest>),
Expand All @@ -77,7 +80,6 @@ struct BigtableClientWrapper {
#[allow(dead_code)]
pub struct BufferedBigtableClient {
client: Mutex<BigtableClientWrapper>,
store_account_historical_data: bool,
batch_size: usize,
pending_account_updates: Vec<DbAccountInfo>,
index_token_owner: bool,
Expand Down Expand Up @@ -123,10 +125,6 @@ impl BufferedBigtableClient {
info!("Creating SimpleBigtableClient...");
let client = Self::connect_to_db(config).await?;

let store_account_historical_data = config
.store_account_historical_data
.unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);

let batch_size = config
.batch_size
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
Expand All @@ -138,7 +136,6 @@ impl BufferedBigtableClient {
pending_account_updates: Vec::with_capacity(batch_size),
index_token_owner: config.index_token_owner.unwrap_or_default(),
index_token_mint: config.index_token_mint.unwrap_or(false),
store_account_historical_data,
pending_token_owner_index: Vec::with_capacity(batch_size),
pending_token_mint_index: Vec::with_capacity(batch_size),
})
Expand Down Expand Up @@ -180,6 +177,14 @@ impl BigtableClientWorker {
.block_on(self.client.update_account(account, is_startup))
}

fn update_accounts_batch(
&mut self,
accounts: Vec<DbAccountInfo>,
) -> Result<(usize, usize), GeyserPluginError> {
self.runtime
.block_on(self.client.update_accounts_batch(accounts))
}

fn update_slot_status(
&mut self,
request: UpdateSlotRequest,
Expand Down Expand Up @@ -242,6 +247,17 @@ impl BigtableClientWorker {
Ok(sizes) => Self::update_size_stats(sizes),
}
}
DbWorkItem::UpdateAccountsBatch(accounts) => {
match self.update_accounts_batch(*accounts) {
Err(err) => {
error!("Failed to send accounts batch: ({})", err);
if panic_on_db_errors {
abort();
}
}
Ok(sizes) => Self::update_size_stats(sizes),
}
}
DbWorkItem::UpdateSlot(request) => match self.update_slot_status(*request) {
Err(err) => {
error!("Failed to update slot: ({})", err);
Expand Down Expand Up @@ -309,6 +325,7 @@ pub struct ParallelBigtableClient {
is_startup_done: Arc<AtomicBool>,
startup_done_count: Arc<AtomicUsize>,
initialized_worker_count: Arc<AtomicUsize>,
accounts_batcher: Option<AccountsHistoryBatcher>,
sender: Sender<DbWorkItem>,
last_report: AtomicInterval,
do_work_on_startup: bool,
Expand Down Expand Up @@ -377,6 +394,15 @@ impl ParallelBigtableClient {
workers.push(worker);
}

let store_account_historical_data = config
.store_account_historical_data
.unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);
let history_batcher = if store_account_historical_data {
Some(AccountsHistoryBatcher::default())
} else {
None
};

info!("Created ParallelBigtableClient.");
Ok(Self {
last_report: AtomicInterval::default(),
Expand All @@ -385,6 +411,7 @@ impl ParallelBigtableClient {
is_startup_done,
startup_done_count,
initialized_worker_count,
accounts_batcher: history_batcher,
sender,
do_work_on_startup: config.write_during_startup.unwrap_or(true),
})
Expand Down Expand Up @@ -423,39 +450,45 @@ impl ParallelBigtableClient {
);
}
let mut measure = Measure::start("geyser-plugin-bigtable-create-work-item");
let wrk_item = DbWorkItem::UpdateAccount(Box::new(UpdateAccountRequest {
account: DbAccountInfo::new(account, slot),
is_startup,
}));

measure.stop();

inc_new_counter_debug!(
"geyser-plugin-bigtable-create-work-item-us",
measure.as_us() as usize,
100000,
100000
);
let db_account = DbAccountInfo::new(account, slot);
if let Some(batcher) = self.accounts_batcher.as_mut() {
batcher.add(db_account);
} else {
let account_request = UpdateAccountRequest {
account: db_account,
is_startup,
};
let wrk_item = DbWorkItem::UpdateAccount(Box::new(account_request));

let mut measure = Measure::start("geyser-plugin-bigtable-send-msg");
measure.stop();

if let Err(err) = self.sender.send(wrk_item) {
return Err(GeyserPluginError::AccountsUpdateError {
msg: format!(
"Failed to update the account {:?}, error: {:?}",
bs58::encode(account.pubkey()).into_string(),
err
),
});
}
inc_new_counter_debug!(
"geyser-plugin-bigtable-create-work-item-us",
measure.as_us() as usize,
100000,
100000
);

measure.stop();
inc_new_counter_debug!(
"geyser-plugin-bigtable-send-msg-us",
measure.as_us() as usize,
100000,
100000
);
let mut measure = Measure::start("geyser-plugin-bigtable-send-msg");

if let Err(err) = self.sender.send(wrk_item) {
return Err(GeyserPluginError::AccountsUpdateError {
msg: format!(
"Failed to update the account {:?}, error: {:?}",
bs58::encode(account.pubkey()).into_string(),
err
),
});
}

measure.stop();
inc_new_counter_debug!(
"geyser-plugin-bigtable-send-msg-us",
measure.as_us() as usize,
100000,
100000
);
}

Ok(())
}
Expand All @@ -469,6 +502,33 @@ impl ParallelBigtableClient {
if self.should_skip_work() {
return Ok(());
}

if let Some(batcher) = self.accounts_batcher.as_mut() {
parent.map(|parent| batcher.update_slot_parent(slot, parent));

if status == SlotStatus::Rooted {
let mut measure = Measure::start("geyser-plugin-bigtable-send-msg");

batcher.flush(slot, |batch| {
self.sender
.send(DbWorkItem::UpdateAccountsBatch(Box::new(batch)))
.map_err(|err| GeyserPluginError::AccountsUpdateError {
msg: format!(
"Failed to send the accounts batch for {:?}, error: {:?}",
slot, err
),
})
})?;
measure.stop();
inc_new_counter_debug!(
"geyser-plugin-bigtable-send-msg-us",
measure.as_us() as usize,
1000,
1000
);
}
}

if let Err(err) = self
.sender
.send(DbWorkItem::UpdateSlot(Box::new(UpdateSlotRequest {
Expand Down
Loading