diff --git a/crates/sui-light-client/src/main.rs b/crates/sui-light-client/src/main.rs index 88266c13da86b..283faff9816ff 100644 --- a/crates/sui-light-client/src/main.rs +++ b/crates/sui-light-client/src/main.rs @@ -3,29 +3,34 @@ use anyhow::anyhow; use async_trait::async_trait; -use move_core_types::account_address::AccountAddress; -use sui_json_rpc_types::{SuiObjectDataOptions, SuiTransactionBlockResponseOptions}; +use move_core_types::{account_address::AccountAddress, language_storage::StructTag}; +use sui_json_rpc_types::{SuiEvent, SuiObjectDataOptions, SuiTransactionBlockResponseOptions}; use sui_rpc_api::CheckpointData; use sui_types::{ base_types::ObjectID, committee::Committee, crypto::AuthorityQuorumSignInfo, - digests::TransactionDigest, + digests::{Digest, TransactionDigest}, effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents}, + event::{Event, EventID}, message_envelope::Envelope, messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSummary, EndOfEpochData}, - object::{bounded_visitor::BoundedVisitor, Data, Object}, + object::{bounded_visitor::BoundedVisitor, Data, MoveObject, Object}, }; use sui_config::genesis::Genesis; use sui_package_resolver::Result as ResolverResult; use sui_package_resolver::{Package, PackageStore, Resolver}; +use sui_sdk::rpc_types::EventFilter; use sui_sdk::SuiClientBuilder; use clap::{Parser, Subcommand}; -use std::{collections::HashMap, fs, io::Write, path::PathBuf, str::FromStr, sync::Mutex}; +use std::{ + collections::HashMap, fs, io::Write, path::PathBuf, str::FromStr, sync::Mutex, thread::sleep, + time::Duration, +}; use std::{io::Read, sync::Arc}; use log::info; @@ -98,10 +103,17 @@ enum SCommands { /// Checks a specific object using the light client Object { - /// Transaction hash + /// Object ID #[arg(short, long, value_name = "OID")] oid: String, }, + + /// Follows a specified event stream + EventStream { + /// Event stream ID + #[arg(short, long, value_name = "EID")] + eid: String, + }, } // The config file for the light client including the root of trust genesis digest @@ -531,6 +543,73 @@ async fn get_verified_object(config: &Config, id: ObjectID) -> anyhow::Result ObjectID { + return ObjectID::random(); +} + +#[derive(Debug)] +struct StreamHead { + digest: Digest, + last_event_id: EventID, + count: u64, +} + +// Deserialize the contents of the stream head +async fn deserialize_stream_head_contents(config: &Config, object: &Object) -> anyhow::Result { + todo!("Implement this function"); +} + +/// Gets the specified number of events starting from cursor +async fn get_new_stream_events( + config: &Config, + eid: &StructTag, + cursor: &EventID, + num_new_events: u64, +) -> anyhow::Result> { + let client = SuiClientBuilder::default() + .build(config.full_node_url.as_str()) + .await?; + let event_api = client.event_api(); + + let mut all_events = Vec::new(); + let mut cursor = cursor.clone(); + let mut num_remaining = num_new_events; + loop { + let events_page = event_api + .query_events( + EventFilter::MoveEventType(eid.clone()), + Some(cursor), + Some(num_remaining as usize), + false, // oldest first + ) + .await?; + + if events_page.data.len() == 0 && num_remaining != 0 { + return Err(anyhow!("No more events found")); + } + + num_remaining -= events_page.data.len() as u64; + all_events.extend(events_page.data); + if !events_page.has_next_page { + break; + } + cursor = events_page.next_cursor.unwrap(); + } + + assert_eq!(all_events[all_events.len() - 1].id, cursor); + + Ok(all_events) +} + +fn authenticate_new_events( + events: &[SuiEvent], + last_digest: &Digest, + new_digest: &Digest, +) -> anyhow::Result<()> { + todo!("Implement this function"); +} + #[tokio::main] pub async fn main() { env_logger::init(); @@ -627,6 +706,60 @@ pub async fn main() { .await .expect("Failed to sync checkpoints"); } + + Some(SCommands::EventStream { eid }) => { + let eid = StructTag::from_str(&eid).unwrap(); + println!("Event: {}", eid); + + let oid = derive_stream_head_object_id_from_event_type(&eid); + println!("Stream head's object ID: {}", oid); + + // NOTE: This function downloads one full checkpoint data to verify the object's contents + let mut cur_object = get_verified_object(&config, oid) + .await + .unwrap(); + + let mut cur_head = deserialize_stream_head_contents(&config, &cur_object) + .await + .unwrap(); + let mut cur_cursor = cur_head.last_event_id; + println!("Latest head: {:?}", cur_head); + + // How often do you want to check for new events? + let sleep_duration = Duration::from_secs(5); + loop { + sleep(sleep_duration); + let new_object = get_verified_object(&config, oid) + .await + .unwrap(); + if new_object.version() == cur_object.version() { + // No updates + continue; + } + // Object modified! So we now fetch new events starting from the last fetched event + println!("Stream head modified!"); + let new_head = deserialize_stream_head_contents(&config, &new_object) + .await + .unwrap(); + let num_new_events = new_head.count - cur_head.count; + + // Fetch new events + let new_events = get_new_stream_events(&config, &eid, &cur_cursor, num_new_events) + .await + .unwrap(); + + // Match the new events against the verified latest head + let new_cursor = new_events[new_events.len() - 1].id; + assert_eq!(new_cursor, new_head.last_event_id); + authenticate_new_events(&new_events, &cur_head.digest, &new_head.digest).unwrap(); + + // Update the stream head + cur_head = new_head; + cur_cursor = new_cursor; + cur_object = new_object; + } + } + _ => { println!("No command..."); }