From d9d2ee1040a8bb45af8c058ff5b210efbbd9fb4a Mon Sep 17 00:00:00 2001 From: Yash Agarwal <42363083+yashagarwal23@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:54:31 +0530 Subject: [PATCH] Read Logical Time Manager timestamp from the actor state (#356) * Add AnyCPU configuration in code.sln * Read the logical time manager value from the state provider Make StateProviderInitRetryDelayMilliseconds a const field Wait for Logical Time Manager to start before loading reminders Rename InitializeAndStartLogicalTimeManager * Remove extra ReleaseAssert * minor variable name change * Read LogicalTimeManager Timestamp from state for VolatileActorStateProvider also * KVS: Better exception handling; wait for read status * Volatile: Better exception handling; wait for read status * Add traces when waiting for WriteStatus to be Granted * resolve some comments --- .../Runtime/KvsActorStateProviderBase.cs | 194 ++++++++++++++++- .../ReliableCollectionsActorStateProvider.cs | 1 + .../Runtime/VolatileActorStateProvider.cs | 200 ++++++++++++++++-- .../Runtime/StatefulServiceReplicaAdapter.cs | 1 + 4 files changed, 374 insertions(+), 22 deletions(-) diff --git a/src/Microsoft.ServiceFabric.Actors/Runtime/KvsActorStateProviderBase.cs b/src/Microsoft.ServiceFabric.Actors/Runtime/KvsActorStateProviderBase.cs index 365c8b8d..0fed223a 100644 --- a/src/Microsoft.ServiceFabric.Actors/Runtime/KvsActorStateProviderBase.cs +++ b/src/Microsoft.ServiceFabric.Actors/Runtime/KvsActorStateProviderBase.cs @@ -27,6 +27,7 @@ namespace Microsoft.ServiceFabric.Actors.Runtime using CopyCompletionCallback = System.Action; using DataLossCallback = System.Func>; using FabricDirectory = Microsoft_ServiceFabric_Internal::System.Fabric.Common.FabricDirectory; + using ReleaseAssert = Microsoft_ServiceFabric_Internal::System.Fabric.Common.ReleaseAssert; using ReplicationCallback = System.Action>; using Requires = Microsoft_ServiceFabric_Internal::System.Fabric.Common.Requires; using RestoreCompletedCallback = System.Func; @@ -47,6 +48,7 @@ public abstract class KvsActorStateProviderBase private const string BackupRootFolderPrefix = "kvsasp_"; private const string KvsHealthSourceId = "KvsActorStateProvider"; private const string BackupCallbackSlowCancellationHealthProperty = "BackupCallbackSlowCancellation"; + private const int StateProviderInitRetryDelayMilliseconds = 500; private static readonly byte[] ActorPresenceValue = { byte.MinValue }; private readonly DataContractSerializer reminderSerializer; @@ -61,7 +63,6 @@ public abstract class KvsActorStateProviderBase /// Used to synchronize between backup callback invocation and replica close/abort /// private readonly SemaphoreSlim backupCallbackLock; - private ReplicaRole replicaRole; private IStatefulServicePartition partition; private string traceId; @@ -83,6 +84,9 @@ public abstract class KvsActorStateProviderBase private CancellationTokenSource backupCallbackCts; private Task backupCallbackTask; private bool isClosingOrAborting; + private bool isLogicalTimeManagerInitialized; + private CancellationTokenSource stateProviderInitCts; + private Task stateProviderInitTask; internal KvsActorStateProviderBase(ReplicatorSettings replicatorSettings) { @@ -105,6 +109,9 @@ internal KvsActorStateProviderBase(ReplicatorSettings replicatorSettings) this.backupCallbackCts = null; this.backupCallbackTask = null; this.isClosingOrAborting = false; + this.isLogicalTimeManagerInitialized = false; + this.stateProviderInitCts = null; + this.stateProviderInitTask = null; } /// @@ -246,13 +253,15 @@ Task IActorStateProvider.ActorActivatedAsync(ActorId actorId, CancellationToken /// /// A task that represents the asynchronous reminder callback completed notification processing. /// - Task IActorStateProvider.ReminderCallbackCompletedAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken) + async Task IActorStateProvider.ReminderCallbackCompletedAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken) { + await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken); + var key = ActorStateProviderHelper.CreateReminderCompletedStorageKey(actorId, reminder.Name); var data = new ReminderCompletedData(this.logicalTimeManager.CurrentLogicalTime, DateTime.UtcNow); var buffer = this.SerializeReminderCompletedData(data); - return this.actorStateProviderHelper.ExecuteWithRetriesAsync( + await this.actorStateProviderHelper.ExecuteWithRetriesAsync( () => { cancellationToken.ThrowIfCancellationRequested(); @@ -445,6 +454,8 @@ Task> IActorStateProvider.GetActorsAsync( /// async Task>>> IActorStateProvider.GetRemindersAsync(int numItemsToReturn, ActorId actorId, ContinuationToken continuationToken, CancellationToken cancellationToken) { + await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken); + return await this.actorStateProviderHelper.ExecuteWithRetriesAsync( async () => { @@ -548,15 +559,17 @@ async Task>>> /// The token to monitor for cancellation requests. /// A task that represents the asynchronous save operation. /// The operation was canceled. - Task IActorStateProvider.SaveReminderAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken) + async Task IActorStateProvider.SaveReminderAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken) { + await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken); + var reminderKey = CreateReminderStorageKey(actorId, reminder.Name); var data = new ActorReminderData(actorId, reminder, this.logicalTimeManager.CurrentLogicalTime); var buffer = this.SerializeReminder(data); var reminderCompletedKey = ActorStateProviderHelper.CreateReminderCompletedStorageKey(actorId, reminder.Name); - return this.actorStateProviderHelper.ExecuteWithRetriesAsync( + await this.actorStateProviderHelper.ExecuteWithRetriesAsync( () => { cancellationToken.ThrowIfCancellationRequested(); @@ -611,9 +624,11 @@ Task IActorStateProvider.DeleteRemindersAsync(IReadOnlyDictionary /// The operation was canceled. - Task IActorStateProvider.LoadRemindersAsync(CancellationToken cancellationToken) + async Task IActorStateProvider.LoadRemindersAsync(CancellationToken cancellationToken) { - return this.actorStateProviderHelper.ExecuteWithRetriesAsync( + await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken); + + return await this.actorStateProviderHelper.ExecuteWithRetriesAsync( () => this.EnumerateReminderAsync(cancellationToken), "LoadRemindersAsync", cancellationToken); @@ -685,11 +700,12 @@ async Task IStateProviderReplica.ChangeRoleAsync(ReplicaRole newRole, Cancellati switch (newRole) { case ReplicaRole.Primary: - this.logicalTimeManager.Start(); + this.stateProviderInitCts = new CancellationTokenSource(); + this.stateProviderInitTask = this.StartStateProviderInitializationAsync(this.stateProviderInitCts.Token); break; default: - this.logicalTimeManager.Stop(); + await this.CancelStateProviderInitializationAsync(); break; } @@ -713,6 +729,7 @@ async Task IStateProviderReplica.CloseAsync(CancellationToken cancellationToken) // with actual ESE backup finishing with error. However, if ESE backup has finished successfully and // backup callback is in-flight, it does not wait for the backup callback to finish, . await this.CancelAndAwaitBackupCallbackIfAnyAsync(); + await this.CancelStateProviderInitializationAsync(); } /// @@ -728,6 +745,9 @@ void IStateProviderReplica.Abort() this.CancelAndAwaitBackupCallbackIfAnyAsync().ContinueWith( t => t.Exception, TaskContinuationOptions.OnlyOnFaulted); + this.CancelStateProviderInitializationAsync().ContinueWith( + t => t.Exception, + TaskContinuationOptions.OnlyOnFaulted); } /// @@ -972,6 +992,162 @@ private static object Deserialize(DataContractSerializer serializer, byte[] data } } + private async Task StartStateProviderInitializationAsync(CancellationToken cancellationToken) + { + Exception unexpectedException = null; + + try + { + cancellationToken.ThrowIfCancellationRequested(); + + await this.actorStateProviderHelper.ExecuteWithRetriesAsync( + async () => + { + await this.InitializeAndStartLogicalTimeManagerAsync(cancellationToken); + }, + "StartStateProviderInitializationAsync", + cancellationToken); + } + catch (OperationCanceledException opEx) + { + if (!cancellationToken.IsCancellationRequested) + { + unexpectedException = opEx; + } + } + catch (FabricObjectClosedException) + { + // This can happen when replica is closing. CancellationToken should get signaled. + // Fall through and let the task check for CancellationToken. + } + catch (FabricNotPrimaryException) + { + // This replica is no more primary. CancellationToken should get signaled. + // Fall through and let the task check for CancellationToken. + } + catch (Exception ex) + { + unexpectedException = ex; + } + + if (unexpectedException != null) + { + var mssgFormat = "StartStateProviderInitializationAsync() failed due to " + + "an unexpected Exception causing replica to fault: {0}"; + + ActorTrace.Source.WriteErrorWithId( + TraceType, + this.traceId, + string.Format(mssgFormat, unexpectedException.ToString())); + + this.partition.ReportFault(FaultType.Transient); + } + } + + private async Task CancelStateProviderInitializationAsync() + { + if (this.stateProviderInitCts != null && + this.stateProviderInitCts.IsCancellationRequested == false) + { + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Canceling state provider initialization..."); + + this.stateProviderInitCts.Cancel(); + + try + { + await this.stateProviderInitTask; + } + catch (Exception ex) + { + // Code should never come here. + ReleaseAssert.Failfast( + "CancelStateProviderInitializationAsync() unexpected exception: {0}.", + ex.ToString()); + } + finally + { + this.stateProviderInitCts = null; + this.stateProviderInitTask = null; + } + } + + this.StopLogicalTimeManager(); + } + + private async Task InitializeAndStartLogicalTimeManagerAsync(CancellationToken cancellationToken) + { + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Initializing logical time manager..."); + + if (this.isLogicalTimeManagerInitialized == true) + { + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Logical time manager already initialized..."); + return; + } + + // wait for read status + await this.WaitForReadStatusAsync(cancellationToken); + + using (var tx = this.storeReplica.CreateTransaction()) + { + var enumerator = this.storeReplica.Enumerate(tx, LogicalTimestampKey); + + while (enumerator.MoveNext()) + { + var item = enumerator.Current; + this.TryDeserializeAndApplyLogicalTimestamp(item.Metadata.Key, item.Value); + } + } + + this.logicalTimeManager.Start(); + Volatile.Write(ref this.isLogicalTimeManagerInitialized, true); + + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Initializing logical time manager SUCCEEDED."); + } + + private void StopLogicalTimeManager() + { + // Stop logical timer if it is running + if (this.isLogicalTimeManagerInitialized == true) + { + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Stopping logical time manager..."); + + this.logicalTimeManager.Stop(); + this.isLogicalTimeManagerInitialized = false; + } + } + + private async Task WaitForReadStatusAsync(CancellationToken cancellationToken) + { + var retryCount = 0; + + while (!cancellationToken.IsCancellationRequested && + this.partition.ReadStatus != PartitionAccessStatus.Granted) + { + retryCount++; + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for Read Status to be Granted"); + await Task.Delay(retryCount * StateProviderInitRetryDelayMilliseconds, cancellationToken); + } + } + + private async Task EnsureLogicalTimeManagerInitializedAsync(CancellationToken cancellationToken) + { + var retryCount = 0; + + while (this.replicaRole == ReplicaRole.Primary && !this.isLogicalTimeManagerInitialized) + { + retryCount++; + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for logical Time manager to be initialized"); + await Task.Delay(retryCount * StateProviderInitRetryDelayMilliseconds, cancellationToken); + } + + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Logical Time Manager is initialized"); + + if (this.replicaRole != ReplicaRole.Primary) + { + throw new FabricNotPrimaryException(FabricErrorCode.NotPrimary); + } + } + private void OnCopyComplete(KeyValueStoreEnumerator enumerator) { var inner = enumerator.Enumerate(LogicalTimestampKey); diff --git a/src/Microsoft.ServiceFabric.Actors/Runtime/ReliableCollectionsActorStateProvider.cs b/src/Microsoft.ServiceFabric.Actors/Runtime/ReliableCollectionsActorStateProvider.cs index fef80cda..a97cb4e4 100644 --- a/src/Microsoft.ServiceFabric.Actors/Runtime/ReliableCollectionsActorStateProvider.cs +++ b/src/Microsoft.ServiceFabric.Actors/Runtime/ReliableCollectionsActorStateProvider.cs @@ -945,6 +945,7 @@ private async Task WaitForWriteStatusAsync(CancellationToken cancellationToken) this.servicePartition.WriteStatus != PartitionAccessStatus.Granted) { retryCount++; + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for Write Status to be Granted"); await Task.Delay(retryCount * StateProviderInitRetryDelayMilliseconds, cancellationToken); } } diff --git a/src/Microsoft.ServiceFabric.Actors/Runtime/VolatileActorStateProvider.cs b/src/Microsoft.ServiceFabric.Actors/Runtime/VolatileActorStateProvider.cs index 497b6b6b..e565425e 100644 --- a/src/Microsoft.ServiceFabric.Actors/Runtime/VolatileActorStateProvider.cs +++ b/src/Microsoft.ServiceFabric.Actors/Runtime/VolatileActorStateProvider.cs @@ -29,6 +29,7 @@ namespace Microsoft.ServiceFabric.Actors.Runtime VolatileActorStateProvider.ActorStateType, string, VolatileActorStateProvider.ActorStateData>; + using ReleaseAssert = Microsoft_ServiceFabric_Internal::System.Fabric.Common.ReleaseAssert; using Requires = Microsoft_ServiceFabric_Internal::System.Fabric.Common.Requires; using SR = Microsoft.ServiceFabric.Actors.SR; @@ -40,6 +41,7 @@ public class VolatileActorStateProvider : { private const string LogicalTimestampKey = "LogicalTimestamp"; private const string TraceType = "VolatileActorStateProvider"; + private const int StateProviderInitRetryDelayMilliseconds = 500; private static readonly ActorStateData ActorPresenceValue = new ActorStateData(new[] { byte.MinValue }); private readonly ActorStateTable stateTable; @@ -49,7 +51,6 @@ public class VolatileActorStateProvider : private readonly object replicationLock; private readonly ActorStateProviderHelper actorStateProviderHelper; private readonly ReplicatorSettings userDefinedReplicatorSettings; - private SecondaryPump secondaryPump; private ActorTypeInformation actorTypeInformation; private FabricReplicator fabricReplicator; @@ -61,6 +62,9 @@ public class VolatileActorStateProvider : private VolatileActorStateProviderSettings stateProviderSettings; private long roleChangeTracker; + private bool isLogicalTimeManagerInitialized; + private CancellationTokenSource stateProviderInitCts; + private Task stateProviderInitTask; /// /// Initializes a new instance of the class. @@ -89,6 +93,9 @@ public VolatileActorStateProvider(ReplicatorSettings replicatorSettings) this.replicaRole = ReplicaRole.Unknown; this.roleChangeTracker = DateTime.UtcNow.Ticks; this.actorStateProviderHelper = new ActorStateProviderHelper(this); + this.isLogicalTimeManagerInitialized = false; + this.stateProviderInitCts = null; + this.stateProviderInitTask = null; } internal enum ActorStateType @@ -203,13 +210,15 @@ await this.actorStateProviderHelper.ExecuteWithRetriesAsync( /// /// A task that represents the asynchronous reminder callback completed notification processing. /// - Task IActorStateProvider.ReminderCallbackCompletedAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken) + async Task IActorStateProvider.ReminderCallbackCompletedAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken) { + await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken); + var reminderCompletedKey = ActorStateProviderHelper.CreateReminderCompletedStorageKey(actorId, reminder.Name); var reminderCompletedData = new ReminderCompletedData(this.logicalTimeManager.CurrentLogicalTime, DateTime.UtcNow); var actorStateData = new ActorStateData(reminderCompletedData); - return this.actorStateProviderHelper.ExecuteWithRetriesAsync( + await this.actorStateProviderHelper.ExecuteWithRetriesAsync( () => { cancellationToken.ThrowIfCancellationRequested(); @@ -461,6 +470,8 @@ async Task>>> ContinuationToken continuationToken, CancellationToken cancellationToken) { + await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken); + return await this.actorStateProviderHelper.ExecuteWithRetriesAsync( async () => { @@ -555,8 +566,10 @@ async Task>>> /// The token to monitor for cancellation requests. /// A task that represents the asynchronous save operation. /// The operation was canceled. - Task IActorStateProvider.SaveReminderAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken) + async Task IActorStateProvider.SaveReminderAsync(ActorId actorId, IActorReminder reminder, CancellationToken cancellationToken) { + await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken); + var actorStateDataWrapperList = new List { ActorStateDataWrapper.CreateForUpdate( @@ -569,7 +582,7 @@ Task IActorStateProvider.SaveReminderAsync(ActorId actorId, IActorReminder remin ActorStateProviderHelper.CreateReminderCompletedStorageKey(actorId, reminder.Name)), }; - return this.actorStateProviderHelper.ExecuteWithRetriesAsync( + await this.actorStateProviderHelper.ExecuteWithRetriesAsync( () => { cancellationToken.ThrowIfCancellationRequested(); @@ -647,8 +660,10 @@ Task IActorStateProvider.DeleteRemindersAsync( /// parameter is a collection of all actor reminders contained in the actor state provider. /// /// The operation was canceled. - Task IActorStateProvider.LoadRemindersAsync(CancellationToken cancellationToken) + async Task IActorStateProvider.LoadRemindersAsync(CancellationToken cancellationToken) { + await this.EnsureLogicalTimeManagerInitializedAsync(cancellationToken); + var reminderCollection = new ActorReminderCollection(); var stateDictionary = this.stateTable.GetActorStateDictionary(ActorStateType.Reminder); @@ -675,7 +690,7 @@ Task IActorStateProvider.LoadRemindersAsync(Cancellati new ActorReminderState(reminderData, this.logicalTimeManager.CurrentLogicalTime, reminderCompletedData)); } - return Task.FromResult((IActorReminderCollection)reminderCollection); + return (IActorReminderCollection)reminderCollection; } /// @@ -739,12 +754,12 @@ async Task IStateProviderReplica.ChangeRoleAsync(ReplicaRole newRole, Cancellati switch (newRole) { case ReplicaRole.IdleSecondary: - this.logicalTimeManager.Stop(); + await this.CancelStateProviderInitializationAsync(); this.secondaryPump.StartCopyAndReplicationPump(); break; case ReplicaRole.ActiveSecondary: - this.logicalTimeManager.Stop(); + await this.CancelStateProviderInitializationAsync(); ActorStateData data; if (this.stateTable.TryGetValue(ActorStateType.LogicalTimestamp, LogicalTimestampKey, out data) @@ -762,7 +777,8 @@ async Task IStateProviderReplica.ChangeRoleAsync(ReplicaRole newRole, Cancellati break; case ReplicaRole.Primary: - this.logicalTimeManager.Start(); + this.stateProviderInitCts = new CancellationTokenSource(); + this.stateProviderInitTask = this.StartStateProviderInitializationAsync(this.stateProviderInitCts.Token); // Wait for secondary pump to make sure there is no // outstanding task in-flight after processing NULL @@ -783,12 +799,14 @@ async Task IStateProviderReplica.ChangeRoleAsync(ReplicaRole newRole, Cancellati /// /// The token to monitor for cancellation requests. /// Task that represents the asynchronous close operation. - Task IStateProviderReplica.CloseAsync(CancellationToken cancellationToken) + async Task IStateProviderReplica.CloseAsync(CancellationToken cancellationToken) { + await this.CancelStateProviderInitializationAsync(); + // Wait for secondary pump to make sure there is no // outstanding task in-flight after processing NULL // operation from the replication queue. - return this.secondaryPump.WaitForPumpCompletionAsync(); + await this.secondaryPump.WaitForPumpCompletionAsync(); } /// @@ -800,7 +818,9 @@ Task IStateProviderReplica.CloseAsync(CancellationToken cancellationToken) /// void IStateProviderReplica.Abort() { - // no-op + this.CancelStateProviderInitializationAsync().ContinueWith( + t => t.Exception, + TaskContinuationOptions.OnlyOnFaulted); } /// @@ -1122,6 +1142,160 @@ private static string ExtractStateName(ActorId actorId, string storageKey) return storageKey.Substring(storageKeyPrefix.Length + 1); } + private async Task StartStateProviderInitializationAsync(CancellationToken cancellationToken) + { + Exception unexpectedException = null; + + try + { + cancellationToken.ThrowIfCancellationRequested(); + + await this.actorStateProviderHelper.ExecuteWithRetriesAsync( + async () => + { + await this.InitializeAndStartLogicalTimeManagerAsync(cancellationToken); + }, + "StartStateProviderInitializationAsync", + cancellationToken); + } + catch (OperationCanceledException opEx) + { + if (!cancellationToken.IsCancellationRequested) + { + unexpectedException = opEx; + } + } + catch (FabricObjectClosedException) + { + // This can happen when replica is closing. CancellationToken should get signaled. + // Fall through and let the task check for CancellationToken. + } + catch (FabricNotPrimaryException) + { + // This replica is no more primary. CancellationToken should get signaled. + // Fall through and let the task check for CancellationToken. + } + catch (Exception ex) + { + unexpectedException = ex; + } + + if (unexpectedException != null) + { + var mssgFormat = "StartStateProviderInitializationAsync() failed due to " + + "an unexpected Exception causing replica to fault: {0}"; + + ActorTrace.Source.WriteErrorWithId( + TraceType, + this.traceId, + string.Format(mssgFormat, unexpectedException.ToString())); + + this.partition.ReportFault(FaultType.Transient); + } + } + + private async Task CancelStateProviderInitializationAsync() + { + if (this.stateProviderInitCts != null && + this.stateProviderInitCts.IsCancellationRequested == false) + { + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Canceling state provider initialization..."); + + this.stateProviderInitCts.Cancel(); + + try + { + await this.stateProviderInitTask; + } + catch (Exception ex) + { + // Code should never come here. + ReleaseAssert.Failfast( + "CancelStateProviderInitializationAsync() unexpected exception: {0}.", + ex.ToString()); + } + finally + { + this.stateProviderInitCts = null; + this.stateProviderInitTask = null; + } + } + + this.StopLogicalTimeManager(); + } + + private async Task InitializeAndStartLogicalTimeManagerAsync(CancellationToken cancellationToken) + { + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Initializing logical time manager..."); + + if (this.isLogicalTimeManagerInitialized == true) + { + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Logical time manager already initialized..."); + return; + } + + // wait for read status + await this.WaitForReadStatusAsync(cancellationToken); + + ActorStateData data; + if (this.stateTable.TryGetValue(ActorStateType.LogicalTimestamp, LogicalTimestampKey, out data) + && data.LogicalTimestamp.HasValue) + { + this.logicalTimeManager.CurrentLogicalTime = data.LogicalTimestamp.Value; + } + + this.logicalTimeManager.Start(); + Volatile.Write(ref this.isLogicalTimeManagerInitialized, true); + + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Initializing logical time manager SUCCEEDED."); + } + + private void StopLogicalTimeManager() + { + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Stopping logical time manager..."); + + // Stop logical timer if it is running + if (this.isLogicalTimeManagerInitialized == true) + { + this.logicalTimeManager.Stop(); + this.isLogicalTimeManagerInitialized = false; + } + + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Stopped logical time manager..."); + } + + private async Task WaitForReadStatusAsync(CancellationToken cancellationToken) + { + var retryCount = 0; + + while (!cancellationToken.IsCancellationRequested && + this.partition.ReadStatus != PartitionAccessStatus.Granted) + { + retryCount++; + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for Read Status to be Granted"); + await Task.Delay(retryCount * StateProviderInitRetryDelayMilliseconds, cancellationToken); + } + } + + private async Task EnsureLogicalTimeManagerInitializedAsync(CancellationToken cancellationToken) + { + var retryCount = 0; + + while (this.replicaRole == ReplicaRole.Primary && !this.isLogicalTimeManagerInitialized) + { + retryCount++; + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for logical Time manager to be initialized"); + await Task.Delay(retryCount * StateProviderInitRetryDelayMilliseconds, cancellationToken); + } + + ActorTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Logical Time Manager is initialized"); + + if (this.replicaRole != ReplicaRole.Primary) + { + throw new FabricNotPrimaryException(FabricErrorCode.NotPrimary); + } + } + private void LoadActorStateProviderSettings() { var configPackageName = ActorNameFormat.GetConfigPackageName(this.actorTypeInformation.ImplementationType); diff --git a/src/Microsoft.ServiceFabric.Services/Runtime/StatefulServiceReplicaAdapter.cs b/src/Microsoft.ServiceFabric.Services/Runtime/StatefulServiceReplicaAdapter.cs index c205ab8e..2ed9efc3 100644 --- a/src/Microsoft.ServiceFabric.Services/Runtime/StatefulServiceReplicaAdapter.cs +++ b/src/Microsoft.ServiceFabric.Services/Runtime/StatefulServiceReplicaAdapter.cs @@ -452,6 +452,7 @@ private async Task WaitForWriteStatusAsync(CancellationToken cancellationT while (true) { cancellationToken.ThrowIfCancellationRequested(); + ServiceTrace.Source.WriteInfoWithId(TraceType, this.traceId, "Waiting for Write Status to be Granted"); PartitionAccessStatus result; try