Skip to content

Commit

Permalink
WIP: Outline of how a client can follow an event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
mskd12 committed Jan 7, 2025
1 parent cd1ad1f commit 8a2a371
Showing 1 changed file with 139 additions and 6 deletions.
145 changes: 139 additions & 6 deletions crates/sui-light-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,34 @@

use anyhow::anyhow;
use async_trait::async_trait;
use move_core_types::account_address::AccountAddress;
use sui_json_rpc_types::{SuiObjectDataOptions, SuiTransactionBlockResponseOptions};
use move_core_types::{account_address::AccountAddress, language_storage::StructTag};
use sui_json_rpc_types::{SuiEvent, SuiObjectDataOptions, SuiTransactionBlockResponseOptions};

use sui_rpc_api::CheckpointData;
use sui_types::{
base_types::ObjectID,
committee::Committee,
crypto::AuthorityQuorumSignInfo,
digests::TransactionDigest,
digests::{Digest, TransactionDigest},
effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
event::{Event, EventID},
message_envelope::Envelope,
messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSummary, EndOfEpochData},
object::{bounded_visitor::BoundedVisitor, Data, Object},
object::{bounded_visitor::BoundedVisitor, Data, MoveObject, Object},
};

use sui_config::genesis::Genesis;

use sui_package_resolver::Result as ResolverResult;
use sui_package_resolver::{Package, PackageStore, Resolver};
use sui_sdk::rpc_types::EventFilter;
use sui_sdk::SuiClientBuilder;

use clap::{Parser, Subcommand};
use std::{collections::HashMap, fs, io::Write, path::PathBuf, str::FromStr, sync::Mutex};
use std::{
collections::HashMap, fs, io::Write, path::PathBuf, str::FromStr, sync::Mutex, thread::sleep,
time::Duration,
};
use std::{io::Read, sync::Arc};

use log::info;
Expand Down Expand Up @@ -98,10 +103,17 @@ enum SCommands {

/// Checks a specific object using the light client
Object {
/// Transaction hash
/// Object ID
#[arg(short, long, value_name = "OID")]
oid: String,
},

/// Follows a specified event stream
EventStream {
/// Event stream ID
#[arg(short, long, value_name = "EID")]
eid: String,
},
}

// The config file for the light client including the root of trust genesis digest
Expand Down Expand Up @@ -531,6 +543,73 @@ async fn get_verified_object(config: &Config, id: ObjectID) -> anyhow::Result<Ob
Ok(object)
}

// TODO: Replace with H(event_type)
fn derive_stream_head_object_id_from_event_type(_event_type: &StructTag) -> ObjectID {
return ObjectID::random();
}

#[derive(Debug)]
struct StreamHead {
digest: Digest,
last_event_id: EventID,
count: u64,
}

// Deserialize the contents of the stream head
async fn deserialize_stream_head_contents(config: &Config, object: &Object) -> anyhow::Result<StreamHead> {
todo!("Implement this function");
}

/// Gets the specified number of events starting from cursor
async fn get_new_stream_events(
config: &Config,
eid: &StructTag,
cursor: &EventID,
num_new_events: u64,
) -> anyhow::Result<Vec<SuiEvent>> {
let client = SuiClientBuilder::default()
.build(config.full_node_url.as_str())
.await?;
let event_api = client.event_api();

let mut all_events = Vec::new();
let mut cursor = cursor.clone();
let mut num_remaining = num_new_events;
loop {
let events_page = event_api
.query_events(
EventFilter::MoveEventType(eid.clone()),
Some(cursor),
Some(num_remaining as usize),
false, // oldest first
)
.await?;

if events_page.data.len() == 0 && num_remaining != 0 {
return Err(anyhow!("No more events found"));
}

num_remaining -= events_page.data.len() as u64;
all_events.extend(events_page.data);
if !events_page.has_next_page {
break;
}
cursor = events_page.next_cursor.unwrap();
}

assert_eq!(all_events[all_events.len() - 1].id, cursor);

Ok(all_events)
}

fn authenticate_new_events(
events: &[SuiEvent],
last_digest: &Digest,
new_digest: &Digest,
) -> anyhow::Result<()> {
todo!("Implement this function");
}

#[tokio::main]
pub async fn main() {
env_logger::init();
Expand Down Expand Up @@ -627,6 +706,60 @@ pub async fn main() {
.await
.expect("Failed to sync checkpoints");
}

Some(SCommands::EventStream { eid }) => {
let eid = StructTag::from_str(&eid).unwrap();
println!("Event: {}", eid);

let oid = derive_stream_head_object_id_from_event_type(&eid);
println!("Stream head's object ID: {}", oid);

// NOTE: This function downloads one full checkpoint data to verify the object's contents
let mut cur_object = get_verified_object(&config, oid)
.await
.unwrap();

let mut cur_head = deserialize_stream_head_contents(&config, &cur_object)
.await
.unwrap();
let mut cur_cursor = cur_head.last_event_id;
println!("Latest head: {:?}", cur_head);

// How often do you want to check for new events?
let sleep_duration = Duration::from_secs(5);
loop {
sleep(sleep_duration);
let new_object = get_verified_object(&config, oid)
.await
.unwrap();
if new_object.version() == cur_object.version() {
// No updates
continue;
}
// Object modified! So we now fetch new events starting from the last fetched event
println!("Stream head modified!");
let new_head = deserialize_stream_head_contents(&config, &new_object)
.await
.unwrap();
let num_new_events = new_head.count - cur_head.count;

// Fetch new events
let new_events = get_new_stream_events(&config, &eid, &cur_cursor, num_new_events)
.await
.unwrap();

// Match the new events against the verified latest head
let new_cursor = new_events[new_events.len() - 1].id;
assert_eq!(new_cursor, new_head.last_event_id);
authenticate_new_events(&new_events, &cur_head.digest, &new_head.digest).unwrap();

// Update the stream head
cur_head = new_head;
cur_cursor = new_cursor;
cur_object = new_object;
}
}

_ => {
println!("No command...");
}
Expand Down

0 comments on commit 8a2a371

Please sign in to comment.