Skip to content

Commit

Permalink
Use get_journal_position in subscribe command
Browse files Browse the repository at this point in the history
Summary:
## Context

We should not directly make Thrift calls from EdenFS cli commands/subcommands. Instead, encapsulate the methods into the eden instance.

## This Diff

We switched from making a Thrift call to calling the newly-added `get_journal_position` API.

Reviewed By: jdelliot

Differential Revision: D66790164

fbshipit-source-id: 103c3d1b5819c2bba02265b72cd36bc82c80d301
  • Loading branch information
lXXXw authored and facebook-github-bot committed Dec 10, 2024
1 parent b9f0447 commit 995d831
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 28 deletions.
1 change: 0 additions & 1 deletion eden/fs/cli_rs/edenfs-client/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ impl EdenFsInstance {
}
}

#[cfg(fbcode_build)]
pub async fn get_journal_position(
&self,
mount_point: &Option<PathBuf>,
Expand Down
64 changes: 37 additions & 27 deletions eden/fs/cli_rs/edenfs-commands/src/debug/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
use clap::Parser;
use edenfs_client::types::JournalPosition;
use edenfs_client::utils::locate_repo_root;
use edenfs_client::EdenFsInstance;
use futures::StreamExt;
Expand Down Expand Up @@ -123,17 +124,16 @@ mod fmt {
#[derive(Debug, Serialize)]
struct SubscribeResponse {
mount_generation: i64,
// Thrift somehow generates i64 for unsigned64 type
sequence_number: i64,
sequence_number: u64,
snapshot_hash: String,
}

impl From<edenfs_thrift::JournalPosition> for SubscribeResponse {
fn from(from: edenfs_thrift::JournalPosition) -> Self {
impl From<JournalPosition> for SubscribeResponse {
fn from(from: JournalPosition) -> Self {
Self {
mount_generation: from.mountGeneration,
sequence_number: from.sequenceNumber,
snapshot_hash: hex::encode(from.snapshotHash),
mount_generation: from.mount_generation,
sequence_number: from.sequence_number,
snapshot_hash: hex::encode(from.snapshot_hash),
}
}
}
Expand Down Expand Up @@ -198,29 +198,31 @@ fn decide_should_notify(changes: edenfs_thrift::FileDelta) -> bool {
impl SubscribeCmd {
async fn _make_notify_event(
mount_point: &Vec<u8>,
last_position: &mut Option<edenfs_thrift::JournalPosition>,
mount_point_path: &Option<PathBuf>,
last_position: &mut Option<edenfs_client::types::JournalPosition>,
) -> Option<ResponseBuilder> {
let instance = EdenFsInstance::global();
let client = match instance.connect(None).await {
Ok(client) => client,

let journal = match instance.get_journal_position(mount_point_path, None).await {
Ok(journal) => journal,
Err(e) => {
return Some(ResponseBuilder::error(&format!(
"error while establishing connection to EdenFS server {e:?}"
"error while getting current journal position: {e:?}",
)));
}
};

let journal = match client.getCurrentJournalPosition(mount_point).await {
Ok(journal) => journal,
let client = match instance.connect(None).await {
Ok(client) => client,
Err(e) => {
return Some(ResponseBuilder::error(&format!(
"error while getting current journal position: {e:?}",
"error while establishing connection to EdenFS server {e:?}"
)));
}
};

let should_notify = if let Some(last_position) = last_position.replace(journal.clone()) {
if last_position.sequenceNumber == journal.sequenceNumber {
if last_position.sequence_number == journal.sequence_number {
tracing::trace!(
?journal,
?last_position,
Expand All @@ -230,7 +232,7 @@ impl SubscribeCmd {
}

let changes = client
.getFilesChangedSince(mount_point, &last_position)
.getFilesChangedSince(mount_point, &last_position.into())
.await;

match changes {
Expand Down Expand Up @@ -306,21 +308,29 @@ impl crate::Subcommand for SubscribeCmd {
stdout.write_all(&bytes).await.ok();
}

let mut last_position = {
if let Ok(client) = EdenFsInstance::global().connect(None).await {
client.getCurrentJournalPosition(&mount_point).await.ok()
} else {
None
}
let mount_point_path_opt = Some(mount_point_path);

let instance = EdenFsInstance::global();
let mut last_position = match instance
.get_journal_position(&mount_point_path_opt, None)
.await
{
Ok(journal) => Some(journal),
Err(_) => None,
};

loop {
notify.notified().await;
let response =
match Self::_make_notify_event(&mount_point, &mut last_position).await {
None => continue,
Some(response) => response.build(),
};
let response = match Self::_make_notify_event(
&mount_point,
&mount_point_path_opt,
&mut last_position,
)
.await
{
None => continue,
Some(response) => response.build(),
};

match serde_json::to_vec(&response) {
Ok(mut bytes) => {
Expand Down

0 comments on commit 995d831

Please sign in to comment.