Skip to content

Commit

Permalink
Add initial implementation of streamChangesSinceV2
Browse files Browse the repository at this point in the history
Summary:
# Context

We are introducing EdenFS notifications to support scalable and ergonomic file system notifications for EdenFS mounts.

# This Diff

This diff tweaks the Thrift API a bit - reorganizing, renaming and adding some types. Additionally, it adds an initial, partially correct, implementation of this API. The current implementation returns changes in ordered buckets (filesystem followed by commits). It also doesn't capture all of the changes nore the dtypes. Subsequent diffs will get to these.

# Technical Details

Code is mostly a copy-paste from `streamChangesSince`, replacing API types where needed. Changes are around the processing of overlay and snapshot changes to now return the new API change events.

# Discussion Points

None

Differential Revision: D65625598

fbshipit-source-id: 5151473ab7237c8ac792ab59ccd41d4f070204d9
  • Loading branch information
jdelliot authored and facebook-github-bot committed Nov 9, 2024
1 parent 14840d7 commit 790f653
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 23 deletions.
107 changes: 102 additions & 5 deletions eden/fs/service/EdenServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2203,14 +2203,111 @@ apache::thrift::
EdenServiceHandler::streamChangesSinceV2(
std::unique_ptr<StreamChangesSinceV2Params> params) {
auto helper = INSTRUMENT_THRIFT_CALL_WITH_STAT(
DBG3, &ThriftStats::streamChangesSince, *params->mountPoint_ref());
DBG3, &ThriftStats::streamChangesSinceV2, *params->mountPoint_ref());

auto mountHandle = lookupMount(params->mountPoint());
[[maybe_unused]] const auto& fromPosition = *params->fromPosition_ref();
const auto& fromPosition = *params->fromPosition_ref();

checkMountGeneration(
fromPosition, mountHandle.getEdenMount(), "fromPosition"sv);

auto summed = mountHandle.getJournal().accumulateRange(
*fromPosition.sequenceNumber_ref() + 1);

ChangesSinceV2Result result;
return {
std::move(result),
apache::thrift::ServerStream<ChangeNotificationResult>::createEmpty()};
if (!summed) {
// No changes, just return the fromPosition and an empty stream.
result.toPosition_ref() = fromPosition;

return {
std::move(result),
apache::thrift::ServerStream<ChangeNotificationResult>::createEmpty()};
}

if (summed->isTruncated) {
throw newEdenError(
EDOM,
EdenErrorType::JOURNAL_TRUNCATED,
"Journal entry range has been truncated.");
}

auto cancellationSource = std::make_shared<folly::CancellationSource>();
auto [serverStream, publisher] =
apache::thrift::ServerStream<ChangeNotificationResult>::createPublisher(
[cancellationSource] { cancellationSource->requestCancellation(); });
auto sharedPublisherLock = std::make_shared<folly::Synchronized<
ThriftStreamPublisherOwner<ChangeNotificationResult>>>(
ThriftStreamPublisherOwner{std::move(publisher)});

RootIdCodec& rootIdCodec = mountHandle.getObjectStore();

JournalPosition toPosition;
toPosition.mountGeneration_ref() =
mountHandle.getEdenMount().getMountGeneration();
toPosition.sequenceNumber_ref() = summed->toSequence;
toPosition.snapshotHash_ref() =
rootIdCodec.renderRootId(summed->snapshotTransitions.back());
result.toPosition_ref() = toPosition;

// TODO: temporary recycling of `streamChangesSince` logic
// This will need to be updated to reflect the new way of reporting
// changes in large an small sizes. Additionally, the differntiation
// between files and directories.
//
// Using this for now to get the end-to-end plumbing working and
// to establish some early testing.
for (auto& entry : summed->changedFilesInOverlay) {
const auto& changeInfo = entry.second;

// TODO: add in dtype - requires plumbing through all journal layers
// TODO: rework to support renamed, and optionally, replaced
// This likely means moving away from the `accumulateRange` usage
// above and directly processing via `forEachDelta` which gives much
// more info and properly interleaves filesytem and commit transition
// changes
SmallChangeNotification smallChange;
if (!changeInfo.existedBefore && changeInfo.existedAfter) {
Added added;
added.path() = entry.first.asString();
smallChange.set_added(std::move(added));
} else if (changeInfo.existedBefore && !changeInfo.existedAfter) {
Removed removed;
removed.path() = entry.first.asString();
smallChange.set_removed(std::move(removed));
} else {
Modified modified;
modified.path() = entry.first.asString();
smallChange.set_modified(std::move(modified));
}

ChangeNotification change;
change.set_smallChange(std::move(smallChange));

ChangeNotificationResult changeNotificationResult;
changeNotificationResult.change() = change;

sharedPublisherLock->rlock()->next(std::move(changeNotificationResult));
}

for (const auto& name : summed->uncleanPaths) {
SmallChangeNotification smallChange;
Modified modified;
modified.path() = name.asString();
smallChange.set_modified(std::move(modified));

ChangeNotification change;
change.set_smallChange(std::move(smallChange));

ChangeNotificationResult changeNotificationResult;
changeNotificationResult.change() = change;

sharedPublisherLock->rlock()->next(std::move(changeNotificationResult));
}

// NOTE: we are not surfacing snapshot transitions
// That will come later when we begin to use `forEachDelta`.

return {std::move(result), std::move(serverStream)};
}

apache::thrift::ResponseAndServerStream<ChangesSinceResult, ChangedFileResult>
Expand Down
43 changes: 26 additions & 17 deletions eden/fs/service/streamingeden.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ const i64 FS_EVENT_READ = 1;
const i64 FS_EVENT_WRITE = 2;
const i64 FS_EVENT_OTHER = 4;

struct CommitTransition {
1: eden.ThriftRootId from;
2: eden.ThriftRootId to;
struct Added {
1: eden.Dtype fileType;
3: eden.PathString path;
}

struct DirectoryRenamed {
1: eden.PathString from;
2: eden.PathString to;
struct Modified {
1: eden.Dtype fileType;
3: eden.PathString path;
}

struct Renamed {
Expand All @@ -78,26 +78,35 @@ struct Renamed {
3: eden.PathString to;
}

struct Added {
struct Replaced {
1: eden.Dtype fileType;
3: eden.PathString path;
2: eden.PathString from;
3: eden.PathString to;
}

struct Deleted {
struct Removed {
1: eden.Dtype fileType;
3: eden.PathString path;
}

struct Modified {
1: eden.Dtype fileType;
3: eden.PathString path;
union SmallChangeNotification {
// @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2
1: Added added;
// @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2
2: Modified modified;
// @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2
3: Renamed renamed;
4: Replaced replaced;
5: Removed removed;
}

union SmallChangeNotification {
1: Renamed renamed;
2: Added added;
3: Deleted deleted;
4: Modified modified;
struct DirectoryRenamed {
1: eden.PathString from;
2: eden.PathString to;
}
struct CommitTransition {
1: eden.ThriftRootId from;
2: eden.ThriftRootId to;
}

union LargeChangeNotification {
Expand Down
3 changes: 2 additions & 1 deletion eden/fs/telemetry/EdenStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,8 @@ struct JournalStats : StatsGroup<JournalStats> {
struct ThriftStats : StatsGroup<ThriftStats> {
Duration streamChangesSince{
"thrift.StreamingEdenService.streamChangesSince.streaming_time_us"};

Duration streamChangesSinceV2{
"thrift.StreamingEdenService.streamChangesSinceV2.streaming_time_us"};
Duration streamSelectedChangesSince{
"thrift.StreamingEdenService.streamSelectedChangesSince.streaming_time_us"};

Expand Down

0 comments on commit 790f653

Please sign in to comment.