From 790f6530ff74bd6adb333d809765bba0e686c14b Mon Sep 17 00:00:00 2001 From: John Elliott Date: Fri, 8 Nov 2024 20:35:41 -0800 Subject: [PATCH] Add initial implementation of streamChangesSinceV2 Summary: # Context We are introducing EdenFS notifications to support scalable and ergonomic file system notifications for EdenFS mounts. # This Diff This diff tweaks the Thrift API a bit - reorganizing, renaming and adding some types. Additionally, it adds an initial, partially correct, implementation of this API. The current implementation returns changes in ordered buckets (filesystem followed by commits). It also doesn't capture all of the changes nore the dtypes. Subsequent diffs will get to these. # Technical Details Code is mostly a copy-paste from `streamChangesSince`, replacing API types where needed. Changes are around the processing of overlay and snapshot changes to now return the new API change events. # Discussion Points None Differential Revision: D65625598 fbshipit-source-id: 5151473ab7237c8ac792ab59ccd41d4f070204d9 --- eden/fs/service/EdenServiceHandler.cpp | 107 +++++++++++++++++++++++-- eden/fs/service/streamingeden.thrift | 43 ++++++---- eden/fs/telemetry/EdenStats.h | 3 +- 3 files changed, 130 insertions(+), 23 deletions(-) diff --git a/eden/fs/service/EdenServiceHandler.cpp b/eden/fs/service/EdenServiceHandler.cpp index d9bd3433e42b4..7a17cd5c8c806 100644 --- a/eden/fs/service/EdenServiceHandler.cpp +++ b/eden/fs/service/EdenServiceHandler.cpp @@ -2203,14 +2203,111 @@ apache::thrift:: EdenServiceHandler::streamChangesSinceV2( std::unique_ptr params) { auto helper = INSTRUMENT_THRIFT_CALL_WITH_STAT( - DBG3, &ThriftStats::streamChangesSince, *params->mountPoint_ref()); + DBG3, &ThriftStats::streamChangesSinceV2, *params->mountPoint_ref()); + auto mountHandle = lookupMount(params->mountPoint()); - [[maybe_unused]] const auto& fromPosition = *params->fromPosition_ref(); + const auto& fromPosition = *params->fromPosition_ref(); + + checkMountGeneration( + fromPosition, mountHandle.getEdenMount(), "fromPosition"sv); + + auto summed = mountHandle.getJournal().accumulateRange( + *fromPosition.sequenceNumber_ref() + 1); ChangesSinceV2Result result; - return { - std::move(result), - apache::thrift::ServerStream::createEmpty()}; + if (!summed) { + // No changes, just return the fromPosition and an empty stream. + result.toPosition_ref() = fromPosition; + + return { + std::move(result), + apache::thrift::ServerStream::createEmpty()}; + } + + if (summed->isTruncated) { + throw newEdenError( + EDOM, + EdenErrorType::JOURNAL_TRUNCATED, + "Journal entry range has been truncated."); + } + + auto cancellationSource = std::make_shared(); + auto [serverStream, publisher] = + apache::thrift::ServerStream::createPublisher( + [cancellationSource] { cancellationSource->requestCancellation(); }); + auto sharedPublisherLock = std::make_shared>>( + ThriftStreamPublisherOwner{std::move(publisher)}); + + RootIdCodec& rootIdCodec = mountHandle.getObjectStore(); + + JournalPosition toPosition; + toPosition.mountGeneration_ref() = + mountHandle.getEdenMount().getMountGeneration(); + toPosition.sequenceNumber_ref() = summed->toSequence; + toPosition.snapshotHash_ref() = + rootIdCodec.renderRootId(summed->snapshotTransitions.back()); + result.toPosition_ref() = toPosition; + + // TODO: temporary recycling of `streamChangesSince` logic + // This will need to be updated to reflect the new way of reporting + // changes in large an small sizes. Additionally, the differntiation + // between files and directories. + // + // Using this for now to get the end-to-end plumbing working and + // to establish some early testing. + for (auto& entry : summed->changedFilesInOverlay) { + const auto& changeInfo = entry.second; + + // TODO: add in dtype - requires plumbing through all journal layers + // TODO: rework to support renamed, and optionally, replaced + // This likely means moving away from the `accumulateRange` usage + // above and directly processing via `forEachDelta` which gives much + // more info and properly interleaves filesytem and commit transition + // changes + SmallChangeNotification smallChange; + if (!changeInfo.existedBefore && changeInfo.existedAfter) { + Added added; + added.path() = entry.first.asString(); + smallChange.set_added(std::move(added)); + } else if (changeInfo.existedBefore && !changeInfo.existedAfter) { + Removed removed; + removed.path() = entry.first.asString(); + smallChange.set_removed(std::move(removed)); + } else { + Modified modified; + modified.path() = entry.first.asString(); + smallChange.set_modified(std::move(modified)); + } + + ChangeNotification change; + change.set_smallChange(std::move(smallChange)); + + ChangeNotificationResult changeNotificationResult; + changeNotificationResult.change() = change; + + sharedPublisherLock->rlock()->next(std::move(changeNotificationResult)); + } + + for (const auto& name : summed->uncleanPaths) { + SmallChangeNotification smallChange; + Modified modified; + modified.path() = name.asString(); + smallChange.set_modified(std::move(modified)); + + ChangeNotification change; + change.set_smallChange(std::move(smallChange)); + + ChangeNotificationResult changeNotificationResult; + changeNotificationResult.change() = change; + + sharedPublisherLock->rlock()->next(std::move(changeNotificationResult)); + } + + // NOTE: we are not surfacing snapshot transitions + // That will come later when we begin to use `forEachDelta`. + + return {std::move(result), std::move(serverStream)}; } apache::thrift::ResponseAndServerStream diff --git a/eden/fs/service/streamingeden.thrift b/eden/fs/service/streamingeden.thrift index 7ee67f8439a7c..077a3df889443 100644 --- a/eden/fs/service/streamingeden.thrift +++ b/eden/fs/service/streamingeden.thrift @@ -62,14 +62,14 @@ const i64 FS_EVENT_READ = 1; const i64 FS_EVENT_WRITE = 2; const i64 FS_EVENT_OTHER = 4; -struct CommitTransition { - 1: eden.ThriftRootId from; - 2: eden.ThriftRootId to; +struct Added { + 1: eden.Dtype fileType; + 3: eden.PathString path; } -struct DirectoryRenamed { - 1: eden.PathString from; - 2: eden.PathString to; +struct Modified { + 1: eden.Dtype fileType; + 3: eden.PathString path; } struct Renamed { @@ -78,26 +78,35 @@ struct Renamed { 3: eden.PathString to; } -struct Added { +struct Replaced { 1: eden.Dtype fileType; - 3: eden.PathString path; + 2: eden.PathString from; + 3: eden.PathString to; } -struct Deleted { +struct Removed { 1: eden.Dtype fileType; 3: eden.PathString path; } -struct Modified { - 1: eden.Dtype fileType; - 3: eden.PathString path; +union SmallChangeNotification { + // @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2 + 1: Added added; + // @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2 + 2: Modified modified; + // @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2 + 3: Renamed renamed; + 4: Replaced replaced; + 5: Removed removed; } -union SmallChangeNotification { - 1: Renamed renamed; - 2: Added added; - 3: Deleted deleted; - 4: Modified modified; +struct DirectoryRenamed { + 1: eden.PathString from; + 2: eden.PathString to; +} +struct CommitTransition { + 1: eden.ThriftRootId from; + 2: eden.ThriftRootId to; } union LargeChangeNotification { diff --git a/eden/fs/telemetry/EdenStats.h b/eden/fs/telemetry/EdenStats.h index 02d14fb793a71..ef783589ec614 100644 --- a/eden/fs/telemetry/EdenStats.h +++ b/eden/fs/telemetry/EdenStats.h @@ -584,7 +584,8 @@ struct JournalStats : StatsGroup { struct ThriftStats : StatsGroup { Duration streamChangesSince{ "thrift.StreamingEdenService.streamChangesSince.streaming_time_us"}; - + Duration streamChangesSinceV2{ + "thrift.StreamingEdenService.streamChangesSinceV2.streaming_time_us"}; Duration streamSelectedChangesSince{ "thrift.StreamingEdenService.streamSelectedChangesSince.streaming_time_us"};