Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
VishalGawade1 authored Nov 9, 2024
2 parents 765c909 + 790f653 commit 8770b2d
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 23 deletions.
107 changes: 102 additions & 5 deletions eden/fs/service/EdenServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2203,14 +2203,111 @@ apache::thrift::
EdenServiceHandler::streamChangesSinceV2(
std::unique_ptr<StreamChangesSinceV2Params> params) {
auto helper = INSTRUMENT_THRIFT_CALL_WITH_STAT(
DBG3, &ThriftStats::streamChangesSince, *params->mountPoint_ref());
DBG3, &ThriftStats::streamChangesSinceV2, *params->mountPoint_ref());

auto mountHandle = lookupMount(params->mountPoint());
[[maybe_unused]] const auto& fromPosition = *params->fromPosition_ref();
const auto& fromPosition = *params->fromPosition_ref();

checkMountGeneration(
fromPosition, mountHandle.getEdenMount(), "fromPosition"sv);

auto summed = mountHandle.getJournal().accumulateRange(
*fromPosition.sequenceNumber_ref() + 1);

ChangesSinceV2Result result;
return {
std::move(result),
apache::thrift::ServerStream<ChangeNotificationResult>::createEmpty()};
if (!summed) {
// No changes, just return the fromPosition and an empty stream.
result.toPosition_ref() = fromPosition;

return {
std::move(result),
apache::thrift::ServerStream<ChangeNotificationResult>::createEmpty()};
}

if (summed->isTruncated) {
throw newEdenError(
EDOM,
EdenErrorType::JOURNAL_TRUNCATED,
"Journal entry range has been truncated.");
}

auto cancellationSource = std::make_shared<folly::CancellationSource>();
auto [serverStream, publisher] =
apache::thrift::ServerStream<ChangeNotificationResult>::createPublisher(
[cancellationSource] { cancellationSource->requestCancellation(); });
auto sharedPublisherLock = std::make_shared<folly::Synchronized<
ThriftStreamPublisherOwner<ChangeNotificationResult>>>(
ThriftStreamPublisherOwner{std::move(publisher)});

RootIdCodec& rootIdCodec = mountHandle.getObjectStore();

JournalPosition toPosition;
toPosition.mountGeneration_ref() =
mountHandle.getEdenMount().getMountGeneration();
toPosition.sequenceNumber_ref() = summed->toSequence;
toPosition.snapshotHash_ref() =
rootIdCodec.renderRootId(summed->snapshotTransitions.back());
result.toPosition_ref() = toPosition;

// TODO: temporary recycling of `streamChangesSince` logic
// This will need to be updated to reflect the new way of reporting
// changes in large an small sizes. Additionally, the differntiation
// between files and directories.
//
// Using this for now to get the end-to-end plumbing working and
// to establish some early testing.
for (auto& entry : summed->changedFilesInOverlay) {
const auto& changeInfo = entry.second;

// TODO: add in dtype - requires plumbing through all journal layers
// TODO: rework to support renamed, and optionally, replaced
// This likely means moving away from the `accumulateRange` usage
// above and directly processing via `forEachDelta` which gives much
// more info and properly interleaves filesytem and commit transition
// changes
SmallChangeNotification smallChange;
if (!changeInfo.existedBefore && changeInfo.existedAfter) {
Added added;
added.path() = entry.first.asString();
smallChange.set_added(std::move(added));
} else if (changeInfo.existedBefore && !changeInfo.existedAfter) {
Removed removed;
removed.path() = entry.first.asString();
smallChange.set_removed(std::move(removed));
} else {
Modified modified;
modified.path() = entry.first.asString();
smallChange.set_modified(std::move(modified));
}

ChangeNotification change;
change.set_smallChange(std::move(smallChange));

ChangeNotificationResult changeNotificationResult;
changeNotificationResult.change() = change;

sharedPublisherLock->rlock()->next(std::move(changeNotificationResult));
}

for (const auto& name : summed->uncleanPaths) {
SmallChangeNotification smallChange;
Modified modified;
modified.path() = name.asString();
smallChange.set_modified(std::move(modified));

ChangeNotification change;
change.set_smallChange(std::move(smallChange));

ChangeNotificationResult changeNotificationResult;
changeNotificationResult.change() = change;

sharedPublisherLock->rlock()->next(std::move(changeNotificationResult));
}

// NOTE: we are not surfacing snapshot transitions
// That will come later when we begin to use `forEachDelta`.

return {std::move(result), std::move(serverStream)};
}

apache::thrift::ResponseAndServerStream<ChangesSinceResult, ChangedFileResult>
Expand Down
43 changes: 26 additions & 17 deletions eden/fs/service/streamingeden.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ const i64 FS_EVENT_READ = 1;
const i64 FS_EVENT_WRITE = 2;
const i64 FS_EVENT_OTHER = 4;

struct CommitTransition {
1: eden.ThriftRootId from;
2: eden.ThriftRootId to;
struct Added {
1: eden.Dtype fileType;
3: eden.PathString path;
}

struct DirectoryRenamed {
1: eden.PathString from;
2: eden.PathString to;
struct Modified {
1: eden.Dtype fileType;
3: eden.PathString path;
}

struct Renamed {
Expand All @@ -78,26 +78,35 @@ struct Renamed {
3: eden.PathString to;
}

struct Added {
struct Replaced {
1: eden.Dtype fileType;
3: eden.PathString path;
2: eden.PathString from;
3: eden.PathString to;
}

struct Deleted {
struct Removed {
1: eden.Dtype fileType;
3: eden.PathString path;
}

struct Modified {
1: eden.Dtype fileType;
3: eden.PathString path;
union SmallChangeNotification {
// @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2
1: Added added;
// @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2
2: Modified modified;
// @lint-ignore-every FBTHRIFTCOMPAT FBTHRIFTCOMPAT1 FBTHRIFTCOMPAT2
3: Renamed renamed;
4: Replaced replaced;
5: Removed removed;
}

union SmallChangeNotification {
1: Renamed renamed;
2: Added added;
3: Deleted deleted;
4: Modified modified;
struct DirectoryRenamed {
1: eden.PathString from;
2: eden.PathString to;
}
struct CommitTransition {
1: eden.ThriftRootId from;
2: eden.ThriftRootId to;
}

union LargeChangeNotification {
Expand Down
3 changes: 2 additions & 1 deletion eden/fs/telemetry/EdenStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,8 @@ struct JournalStats : StatsGroup<JournalStats> {
struct ThriftStats : StatsGroup<ThriftStats> {
Duration streamChangesSince{
"thrift.StreamingEdenService.streamChangesSince.streaming_time_us"};

Duration streamChangesSinceV2{
"thrift.StreamingEdenService.streamChangesSinceV2.streaming_time_us"};
Duration streamSelectedChangesSince{
"thrift.StreamingEdenService.streamSelectedChangesSince.streaming_time_us"};

Expand Down

0 comments on commit 8770b2d

Please sign in to comment.