Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RNET-1137: Fix progress notification #3615

Merged
merged 8 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
## 12.2.0 (2024-05-22)

### Enhancements
* Improved calculation of `SyncProgress.ProgressEstimate`. (Issue [#3580](https://github.com/realm/realm-dotnet/issues/3580]))
* Added support for `Migration.FindInNewRealm` which is a helper that allows you to lookup the object in the post-migration Realm that corresponds to an object from the pre-migration Realm. (Issue [#3600](https://github.com/realm/realm-dotnet/issues/3600))
* Added `[System.Reflection.Obfuscation]` on the generated `RealmSchema` field to improve compatibility with obfuscation tools that change field and property names of generated classes. (Issue [#3574](https://github.com/realm/realm-dotnet/issues/3574))
* Added support for list and dictionaries of `RealmValue` (`IList<RealmValue>` and `IDictionary<string, RealmValue>`) to be contained in a `RealmValue`. Lists and dictionaries can contain an arbitrary number of collections themselves. It is possible to convert an existing collection to a `RealmValue` using the new static methods `RealmValue.List` and `RealmValue.Dictionary` or using the implicit operators if converting from common types like `List`, `RealmValue[]` or `Dictionary`. Finally, it is possible to obtain the contained collections by using the new conversion method `AsList` and `AsDictionary`. For example:
Expand Down
9 changes: 3 additions & 6 deletions Realm/Realm/Handles/SessionHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public delegate void SessionErrorCallback(IntPtr session_handle_ptr,
IntPtr managed_sync_config_handle);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void SessionProgressCallback(IntPtr progress_token_ptr, ulong transferred_bytes, ulong transferable_bytes, double progressEstimate);
public delegate void SessionProgressCallback(IntPtr progress_token_ptr, double progressEstimate);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void SessionWaitCallback(IntPtr task_completion_source, int error_code, PrimitiveValue message);
Expand Down Expand Up @@ -405,13 +405,10 @@ private static IntPtr NotifyAfterClientReset(IntPtr beforeFrozen, IntPtr after,
}

[MonoPInvokeCallback(typeof(NativeMethods.SessionProgressCallback))]
private static void HandleSessionProgress(IntPtr tokenPtr, ulong transferredBytes, ulong transferableBytes, double progressEstimate)
private static void HandleSessionProgress(IntPtr tokenPtr, double progressEstimate)
{
var token = (ProgressNotificationToken?)GCHandle.FromIntPtr(tokenPtr).Target;

// This is used to provide a reasonable progress estimate until the core work is done
double managedProgressEstimate = transferableBytes > 0.0 ? transferredBytes / transferableBytes : 1.0;
token?.Notify(managedProgressEstimate);
token?.Notify(progressEstimate);
}

[MonoPInvokeCallback(typeof(NativeMethods.SessionWaitCallback))]
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 35 additions & 33 deletions Tests/Realm.Tests/Sync/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -756,22 +756,45 @@ public void Session_OnSessionError()
});
}

// This test needs to be revisited when the work on progress notification is finished.
[Test]
public void SessionIntegrationTest_ProgressObservable(
[ValueSource(nameof(AppTypes))] string appType,
[ValueSource(nameof(ProgressModeTypes))] ProgressMode mode)
{
const int objectSize = 1_000_000;
const int objectsToRecord = 2;
var partitionString = Guid.NewGuid().ToString();

SyncTestHelpers.RunBaasTestAsync(async () =>
{
Realm realm;
var uploadRealm = await GetTestRealm(appType, partitionString);
var uploadSession = GetSession(uploadRealm);
var uploadObservable = uploadSession.GetProgressObservable(ProgressDirection.Upload, mode);

for (var i = 0; i < objectsToRecord; i++)
{
uploadRealm.Write(() =>
{
uploadRealm.Add(new HugeSyncObject(objectSize));
});
}

await TestObservable(uploadObservable);

var downloadRealm = await GetTestRealm(appType, partitionString);
var downloadSession = GetSession(downloadRealm);
var downloadObservable = downloadSession.GetProgressObservable(ProgressDirection.Download, mode);

await TestObservable(downloadObservable);
}, timeout: 120_000);
return;

async Task<Realm> GetTestRealm(string type, string partition)
{
if (appType == AppConfigType.Default)
{
var config = await GetIntegrationConfigAsync(Guid.NewGuid().ToString());
realm = GetRealm(config);
var config = await GetIntegrationConfigAsync(partition);
return GetRealm(config);
}
else
{
Expand All @@ -780,37 +803,23 @@ public void SessionIntegrationTest_ProgressObservable(
{
r.Subscriptions.Add(r.All<HugeSyncObject>());
};
realm = await GetRealmAsync(config);
return GetRealm(config);
}
}

async Task TestObservable(IObservable<SyncProgress> observable)
{
var completionTcs = new TaskCompletionSource();
var callbacksInvoked = 0;

var session = GetSession(realm);

var observable = session.GetProgressObservable(ProgressDirection.Upload, mode);

for (var i = 0; i < objectsToRecord; i++)
{
realm.Write(() =>
{
realm.Add(new HugeSyncObject(objectSize));
});
}

var lastReportedProgress = 0.0d;

var progressList = new List<SyncProgress>();
var callbacksInvoked = 0;

using var token = observable.Subscribe(p =>
{
try
{
callbacksInvoked++;

progressList.Add(p);

if (p.ProgressEstimate < 0.0 || p.ProgressEstimate > 1.0)
if (p.ProgressEstimate is < 0.0 or > 1.0)
{
throw new Exception($"Expected progress estimate to be between 0.0 and 1.0, but was {p.ProgressEstimate}");
}
Expand All @@ -828,25 +837,18 @@ public void SessionIntegrationTest_ProgressObservable(
}

completionTcs.TrySetResult();
lastReportedProgress = p.ProgressEstimate;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this happening after we complete the tcs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't, it should happen all the time, I'll fix it 🤦

}

lastReportedProgress = p.ProgressEstimate;
}
catch (Exception e)
{
completionTcs.TrySetException(e);
}
});

realm.Write(() =>
{
realm.Add(new HugeSyncObject(objectSize));
});

await completionTcs.Task;

Assert.That(callbacksInvoked, Is.GreaterThanOrEqualTo(1));
}, timeout: 120_000);
}
}

[Test]
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/async_open_task_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ REALM_EXPORT uint64_t realm_asyncopentask_register_progress_notifier(const Share
{
return handle_errors(ex, [&] {
return task->register_download_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable, double progress_estimate) {
s_progress_callback(managed_state, transferred, transferable, progress_estimate);
s_progress_callback(managed_state, progress_estimate);
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/sync_session_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ REALM_EXPORT uint64_t realm_syncsession_register_progress_notifier(const SharedS
: SyncSession::ProgressDirection::download;

return session->register_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable, double progress_estimate) {
s_progress_callback(managed_state, transferred, transferable, progress_estimate);
s_progress_callback(managed_state, progress_estimate);
}, notifier_direction, is_streaming);
});
}
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/sync_session_cs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace realm::binding {

using SharedSyncSession = std::shared_ptr<SyncSession>;
using SessionErrorCallbackT = void(SharedSyncSession* session, realm_sync_error error, void* managed_sync_config);
using ProgressCallbackT = void(void* state, uint64_t transferred_bytes, uint64_t transferrable_bytes, double progress_estimate);
using ProgressCallbackT = void(void* state, double progress_estimate);
using NotifyBeforeClientResetCallbackT = void*(SharedRealm& before_frozen, void* managed_sync_config);
using NotifyAfterClientResetCallbackT = void*(SharedRealm& before_frozen, SharedRealm& after, void* managed_sync_config, bool did_recover);

Expand Down
Loading