Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client-side changes for event streams #20804

Draft
wants to merge 1 commit into
base: deepak/light-client-minor-refactor
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 139 additions & 6 deletions crates/sui-light-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,34 @@

use anyhow::anyhow;
use async_trait::async_trait;
use move_core_types::account_address::AccountAddress;
use sui_json_rpc_types::{SuiObjectDataOptions, SuiTransactionBlockResponseOptions};
use move_core_types::{account_address::AccountAddress, language_storage::StructTag};
use sui_json_rpc_types::{SuiEvent, SuiObjectDataOptions, SuiTransactionBlockResponseOptions};

use sui_rpc_api::CheckpointData;
use sui_types::{
base_types::ObjectID,
committee::Committee,
crypto::AuthorityQuorumSignInfo,
digests::TransactionDigest,
digests::{Digest, TransactionDigest},
effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
event::{Event, EventID},
message_envelope::Envelope,
messages_checkpoint::{CertifiedCheckpointSummary, CheckpointSummary, EndOfEpochData},
object::{bounded_visitor::BoundedVisitor, Data, Object},
object::{bounded_visitor::BoundedVisitor, Data, MoveObject, Object},
};

use sui_config::genesis::Genesis;

use sui_package_resolver::Result as ResolverResult;
use sui_package_resolver::{Package, PackageStore, Resolver};
use sui_sdk::rpc_types::EventFilter;
use sui_sdk::SuiClientBuilder;

use clap::{Parser, Subcommand};
use std::{collections::HashMap, fs, io::Write, path::PathBuf, str::FromStr, sync::Mutex};
use std::{
collections::HashMap, fs, io::Write, path::PathBuf, str::FromStr, sync::Mutex, thread::sleep,
time::Duration,
};
use std::{io::Read, sync::Arc};

use log::info;
Expand Down Expand Up @@ -98,10 +103,17 @@ enum SCommands {

/// Checks a specific object using the light client
Object {
/// Transaction hash
/// Object ID
#[arg(short, long, value_name = "OID")]
oid: String,
},

/// Follows a specified event stream
EventStream {
/// Event stream ID
#[arg(short, long, value_name = "EID")]
eid: String,
},
}

// The config file for the light client including the root of trust genesis digest
Expand Down Expand Up @@ -531,6 +543,73 @@ async fn get_verified_object(config: &Config, id: ObjectID) -> anyhow::Result<Ob
Ok(object)
}

// TODO: Replace with H(event_type)
fn derive_stream_head_object_id_from_event_type(_event_type: &StructTag) -> ObjectID {
return ObjectID::random();
}

#[derive(Debug)]
struct StreamHead {
digest: Digest,
last_event_id: EventID,
count: u64,
}

// Deserialize the contents of the stream head
async fn deserialize_stream_head_contents(config: &Config, object: &Object) -> anyhow::Result<StreamHead> {
todo!("Implement this function");
}

/// Gets the specified number of events starting from cursor
async fn get_new_stream_events(
config: &Config,
eid: &StructTag,
cursor: &EventID,
num_new_events: u64,
) -> anyhow::Result<Vec<SuiEvent>> {
let client = SuiClientBuilder::default()
.build(config.full_node_url.as_str())
.await?;
let event_api = client.event_api();

let mut all_events = Vec::new();
let mut cursor = cursor.clone();
let mut num_remaining = num_new_events;
loop {
let events_page = event_api
.query_events(
EventFilter::MoveEventType(eid.clone()),
Some(cursor),
Some(num_remaining as usize),
false, // oldest first
)
.await?;

if events_page.data.len() == 0 && num_remaining != 0 {
return Err(anyhow!("No more events found"));
}

num_remaining -= events_page.data.len() as u64;
all_events.extend(events_page.data);
if !events_page.has_next_page {
break;
}
cursor = events_page.next_cursor.unwrap();
}

assert_eq!(all_events[all_events.len() - 1].id, cursor);

Ok(all_events)
}

fn authenticate_new_events(
events: &[SuiEvent],
last_digest: &Digest,
new_digest: &Digest,
) -> anyhow::Result<()> {
todo!("Implement this function");
}

#[tokio::main]
pub async fn main() {
env_logger::init();
Expand Down Expand Up @@ -627,6 +706,60 @@ pub async fn main() {
.await
.expect("Failed to sync checkpoints");
}

Some(SCommands::EventStream { eid }) => {
let eid = StructTag::from_str(&eid).unwrap();
println!("Event: {}", eid);

let oid = derive_stream_head_object_id_from_event_type(&eid);
println!("Stream head's object ID: {}", oid);

// NOTE: This function downloads one full checkpoint data to verify the object's contents
let mut cur_object = get_verified_object(&config, oid)
.await
.unwrap();

let mut cur_head = deserialize_stream_head_contents(&config, &cur_object)
.await
.unwrap();
let mut cur_cursor = cur_head.last_event_id;
println!("Latest head: {:?}", cur_head);

// How often do you want to check for new events?
let sleep_duration = Duration::from_secs(5);
loop {
sleep(sleep_duration);
let new_object = get_verified_object(&config, oid)
.await
.unwrap();
if new_object.version() == cur_object.version() {
// No updates
continue;
}
// Object modified! So we now fetch new events starting from the last fetched event
println!("Stream head modified!");
let new_head = deserialize_stream_head_contents(&config, &new_object)
.await
.unwrap();
let num_new_events = new_head.count - cur_head.count;

// Fetch new events
let new_events = get_new_stream_events(&config, &eid, &cur_cursor, num_new_events)
.await
.unwrap();

// Match the new events against the verified latest head
let new_cursor = new_events[new_events.len() - 1].id;
assert_eq!(new_cursor, new_head.last_event_id);
authenticate_new_events(&new_events, &cur_head.digest, &new_head.digest).unwrap();

// Update the stream head
cur_head = new_head;
cur_cursor = new_cursor;
cur_object = new_object;
}
}

_ => {
println!("No command...");
}
Expand Down
Loading