Skip to content

Commit

Permalink
+cluster change joined -> up awaitable func, and minor cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Nov 7, 2022
1 parent 194de5d commit 32ad34d
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 7 deletions.
3 changes: 3 additions & 0 deletions Sources/DistributedCluster/Cluster/Cluster+Membership.swift
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ extension Cluster {
case notFoundAny(Cluster.Endpoint, in: Cluster.Membership)
case atLeastStatusRequirementNotMet(expectedAtLeast: Cluster.MemberStatus, found: Cluster.Member)
case statusRequirementNotMet(expected: Cluster.MemberStatus, found: Cluster.Member)
case countRequirementNotMet(expected: Int, expectedStatus: Cluster.MemberStatus)
case awaitStatusTimedOut(Duration, Error?)

var prettyDescription: String {
Expand All @@ -721,6 +722,8 @@ extension Cluster {
return "Expected \(reflecting: foundMember.node) to be seen as at-least [\(expectedAtLeastStatus)] but was [\(foundMember.status)]"
case .statusRequirementNotMet(let expectedStatus, let foundMember):
return "Expected \(reflecting: foundMember.node) to be seen as [\(expectedStatus)] but was [\(foundMember.status)]"
case .countRequirementNotMet(let count, let expectedStatus):
return "Expected \(count) nodes to be seen as [\(expectedStatus)], but did not find enough"
case .awaitStatusTimedOut(let duration, let lastError):
let lastErrorMessage: String
if let error = lastError {
Expand Down
29 changes: 29 additions & 0 deletions Sources/DistributedCluster/Cluster/ClusterControl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,23 @@ public struct ClusterControl {
///
/// - Returns `Cluster.Member` for the joined node.
@discardableResult
@available(*, deprecated, renamed: "up(within:)")
public func joined(within: Duration) async throws -> Cluster.Member {
try await self.waitFor(self.node, .up, within: within)
}

/// Wait, within the given duration, until this actor system has joined the cluster and become ``Cluster/MemberStatus/up``.
///
/// - Parameters
/// - node: The node to be joined by this system.
/// - within: Duration to wait for.
///
/// - Returns `Cluster.Member` for the joined node.
@discardableResult
public func up(within: Duration) async throws -> Cluster.Member {
try await self.waitFor(self.node, .up, within: within)
}

/// Wait, within the given duration, until the passed in node has joined the cluster and become ``Cluster/MemberStatus/up``.
///
/// - Parameters
Expand Down Expand Up @@ -208,6 +221,22 @@ public struct ClusterControl {
}
}

/// Wait, within the given duration, for the cluster to have at least `nodes` members of the specified status.
///
/// - Parameters
/// - nodes: The _least_ (inclusive) number of nodes (including this node) to be part of the cluster membership
/// - status: The expected member status.
/// - within: Duration to wait for.
public func waitFor(countAtLeast: Int, _ status: Cluster.MemberStatus, within: Duration) async throws {
try await self.waitForMembershipEventually(within: within) { membership in
if membership.count(withStatus: status) >= countAtLeast {
return membership
} else {
throw Cluster.MembershipError(.countRequirementNotMet(expected: countAtLeast, expectedStatus: status))
}
}
}

/// Wait, within the given duration, for this actor system to be a member of all the nodes' respective cluster and have **at least** the specified status.
///
/// - Parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,9 @@ extension OpLogDistributedReceptionist {

for registration in registrations {
if subscription.tryOffer(registration: registration) {
self.log.notice("OFFERED \(registration.actorID) TO \(subscription)")
self.log.trace("Offered \(registration.actorID) to \(subscription)")
} else {
self.log.notice("DROPPED \(registration.actorID) TO \(subscription)")
self.log.trace("Dropped \(registration.actorID) to \(subscription)")
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions Sources/DistributedCluster/ClusterSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ extension ClusterSystem {
let baggage = Baggage.current ?? .topLevel
// TODO: we can enrich this with actor and system information here if not already present.

return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { span in
return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { _ in
let reply: RemoteCallReply<Res> = try await self.withCallID(on: actor.id, target: target) { callID in
var invocation = InvocationMessage(
callID: callID,
Expand Down Expand Up @@ -1226,7 +1226,7 @@ extension ClusterSystem {
let baggage = Baggage.current ?? .topLevel
// TODO: we can enrich this with actor and system information here if not already present.

return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { span in
return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { _ in
let reply: RemoteCallReply<_Done> = try await self.withCallID(on: actor.id, target: target) { callID in
var invocation = InvocationMessage(
callID: callID,
Expand Down Expand Up @@ -1255,8 +1255,10 @@ extension ClusterSystem {
) async throws -> Reply
where Reply: AnyRemoteCallReply
{
// Make an UUID for the remote call (so we can accept a reply for it)
let callID = UUID()

// Prepare timeout handling
let timeout = RemoteCall.timeout ?? self.settings.remoteCall.defaultTimeout
let timeoutTask: Task<Void, Error> = Task.detached {
try await Task.sleep(nanoseconds: UInt64(timeout.nanoseconds))
Expand Down Expand Up @@ -1295,6 +1297,7 @@ extension ClusterSystem {
timeoutTask.cancel()
}

/// Call the body which should perform the actual call!
let reply: any AnyRemoteCallReply = try await withCheckedThrowingContinuation { continuation in
self.inFlightCallLock.withLock {
self._inFlightCalls[callID] = continuation // this is to be resumed from an incoming reply or timeout
Expand Down Expand Up @@ -1445,7 +1448,7 @@ extension ClusterSystem {
throw DeadLetterError(recipient: recipient)
}

try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .server) { span in
try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .server) { _ in
try await executeDistributedTarget(
on: actor,
target: target,
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedCluster/DeadLetters.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public final class DeadLetterOffice {
}

// in all other cases, we want to log the dead letter:
self.log.notice(
self.log.debug(
"Dead letter was not delivered \(recipientString)",
metadata: { () -> Logger.Metadata in
// TODO: more metadata (from Envelope) (e.g. sender)
Expand Down
2 changes: 1 addition & 1 deletion Sources/DistributedCluster/InvocationBehavior.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
//===----------------------------------------------------------------------===//

import Distributed
import InstrumentationBaggage
import struct Foundation.Data
import InstrumentationBaggage

/// Representation of the distributed invocation in the Behavior APIs.
/// This needs to be removed eventually as we remove behaviors.
Expand Down

0 comments on commit 32ad34d

Please sign in to comment.