From 995d831172513ec5f49696fff32314a3b06ebc69 Mon Sep 17 00:00:00 2001 From: Xiaowei Lu Date: Tue, 10 Dec 2024 14:55:12 -0800 Subject: [PATCH] Use get_journal_position in subscribe command Summary: ## Context We should not directly make Thrift calls from EdenFS cli commands/subcommands. Instead, encapsulate the methods into the eden instance. ## This Diff We switched from making a Thrift call to calling the newly-added `get_journal_position` API. Reviewed By: jdelliot Differential Revision: D66790164 fbshipit-source-id: 103c3d1b5819c2bba02265b72cd36bc82c80d301 --- eden/fs/cli_rs/edenfs-client/src/instance.rs | 1 - .../edenfs-commands/src/debug/subscribe.rs | 64 +++++++++++-------- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/eden/fs/cli_rs/edenfs-client/src/instance.rs b/eden/fs/cli_rs/edenfs-client/src/instance.rs index 6b66dbeede7a6..1260025042d74 100644 --- a/eden/fs/cli_rs/edenfs-client/src/instance.rs +++ b/eden/fs/cli_rs/edenfs-client/src/instance.rs @@ -246,7 +246,6 @@ impl EdenFsInstance { } } - #[cfg(fbcode_build)] pub async fn get_journal_position( &self, mount_point: &Option, diff --git a/eden/fs/cli_rs/edenfs-commands/src/debug/subscribe.rs b/eden/fs/cli_rs/edenfs-commands/src/debug/subscribe.rs index 44554172fb1c0..e64bd8ec3813d 100644 --- a/eden/fs/cli_rs/edenfs-commands/src/debug/subscribe.rs +++ b/eden/fs/cli_rs/edenfs-commands/src/debug/subscribe.rs @@ -23,6 +23,7 @@ use anyhow::Context; use anyhow::Result; use async_trait::async_trait; use clap::Parser; +use edenfs_client::types::JournalPosition; use edenfs_client::utils::locate_repo_root; use edenfs_client::EdenFsInstance; use futures::StreamExt; @@ -123,17 +124,16 @@ mod fmt { #[derive(Debug, Serialize)] struct SubscribeResponse { mount_generation: i64, - // Thrift somehow generates i64 for unsigned64 type - sequence_number: i64, + sequence_number: u64, snapshot_hash: String, } -impl From for SubscribeResponse { - fn from(from: edenfs_thrift::JournalPosition) -> Self { +impl From for SubscribeResponse { + fn from(from: JournalPosition) -> Self { Self { - mount_generation: from.mountGeneration, - sequence_number: from.sequenceNumber, - snapshot_hash: hex::encode(from.snapshotHash), + mount_generation: from.mount_generation, + sequence_number: from.sequence_number, + snapshot_hash: hex::encode(from.snapshot_hash), } } } @@ -198,29 +198,31 @@ fn decide_should_notify(changes: edenfs_thrift::FileDelta) -> bool { impl SubscribeCmd { async fn _make_notify_event( mount_point: &Vec, - last_position: &mut Option, + mount_point_path: &Option, + last_position: &mut Option, ) -> Option { let instance = EdenFsInstance::global(); - let client = match instance.connect(None).await { - Ok(client) => client, + + let journal = match instance.get_journal_position(mount_point_path, None).await { + Ok(journal) => journal, Err(e) => { return Some(ResponseBuilder::error(&format!( - "error while establishing connection to EdenFS server {e:?}" + "error while getting current journal position: {e:?}", ))); } }; - let journal = match client.getCurrentJournalPosition(mount_point).await { - Ok(journal) => journal, + let client = match instance.connect(None).await { + Ok(client) => client, Err(e) => { return Some(ResponseBuilder::error(&format!( - "error while getting current journal position: {e:?}", + "error while establishing connection to EdenFS server {e:?}" ))); } }; let should_notify = if let Some(last_position) = last_position.replace(journal.clone()) { - if last_position.sequenceNumber == journal.sequenceNumber { + if last_position.sequence_number == journal.sequence_number { tracing::trace!( ?journal, ?last_position, @@ -230,7 +232,7 @@ impl SubscribeCmd { } let changes = client - .getFilesChangedSince(mount_point, &last_position) + .getFilesChangedSince(mount_point, &last_position.into()) .await; match changes { @@ -306,21 +308,29 @@ impl crate::Subcommand for SubscribeCmd { stdout.write_all(&bytes).await.ok(); } - let mut last_position = { - if let Ok(client) = EdenFsInstance::global().connect(None).await { - client.getCurrentJournalPosition(&mount_point).await.ok() - } else { - None - } + let mount_point_path_opt = Some(mount_point_path); + + let instance = EdenFsInstance::global(); + let mut last_position = match instance + .get_journal_position(&mount_point_path_opt, None) + .await + { + Ok(journal) => Some(journal), + Err(_) => None, }; loop { notify.notified().await; - let response = - match Self::_make_notify_event(&mount_point, &mut last_position).await { - None => continue, - Some(response) => response.build(), - }; + let response = match Self::_make_notify_event( + &mount_point, + &mount_point_path_opt, + &mut last_position, + ) + .await + { + None => continue, + Some(response) => response.build(), + }; match serde_json::to_vec(&response) { Ok(mut bytes) => {