Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RNET-1083: Add support for progress estimate on progress notifications #3479

Merged
merged 27 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Breaking Changes
* Added automatic serialization and deserialization of Realm classes when using methods on `MongoClient.Collection`, without the need to annotate classes with `MongoDB.Bson`attributes. This feature required to change the default serialization for various types (including `DateTimeOffset`). If you prefer to use the previous serialization, you need to call `Realm.SetLegacySerialization` before any kind of serialization is done, otherwise it may not work as epxected. [#3459](https://github.com/realm/realm-dotnet/pull/3459)
* `SyncProgress.TransferredBytes` and `SyncProgress.TransferableBytes` have been removed in favour of `SyncProgress.ProgressEstimate`, a float value between 0.0 and 1.0 that expresses the percentage estimate of the current progress. (Issue [#3478](https://github.com/realm/realm-dotnet/issues/3478]))

### Enhancements
* Added the `MongoClient.GetCollection<T>` method to get a collection of documents from MongoDB that can be deserialized in Realm objects. This methods works the same as `MongoClient.GetDatabase(dbName).GetCollection(collectionName)`, but the database name and collection name are automatically derived from the Realm object class. [#3414](https://github.com/realm/realm-dotnet/pull/3414)
Expand All @@ -21,6 +22,7 @@
* Automatic client reset recovery now does a better job of recovering changes when changesets were downloaded from the server after the unuploaded local changes were committed. If the local Realm happened to be fully up to date with the server prior to the client reset, automatic recovery should now always produce exactly the same state as if no client reset was involved. (Core 13.24.1)
* Exceptions thrown during bootstrap application will now be surfaced to the user rather than terminating the program with an unhandled exception. (Core 13.25.0)
* Allow the using `>`, `>=`, `<`, `<=` operators in `Realm.Filter()` queries for string constants. This is a case sensitive lexicographical comparison. Improved performance of RQL (`.Filter()`) queries on a non-linked string property using: >, >=, <, <=, operators and fixed behaviour that a null string should be evaluated as less than everything, previously nulls were not matched. (Core 13.26.0-14-gdf25f)
* `Session.GetProgressObservable` can now be used with Flexible Sync. (Issue [#3478](https://github.com/realm/realm-dotnet/issues/3478]))

### Fixed
* Automatic client reset recovery would duplicate insertions in a list when recovering a write which made an unrecoverable change to a list (i.e. modifying or deleting a pre-existing entry), followed by a subscription change, followed by a write which added an entry to the list. (Core 13.24.0)
Expand Down
6 changes: 3 additions & 3 deletions Realm/Realm/Handles/SessionHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public delegate void SessionErrorCallback(IntPtr session_handle_ptr,
IntPtr managed_sync_config_handle);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void SessionProgressCallback(IntPtr progress_token_ptr, ulong transferred_bytes, ulong transferable_bytes);
public delegate void SessionProgressCallback(IntPtr progress_token_ptr, double progressEstimate);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void SessionWaitCallback(IntPtr task_completion_source, int error_code, PrimitiveValue message);
Expand Down Expand Up @@ -405,10 +405,10 @@ private static IntPtr NotifyAfterClientReset(IntPtr beforeFrozen, IntPtr after,
}

[MonoPInvokeCallback(typeof(NativeMethods.SessionProgressCallback))]
private static void HandleSessionProgress(IntPtr tokenPtr, ulong transferredBytes, ulong transferableBytes)
private static void HandleSessionProgress(IntPtr tokenPtr, double progressEstimate)
{
var token = (ProgressNotificationToken?)GCHandle.FromIntPtr(tokenPtr).Target;
token?.Notify(transferredBytes, transferableBytes);
token?.Notify(progressEstimate);
}

[MonoPInvokeCallback(typeof(NativeMethods.SessionWaitCallback))]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public ProgressNotificationToken(Action<SyncProgress> observer, Func<GCHandle, u
}
}

public void Notify(ulong transferredBytes, ulong transferableBytes)
public void Notify(double progressEstimate)
{
Task.Run(() =>
{
try
{
_observer(new SyncProgress(transferredBytes, transferableBytes));
_observer(new SyncProgress(progressEstimate));
}
catch (Exception ex)
{
Expand Down
26 changes: 9 additions & 17 deletions Realm/Realm/Sync/ProgressNotifications/SyncProgress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,26 @@
//
////////////////////////////////////////////////////////////////////////////

using System;

namespace Realms.Sync
{
/// <summary>
/// A struct containing information about the progress state at a given instant.
/// </summary>
public struct SyncProgress
public readonly struct SyncProgress
{
/// <summary>
/// Gets the number of bytes that have been transferred since subscribing for progress notifications.
/// </summary>
/// <value>The number of transferred bytes.</value>
public ulong TransferredBytes { get; }

/// <summary>
/// Gets the total number of bytes that have to be transferred since subscribing for progress notifications.
/// The difference between that number and <see cref="TransferredBytes"/> gives you the number of bytes not yet
/// transferred. If the difference is 0, then all changes at the instant the callback fires have been
/// successfully transferred.
/// Gets the percentage estimate of the current progress, expressed as a float between 0.0 and 1.0.
/// </summary>
/// <value>The number of transferable bytes.</value>
public ulong TransferableBytes { get; }
/// <value>A percentage estimate of the progress.</value>
public double ProgressEstimate { get; }

internal SyncProgress(ulong transferred, ulong transferable)
internal SyncProgress(double progressEstimate)
{
TransferredBytes = transferred;
TransferableBytes = transferable;
ProgressEstimate = progressEstimate;
}

internal bool IsComplete => TransferableBytes == TransferredBytes;
internal readonly bool IsComplete => ProgressEstimate >= 1.0;
papafe marked this conversation as resolved.
Show resolved Hide resolved
}
}
3 changes: 1 addition & 2 deletions Realm/Realm/Sync/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ public event PropertyChangedEventHandler? PropertyChanged
/// var observable = session.GetProgressObservable(ProgressDirection.Upload, ProgressMode.ReportIndefinitely);
/// notificationToken = observable.Subscribe(progress =>
/// {
/// // Update relevant properties by accessing
/// // progress.TransferredBytes and progress.TransferableBytes
/// // Update relevant properties by accessing progress.ProgressEstimate
/// });
/// }
///
Expand Down
86 changes: 45 additions & 41 deletions Tests/Realm.Tests/Sync/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,25 @@ namespace Realms.Tests.Sync
[TestFixture, Preserve(AllMembers = true)]
public class SessionTests : SyncTestBase
{
public static readonly string[] AppTypes = new[]
{
AppConfigType.Default,
AppConfigType.FlexibleSync
};

public static readonly object[] AllClientResetHandlers = new object[]
{
typeof(DiscardUnsyncedChangesHandler),
typeof(RecoverUnsyncedChangesHandler),
typeof(RecoverOrDiscardUnsyncedChangesHandler),
};

public static readonly ProgressMode[] ProgressModeTypes = new ProgressMode[]
{
ProgressMode.ForCurrentlyOutstandingWork,
ProgressMode.ReportIndefinitely,
};

[Preserve]
static SessionTests()
{
Expand All @@ -79,12 +91,6 @@ static SessionTests()
};
}

public static readonly string[] AppTypes = new[]
{
AppConfigType.Default,
AppConfigType.FlexibleSync
};

[Test]
public void Realm_SyncSession_WhenSyncedRealm()
{
Expand Down Expand Up @@ -750,18 +756,33 @@ public void Session_OnSessionError()
});
}

[TestCase(ProgressMode.ForCurrentlyOutstandingWork)]
[TestCase(ProgressMode.ReportIndefinitely)]
public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)
#pragma warning disable CS0618 // Type or member is obsolete
papafe marked this conversation as resolved.
Show resolved Hide resolved
[Test]
public void SessionIntegrationTest_ProgressObservable(
papafe marked this conversation as resolved.
Show resolved Hide resolved
[ValueSource(nameof(AppTypes))] string appType,
[ValueSource(nameof(ProgressModeTypes))] ProgressMode mode)
{
const int objectSize = 1_000_000;
const int objectsToRecord = 2;
SyncTestHelpers.RunBaasTestAsync(async () =>
{
var config = await GetIntegrationConfigAsync(Guid.NewGuid().ToString());
using var realm = GetRealm(config);
Realm realm;
if (appType == AppConfigType.Default)
{
var config = await GetIntegrationConfigAsync(Guid.NewGuid().ToString());
realm = GetRealm(config);
}
else
{
var config = await GetFLXIntegrationConfigAsync();
config.PopulateInitialSubscriptions = (r) =>
{
r.Subscriptions.Add(r.All<HugeSyncObject>());
};
realm = await GetRealmAsync(config);
}

var completionTcs = new TaskCompletionSource<ulong>();
var completionTcs = new TaskCompletionSource();
var callbacksInvoked = 0;

var session = GetSession(realm);
Expand All @@ -782,29 +803,24 @@ public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)
{
callbacksInvoked++;

if (p.TransferredBytes > p.TransferableBytes)
if (p.ProgressEstimate < 0.0 || p.ProgressEstimate > 1.0)
{
// TODO https://github.com/realm/realm-dotnet/issues/2360: this seems to be a regression in Sync.
// throw new Exception($"Expected: {p.TransferredBytes} <= {p.TransferableBytes}");
}

if (mode == ProgressMode.ForCurrentlyOutstandingWork)
{
if (p.TransferableBytes <= objectSize ||
p.TransferableBytes >= (objectsToRecord + 2) * objectSize)
{
throw new Exception($"Expected: {p.TransferableBytes} to be in the ({objectSize}, {(objectsToRecord + 1) * objectSize}) range.");
}
throw new Exception($"Expected progress estimate to be between 0.0 and 1.0, but was {p.ProgressEstimate}");
}
}
catch (Exception e)
{
completionTcs.TrySetException(e);
}

if (p.TransferredBytes >= p.TransferableBytes)
if (p.IsComplete)
{
completionTcs.TrySetResult(p.TransferredBytes);
if (p.ProgressEstimate != 1.0)
{
throw new Exception($"Expected progress estimate to be complete if and only if ProgressEstimate == 1.0");
papafe marked this conversation as resolved.
Show resolved Hide resolved
}

completionTcs.TrySetResult();
}
});

Expand All @@ -813,24 +829,12 @@ public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)
realm.Add(new HugeSyncObject(objectSize));
});

var totalTransferred = await completionTcs.Task;

if (mode == ProgressMode.ForCurrentlyOutstandingWork)
{
Assert.That(totalTransferred, Is.GreaterThanOrEqualTo(objectSize));

// We add ObjectsToRecord + 1 items, but the last item is added after subscribing
// so in the fixed mode, we should not get updates for it.
Assert.That(totalTransferred, Is.LessThan((objectsToRecord + 5) * objectSize));
}
else
{
Assert.That(totalTransferred, Is.GreaterThanOrEqualTo((objectsToRecord + 1) * objectSize));
}
await completionTcs.Task;

Assert.That(callbacksInvoked, Is.GreaterThan(1));
Assert.That(callbacksInvoked, Is.GreaterThanOrEqualTo(1));
}, timeout: 120_000);
}
#pragma warning restore CS0618 // Type or member is obsolete

[Test]
public void Session_Stop_StopsSession()
Expand Down
12 changes: 2 additions & 10 deletions Tests/Realm.Tests/Sync/SynchronizedInstanceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,7 @@ public void GetInstanceAsync_ReportsProgress()
using var realm = await GetRealmAsync(config);
Assert.That(realm.All<HugeSyncObject>().Count(), Is.EqualTo(NumberOfObjects));
Assert.That(callbacksInvoked, Is.GreaterThan(0));

// We can't validate exact values because there's a reasonable chance that
// the last notification won't be invoked if the Realm is downloaded first.
Assert.That(lastProgress.TransferredBytes, Is.GreaterThan(OneMegabyte));
Assert.That(lastProgress.TransferableBytes, Is.GreaterThan(OneMegabyte));
Assert.That(lastProgress.ProgressEstimate, Is.GreaterThan(0.0));
}, 60000);
}

Expand Down Expand Up @@ -163,11 +159,7 @@ public void GetInstanceAsync_WithOnProgress_DoesntThrowWhenOnProgressIsSetToNull

Assert.That(realm.All<HugeSyncObject>().Count(), Is.EqualTo(NumberOfObjects));
Assert.That(callbacksInvoked, Is.GreaterThan(0));

// We can't validate exact values because there's a reasonable chance that
// the last notification won't be invoked if the Realm is downloaded first.
Assert.That(lastProgress.TransferredBytes, Is.GreaterThan(OneMegabyte));
Assert.That(lastProgress.TransferableBytes, Is.GreaterThan(OneMegabyte));
Assert.That(lastProgress.ProgressEstimate, Is.GreaterThan(0.0));
}, 60000);
}

Expand Down
4 changes: 2 additions & 2 deletions wrappers/src/async_open_task_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ REALM_EXPORT void realm_asyncopentask_cancel(SharedAsyncOpenTask& task, NativeEx
REALM_EXPORT uint64_t realm_asyncopentask_register_progress_notifier(const SharedAsyncOpenTask& task, void* managed_state, NativeException::Marshallable& ex)
{
return handle_errors(ex, [&] {
return task->register_download_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable) {
s_progress_callback(managed_state, transferred, transferable);
return task->register_download_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable, double progress_estimate) {
s_progress_callback(managed_state, progress_estimate);
});
});
}
Expand Down
4 changes: 2 additions & 2 deletions wrappers/src/sync_session_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ REALM_EXPORT uint64_t realm_syncsession_register_progress_notifier(const SharedS
? SyncSession::ProgressDirection::upload
: SyncSession::ProgressDirection::download;

return session->register_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable) {
s_progress_callback(managed_state, transferred, transferable);
return session->register_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable, double progress_estimate) {
s_progress_callback(managed_state, progress_estimate);
}, notifier_direction, is_streaming);
});
}
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/sync_session_cs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace realm::binding {

using SharedSyncSession = std::shared_ptr<SyncSession>;
using SessionErrorCallbackT = void(SharedSyncSession* session, realm_sync_error error, void* managed_sync_config);
using ProgressCallbackT = void(void* state, uint64_t transferred_bytes, uint64_t transferrable_bytes);
using ProgressCallbackT = void(void* state, double progress_estimate);
using NotifyBeforeClientResetCallbackT = void*(SharedRealm& before_frozen, void* managed_sync_config);
using NotifyAfterClientResetCallbackT = void*(SharedRealm& before_frozen, SharedRealm& after, void* managed_sync_config, bool did_recover);

Expand Down
Loading