diff --git a/Package.swift b/Package.swift index 6b29667ec..a4def51a9 100644 --- a/Package.swift +++ b/Package.swift @@ -39,11 +39,13 @@ var targets: [PackageDescription.Target] = [ .product(name: "NIOSSL", package: "swift-nio-ssl"), .product(name: "NIOExtras", package: "swift-nio-extras"), .product(name: "SwiftProtobuf", package: "swift-protobuf"), - .product(name: "Logging", package: "swift-log"), - .product(name: "Metrics", package: "swift-metrics"), .product(name: "ServiceDiscovery", package: "swift-service-discovery"), .product(name: "Backtrace", package: "swift-backtrace"), .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), + // Observability + .product(name: "Logging", package: "swift-log"), + .product(name: "Metrics", package: "swift-metrics"), + .product(name: "Tracing", package: "swift-distributed-tracing"), ] ), @@ -182,7 +184,8 @@ var dependencies: [Package.Dependency] = [ .package(url: "https://github.com/apple/swift-collections", from: "1.0.1"), // ~~~ Observability ~~~ - .package(url: "https://github.com/apple/swift-log", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-log", from: "1.4.0"), + .package(url: "https://github.com/apple/swift-distributed-tracing", from: "0.3.0"), // swift-metrics 1.x and 2.x are almost API compatible, so most clients should use .package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"), .package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"), diff --git a/Samples/Package.swift b/Samples/Package.swift index 3b8decbc1..1179893bf 100644 --- a/Samples/Package.swift +++ b/Samples/Package.swift @@ -21,6 +21,7 @@ var targets: [PackageDescription.Target] = [ name: "SampleDiningPhilosophers", dependencies: [ .product(name: "DistributedCluster", package: "swift-distributed-actors"), + "_PrettyLogHandler", ], path: "Sources/SampleDiningPhilosophers", exclude: [ @@ -29,6 +30,24 @@ var targets: [PackageDescription.Target] = [ ] ), + .executableTarget( + name: "SampleClusterTracing", + dependencies: [ + .product(name: "DistributedCluster", package: "swift-distributed-actors"), + .product(name: "OpenTelemetry", package: "opentelemetry-swift"), + .product(name: "OtlpGRPCSpanExporting", package: "opentelemetry-swift"), + "_PrettyLogHandler", + ], + path: "Sources/SampleClusterTracing" + ), + + .target( + name: "_PrettyLogHandler", + dependencies: [ + .product(name: "DistributedCluster", package: "swift-distributed-actors"), + ] + ), + /* --- tests --- */ // no-tests placeholder project to not have `swift test` fail on Samples/ @@ -45,6 +64,7 @@ var dependencies: [Package.Dependency] = [ .package(name: "swift-distributed-actors", path: "../"), // ~~~~~~~ only for samples ~~~~~~~ + .package(url: "https://github.com/slashmo/opentelemetry-swift", from: "0.3.0"), ] let package = Package( @@ -58,11 +78,14 @@ let package = Package( ], products: [ /* --- samples --- */ - .executable( name: "SampleDiningPhilosophers", targets: ["SampleDiningPhilosophers"] ), + .executable( + name: "SampleClusterTracing", + targets: ["SampleClusterTracing"] + ), ], dependencies: dependencies, diff --git a/Samples/Sources/SampleClusterTracing/Clock+Extensions.swift b/Samples/Sources/SampleClusterTracing/Clock+Extensions.swift new file mode 100644 index 000000000..d7fee503b --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Clock+Extensions.swift @@ -0,0 +1,28 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors 
open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +// Sleep, with adding a little bit of noise (additional delay) to the duration. +func noisySleep(for duration: ContinuousClock.Duration) async { + var duration = duration + .milliseconds(Int.random(in: 100 ..< 300)) + try? await Task.sleep(until: ContinuousClock.now + duration, clock: .continuous) +} diff --git a/Samples/Sources/SampleClusterTracing/Cooking/Chopping.swift b/Samples/Sources/SampleClusterTracing/Cooking/Chopping.swift new file mode 100644 index 000000000..d26b72d72 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Cooking/Chopping.swift @@ -0,0 +1,44 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import Tracing + +protocol Chopping { + func chop(_ vegetable: Vegetable) async throws -> Vegetable +} + +distributed actor VegetableChopper: Chopping { + @ActorID.Metadata(\.receptionID) + var receptionID: String + + init(actorSystem: ActorSystem) async { + self.actorSystem = actorSystem + + self.receptionID = "*" // default key for "all of this type" + await actorSystem.receptionist.checkIn(self) + } + + distributed func chop(_ vegetable: Vegetable) async throws -> Vegetable { + await InstrumentationSystem.tracer.withSpan(#function) { _ in + await noisySleep(for: .seconds(5)) + + return vegetable.asChopped + } + } +} diff --git a/Samples/Sources/SampleClusterTracing/Cooking/PrimaryCook.swift b/Samples/Sources/SampleClusterTracing/Cooking/PrimaryCook.swift new file mode 100644 index 000000000..42d696b2f --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Cooking/PrimaryCook.swift @@ -0,0 +1,143 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import _PrettyLogHandler
+import Distributed
+import DistributedCluster
+import Logging
+import NIO
+import Tracing
+
+distributed actor PrimaryCook: LifecycleWatch {
+    lazy var log = Logger(actor: self)
+
+    var choppers: [ClusterSystem.ActorID: VegetableChopper] = [:]
+    var waitingForChoppers: (Int, CheckedContinuation<Void, Never>)?
+
+    init(actorSystem: ActorSystem) async {
+        self.actorSystem = actorSystem
+
+        _ = self.startChopperListingTask()
+    }
+
+    func startChopperListingTask() -> Task<Void, Never> {
+        Task {
+            for await chopper in await actorSystem.receptionist.listing(of: VegetableChopper.self) {
+                log.notice("Discovered vegetable chopper: \(chopper.id)")
+                self.choppers[chopper.id] = chopper
+
+                /// We implement a simple "if we're waiting for N choppers... let's notify the continuation once that is reached"
+                /// This would be nice to provide as a fun "active" collection type that can be `.waitFor(...)`-ed.
+                if let waitingForChoppersCount = self.waitingForChoppers?.0,
+                   choppers.count >= waitingForChoppersCount
+                {
+                    self.waitingForChoppers?.1.resume()
+                }
+            }
+        }
+    }
+
+    distributed func makeDinner() async throws -> Meal {
+        try await InstrumentationSystem.tracer.withSpan(#function) { _ in
+            await noisySleep(for: .milliseconds(200))
+
+            log.notice("Cooking dinner, but we need [2] vegetable choppers...! Suspend waiting for nodes to join.")
+            let (first, second) = try await getChoppers()
+            async let veggies = try chopVegetables(firstChopper: first, secondChopper: second)
+            async let meat = marinateMeat()
+            async let oven = preheatOven(temperature: 350)
+            // ...
+            return try await cook(veggies, meat, oven)
+        }
+    }
+
+    private func getChoppers() async throws -> (some Chopping, some Chopping) {
+        await withCheckedContinuation { cc in
+            self.waitingForChoppers = (2, cc)
+        }
+
+        var chopperIDs = self.choppers.keys.makeIterator()
+        guard let id1 = chopperIDs.next(),
+              let first = choppers[id1]
+        else {
+            throw NotEnoughChoppersError()
+        }
+        guard let id2 = chopperIDs.next(),
+              let second = choppers[id2]
+        else {
+            throw NotEnoughChoppersError()
+        }
+
+        return (first, second)
+    }
+
+    // Called by lifecycle watch when a watched actor terminates.
+    func terminated(actor id: DistributedCluster.ActorID) async {
+        self.choppers.removeValue(forKey: id)
+    }
+}
+
+func chopVegetables(firstChopper: some Chopping,
+                    secondChopper: some Chopping) async throws -> [Vegetable]
+{
+    try await InstrumentationSystem.tracer.withSpan("chopVegetables") { _ in
+        // Chop the vegetables...!
+        //
+        // However, since chopping is a very difficult operation,
+        // only one chopping task can be performed at the same time on a single service!
+        // (Imagine that... we cannot parallelize these two tasks, and need to involve another service).
+        async let carrot = try firstChopper.chop(.carrot(chopped: false))
+        async let potato = try secondChopper.chop(.potato(chopped: false))
+        return try await [carrot, potato]
+    }
+}
+
+// func chop(_ vegetable: Vegetable, tracer: any Tracer) async throws -> Vegetable {
+//    await tracer.withSpan("chop-\(vegetable)") { _ in
+//        await sleep(for: .seconds(5))
+//        // ...
+// return vegetable // "chopped" +// } +// } + +func marinateMeat() async -> Meat { + await noisySleep(for: .milliseconds(620)) + + return await InstrumentationSystem.tracer.withSpan("marinateMeat") { _ in + await noisySleep(for: .seconds(3)) + // ... + return Meat() + } +} + +func preheatOven(temperature: Int) async -> Oven { + await InstrumentationSystem.tracer.withSpan("preheatOven") { _ in + // ... + await noisySleep(for: .seconds(6)) + return Oven() + } +} + +func cook(_: Any, _: Any, _: Any) async -> Meal { + await InstrumentationSystem.tracer.withSpan("cook") { span in + span.addEvent("children-asking-if-done-already") + await noisySleep(for: .seconds(3)) + span.addEvent("children-asking-if-done-already-again") + await noisySleep(for: .seconds(2)) + // ... + return Meal() + } +} + +struct NotEnoughChoppersError: Error {} diff --git a/Samples/Sources/SampleClusterTracing/Model.swift b/Samples/Sources/SampleClusterTracing/Model.swift new file mode 100644 index 000000000..366eb3175 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Model.swift @@ -0,0 +1,40 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +struct Meal: Sendable, Codable {} + +struct Meat: Sendable, Codable {} + +struct Oven: Sendable, Codable {} + +enum Vegetable: Sendable, Codable { + case potato(chopped: Bool) + case carrot(chopped: Bool) + + var asChopped: Self { + switch self { + case .carrot: return .carrot(chopped: true) + case .potato: return .potato(chopped: true) + } + } +} diff --git a/Samples/Sources/SampleClusterTracing/Roles/ChoppingNode.swift b/Samples/Sources/SampleClusterTracing/Roles/ChoppingNode.swift new file mode 100644 index 000000000..7a3438b86 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Roles/ChoppingNode.swift @@ -0,0 +1,54 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import Tracing + +struct ChoppingNode { + let system: ClusterSystem + + var chopper: VegetableChopper? 
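+    // Assigned in run(); the node holds this strong reference so the chopper actor stays alive for the node's lifetime.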
+ + init(name: String, port: Int) async { + self.system = await ClusterSystem(name) { settings in + settings.bindPort = port + + // We are purposefully making very slow calls, so they show up nicely in tracing: + settings.remoteCall.defaultTimeout = .seconds(20) + } + } + + mutating func run() async throws { + monitorMembership(on: self.system) + + let leaderEndpoint = Cluster.Endpoint(host: self.system.cluster.endpoint.host, port: 7330) + self.system.log.notice("Joining: \(leaderEndpoint)") + self.system.cluster.join(endpoint: leaderEndpoint) + + try await self.system.cluster.up(within: .seconds(30)) + self.system.log.notice("Joined!") + + let chopper = await VegetableChopper(actorSystem: system) + self.chopper = chopper + self.system.log.notice("Vegetable chopper \(chopper) started!") + + for await chopper in await self.system.receptionist.listing(of: VegetableChopper.self) { + self.system.log.warning("GOT: \(chopper.id)") + } + } +} diff --git a/Samples/Sources/SampleClusterTracing/Roles/LeaderNode.swift b/Samples/Sources/SampleClusterTracing/Roles/LeaderNode.swift new file mode 100644 index 000000000..1a6844cdd --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Roles/LeaderNode.swift @@ -0,0 +1,42 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import Tracing + +struct LeaderNode { + let system: ClusterSystem + + init(name: String, port: Int) async { + self.system = await ClusterSystem(name) { settings in + settings.bindPort = port + + // We are purposefully making very slow calls, so they show up nicely in tracing: + settings.remoteCall.defaultTimeout = .seconds(20) + } + } + + func run() async throws { + monitorMembership(on: self.system) + + let cook = await PrimaryCook(actorSystem: system) + let meal = try await cook.makeDinner() + + self.system.log.notice("Made dinner successfully!") + } +} diff --git a/Samples/Sources/SampleClusterTracing/boot.swift b/Samples/Sources/SampleClusterTracing/boot.swift new file mode 100644 index 000000000..f506cb9c3 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/boot.swift @@ -0,0 +1,101 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +/* + * Sample showcasing a long traced interaction. 
+ */ + +typealias DefaultDistributedActorSystem = ClusterSystem + +@main enum Main { + static func main() async throws { + print("===-----------------------------------------------------===") + print("| Cluster Tracing Sample App |") + print("| |") + print("| USAGE: |") + print("| swift run SampleClusterTracing # leader |") + print("| swift run SampleClusterTracing 7331 chopping |") + print("| swift run SampleClusterTracing 7332 chopping |") + print("===-----------------------------------------------------===") + + let port = Int(CommandLine.arguments.dropFirst().first ?? "7330")! + let role: ClusterNodeRole + if port == 7330 { + role = .leader + } else if CommandLine.arguments.dropFirst(2).first == "chopping" { + role = .chopping + } else { + fatalError("Undefined role for node: \(port)! Available roles: \(ClusterNodeRole.allCases)") + } + let nodeName = "SampleNode-\(port)-\(role)" + + // Bootstrap logging: + LoggingSystem.bootstrap(SamplePrettyLogHandler.init) + + // Bootstrap OpenTelemetry tracing: + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let exporter = OtlpGRPCSpanExporter(config: OtlpGRPCSpanExporter.Config(eventLoopGroup: group)) + let processor = OTel.SimpleSpanProcessor(exportingTo: exporter) + + let otel = OTel(serviceName: nodeName, eventLoopGroup: group, processor: processor) + InstrumentationSystem.bootstrap(otel.tracer()) + + // Start the sample app node. + // (All nodes attempt to join the leader at 7330, forming a cluster with it). + let system: ClusterSystem + + if role == .leader { + let node = await LeaderNode(name: nodeName, port: port) + system = node.system + try! await node.run() + } else { + var node = await ChoppingNode(name: nodeName, port: port) + system = node.system + try! await node.run() + } + + try await system.terminated + } +} + +func monitorMembership(on system: ClusterSystem) { + Task { + for await event in system.cluster.events { + system.log.debug("Membership change: \(event)") + + let membership = await system.cluster.membershipSnapshot + if membership.members(withStatus: .down).count == membership.count { + system.log.notice("Membership: \(membership.count)", metadata: [ + "cluster/membership": Logger.MetadataValue.array(membership.members(atMost: .down).map { + "\($0)" + }), + ]) + } + } + } +} + +enum ClusterNodeRole: CaseIterable, Hashable { + case leader + case chopping +} diff --git a/Samples/Sources/SampleDiningPhilosophers/boot.swift b/Samples/Sources/SampleDiningPhilosophers/boot.swift index a7c91d2de..79b23a833 100644 --- a/Samples/Sources/SampleDiningPhilosophers/boot.swift +++ b/Samples/Sources/SampleDiningPhilosophers/boot.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import _PrettyLogHandler import Distributed import DistributedCluster import Logging diff --git a/Samples/Sources/SampleDiningPhilosophers/SamplePrettyLogHandler.swift b/Samples/Sources/_PrettyLogHandler/SamplePrettyLogHandler.swift similarity index 97% rename from Samples/Sources/SampleDiningPhilosophers/SamplePrettyLogHandler.swift rename to Samples/Sources/_PrettyLogHandler/SamplePrettyLogHandler.swift index fc9ce8142..26004e235 100644 --- a/Samples/Sources/SampleDiningPhilosophers/SamplePrettyLogHandler.swift +++ b/Samples/Sources/_PrettyLogHandler/SamplePrettyLogHandler.swift @@ -29,7 +29,7 @@ import WASILibc #endif /// Logger that prints "pretty" for showcasing the cluster nicely in sample applications. 
-struct SamplePrettyLogHandler: LogHandler { +public struct SamplePrettyLogHandler: LogHandler { static let CONSOLE_RESET = "\u{001B}[0;0m" static let CONSOLE_BOLD = "\u{001B}[1m" @@ -52,8 +52,7 @@ struct SamplePrettyLogHandler: LogHandler { } } - // internal for testing only - internal init(label: String) { + public init(label: String) { self.label = label } diff --git a/Samples/docker/collector-config.yaml b/Samples/docker/collector-config.yaml new file mode 100644 index 000000000..1a9f4b8a6 --- /dev/null +++ b/Samples/docker/collector-config.yaml @@ -0,0 +1,24 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: otel-collector:4317 + +exporters: + logging: + logLevel: debug + + jaeger: + endpoint: "jaeger:14250" + tls: + insecure: true + + zipkin: + endpoint: "http://zipkin:9411/api/v2/spans" + + +service: + pipelines: + traces: + receivers: otlp + exporters: [logging, jaeger, zipkin] diff --git a/Samples/docker/docker-compose.yaml b/Samples/docker/docker-compose.yaml new file mode 100644 index 000000000..b4522b41a --- /dev/null +++ b/Samples/docker/docker-compose.yaml @@ -0,0 +1,26 @@ +version: '3' +services: + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + command: ["--config=/etc/config.yaml"] + volumes: + - ./collector-config.yaml:/etc/config.yaml + ports: + - "4317:4317" + networks: [exporter] + depends_on: [zipkin, jaeger] + + zipkin: + image: openzipkin/zipkin:latest + ports: + - "9411:9411" + networks: [exporter] + + jaeger: + image: jaegertracing/all-in-one + ports: + - "16686:16686" + networks: [exporter] + +networks: + exporter: diff --git a/Sources/DistributedCluster/Cluster/Cluster+Membership.swift b/Sources/DistributedCluster/Cluster/Cluster+Membership.swift index 1b621f186..0e8a36ea1 100644 --- a/Sources/DistributedCluster/Cluster/Cluster+Membership.swift +++ b/Sources/DistributedCluster/Cluster/Cluster+Membership.swift @@ -703,6 +703,7 @@ extension Cluster { case notFoundAny(Cluster.Endpoint, in: Cluster.Membership) case atLeastStatusRequirementNotMet(expectedAtLeast: Cluster.MemberStatus, found: Cluster.Member) case statusRequirementNotMet(expected: Cluster.MemberStatus, found: Cluster.Member) + case countRequirementNotMet(expected: Int, expectedStatus: Cluster.MemberStatus) case awaitStatusTimedOut(Duration, Error?) var prettyDescription: String { @@ -721,6 +722,8 @@ extension Cluster { return "Expected \(reflecting: foundMember.node) to be seen as at-least [\(expectedAtLeastStatus)] but was [\(foundMember.status)]" case .statusRequirementNotMet(let expectedStatus, let foundMember): return "Expected \(reflecting: foundMember.node) to be seen as [\(expectedStatus)] but was [\(foundMember.status)]" + case .countRequirementNotMet(let count, let expectedStatus): + return "Expected \(count) nodes to be seen as [\(expectedStatus)], but did not find enough" case .awaitStatusTimedOut(let duration, let lastError): let lastErrorMessage: String if let error = lastError { diff --git a/Sources/DistributedCluster/Cluster/ClusterControl.swift b/Sources/DistributedCluster/Cluster/ClusterControl.swift index 81da6cb71..e4302e471 100644 --- a/Sources/DistributedCluster/Cluster/ClusterControl.swift +++ b/Sources/DistributedCluster/Cluster/ClusterControl.swift @@ -162,10 +162,23 @@ public struct ClusterControl { /// /// - Returns `Cluster.Member` for the joined node. 
@discardableResult
+    @available(*, deprecated, renamed: "up(within:)")
     public func joined(within: Duration) async throws -> Cluster.Member {
         try await self.waitFor(self.node, .up, within: within)
     }
 
+    /// Wait, within the given duration, until this actor system has joined the cluster and become ``Cluster/MemberStatus/up``.
+    ///
+    /// - Parameters
+    ///   - within: Duration to wait for this system's own node to become ``Cluster/MemberStatus/up``.
+    ///
+    /// - Returns `Cluster.Member` for this system's node, once it has joined the cluster
+    ///   and reached the ``Cluster/MemberStatus/up`` status.
+    @discardableResult
+    public func up(within: Duration) async throws -> Cluster.Member {
+        try await self.waitFor(self.node, .up, within: within)
+    }
+
     /// Wait, within the given duration, until the passed in node has joined the cluster and become ``Cluster/MemberStatus/up``.
     ///
     /// - Parameters
@@ -208,6 +221,22 @@
         }
     }
 
+    /// Wait, within the given duration, for the cluster to have at least `countAtLeast` members with the specified status.
+    ///
+    /// - Parameters
+    ///   - countAtLeast: The minimum (inclusive) number of members (including this node) that must be part of the cluster membership.
+    ///   - status: The expected member status.
+    ///   - within: Duration to wait for.
+    public func waitFor(countAtLeast: Int, _ status: Cluster.MemberStatus, within: Duration) async throws {
+        try await self.waitForMembershipEventually(within: within) { membership in
+            if membership.count(withStatus: status) >= countAtLeast {
+                return membership
+            } else {
+                throw Cluster.MembershipError(.countRequirementNotMet(expected: countAtLeast, expectedStatus: status))
+            }
+        }
+    }
+
     /// Wait, within the given duration, for this actor system to be a member of all the nodes' respective cluster and have **at least** the specified status.
     ///
     /// - Parameters
diff --git a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift
index 9d3b2dc2d..ce223eeaf 100644
--- a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift
+++ b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift
@@ -525,9 +525,9 @@ extension OpLogDistributedReceptionist {
 
         for registration in registrations {
             if subscription.tryOffer(registration: registration) {
-                self.log.notice("OFFERED \(registration.actorID) TO \(subscription)")
+                self.log.trace("Offered \(registration.actorID) to \(subscription)")
             } else {
-                self.log.notice("DROPPED \(registration.actorID) TO \(subscription)")
+                self.log.trace("Dropped \(registration.actorID) to \(subscription)")
             }
         }
     }
diff --git a/Sources/DistributedCluster/ClusterSystem.swift b/Sources/DistributedCluster/ClusterSystem.swift
index a2ff3bb6d..eb057233e 100644
--- a/Sources/DistributedCluster/ClusterSystem.swift
+++ b/Sources/DistributedCluster/ClusterSystem.swift
@@ -21,6 +21,7 @@ import DistributedActorsConcurrencyHelpers
 import Foundation // for UUID
 import Logging
 import NIO
+import Tracing
 
 /// A `ClusterSystem` is a confined space which runs and manages Actors.
/// @@ -1148,28 +1149,36 @@ extension ClusterSystem { let recipient = _RemoteClusterActorPersonality(shell: clusterShell, id: actor.id._asRemote, system: self) let arguments = invocation.arguments - let reply: RemoteCallReply = try await self.withCallID(on: actor.id, target: target) { callID in - let invocation = InvocationMessage( - callID: callID, - targetIdentifier: target.identifier, - genericSubstitutions: invocation.genericSubstitutions, - arguments: arguments - ) + // -- Pick up the distributed-tracing baggage; It will be injected into the message inside the withCallID body. + let baggage = Baggage.current ?? .topLevel + // TODO: we can enrich this with actor and system information here if not already present. + + return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { _ in + let reply: RemoteCallReply = try await self.withCallID(on: actor.id, target: target) { callID in + var invocation = InvocationMessage( + callID: callID, + targetIdentifier: target.identifier, + genericSubstitutions: invocation.genericSubstitutions, + arguments: arguments + ) - recipient.sendInvocation(invocation) - } + InstrumentationSystem.instrument.inject(baggage, into: &invocation, using: .invocationMessage) - if let error = reply.thrownError { - throw error - } - guard let value = reply.value else { - throw RemoteCallError( - .invalidReply(reply.callID), - on: actor.id, - target: target - ) + recipient.sendInvocation(invocation) + } + + if let error = reply.thrownError { + throw error + } + guard let value = reply.value else { + throw RemoteCallError( + .invalidReply(reply.callID), + on: actor.id, + target: target + ) + } + return value } - return value } public func remoteCallVoid( @@ -1211,18 +1220,27 @@ extension ClusterSystem { let recipient = _RemoteClusterActorPersonality(shell: clusterShell, id: actor.id._asRemote, system: self) let arguments = invocation.arguments - let reply: RemoteCallReply<_Done> = try await self.withCallID(on: actor.id, target: target) { callID in - let invocation = InvocationMessage( - callID: callID, - targetIdentifier: target.identifier, - genericSubstitutions: invocation.genericSubstitutions, - arguments: arguments - ) - recipient.sendInvocation(invocation) - } + // -- Pick up the distributed-tracing baggage; It will be injected into the message inside the withCallID body. + let baggage = Baggage.current ?? .topLevel + // TODO: we can enrich this with actor and system information here if not already present. + + return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { _ in + let reply: RemoteCallReply<_Done> = try await self.withCallID(on: actor.id, target: target) { callID in + var invocation = InvocationMessage( + callID: callID, + targetIdentifier: target.identifier, + genericSubstitutions: invocation.genericSubstitutions, + arguments: arguments + ) - if let error = reply.thrownError { - throw error + InstrumentationSystem.instrument.inject(baggage, into: &invocation, using: .invocationMessage) + + recipient.sendInvocation(invocation) + } + + if let error = reply.thrownError { + throw error + } } } @@ -1233,8 +1251,10 @@ extension ClusterSystem { ) async throws -> Reply where Reply: AnyRemoteCallReply { + // Make an UUID for the remote call (so we can accept a reply for it) let callID = UUID() + // Prepare timeout handling let timeout = RemoteCall.timeout ?? 
self.settings.remoteCall.defaultTimeout let timeoutTask: Task = Task.detached { try await Task.sleep(nanoseconds: UInt64(timeout.nanoseconds)) @@ -1273,6 +1293,7 @@ extension ClusterSystem { timeoutTask.cancel() } + /// Call the body which should perform the actual call! let reply: any AnyRemoteCallReply = try await withCheckedThrowingContinuation { continuation in self.inFlightCallLock.withLock { self._inFlightCalls[callID] = continuation // this is to be resumed from an incoming reply or timeout @@ -1403,6 +1424,9 @@ extension ClusterSystem { return } + var baggage: Baggage = .topLevel + InstrumentationSystem.instrument.extract(invocation, into: &baggage, using: .invocationMessage) + Task { var decoder = ClusterInvocationDecoder(system: self, message: invocation) @@ -1420,12 +1444,14 @@ extension ClusterSystem { throw DeadLetterError(recipient: recipient) } - try await executeDistributedTarget( - on: actor, - target: target, - invocationDecoder: &decoder, - handler: resultHandler - ) + try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .server) { _ in + try await executeDistributedTarget( + on: actor, + target: target, + invocationDecoder: &decoder, + handler: resultHandler + ) + } } catch { // FIXME(distributed): is this right? do { diff --git a/Sources/DistributedCluster/DeadLetters.swift b/Sources/DistributedCluster/DeadLetters.swift index 282952ae6..66517f048 100644 --- a/Sources/DistributedCluster/DeadLetters.swift +++ b/Sources/DistributedCluster/DeadLetters.swift @@ -204,7 +204,7 @@ public final class DeadLetterOffice { } // in all other cases, we want to log the dead letter: - self.log.notice( + self.log.debug( "Dead letter was not delivered \(recipientString)", metadata: { () -> Logger.Metadata in // TODO: more metadata (from Envelope) (e.g. sender) diff --git a/Sources/DistributedCluster/InvocationBehavior.swift b/Sources/DistributedCluster/InvocationBehavior.swift index f641f65d2..328967b5c 100644 --- a/Sources/DistributedCluster/InvocationBehavior.swift +++ b/Sources/DistributedCluster/InvocationBehavior.swift @@ -14,6 +14,7 @@ import Distributed import struct Foundation.Data +import InstrumentationBaggage /// Representation of the distributed invocation in the Behavior APIs. /// This needs to be removed eventually as we remove behaviors. @@ -23,12 +24,19 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible { let genericSubstitutions: [String] let arguments: [Data] + /// Tracing metadata, injected/extracted by distributed-tracing. + var metadata: [String: String] = [:] + + var hasMetadata: Bool { + !self.metadata.isEmpty + } + var target: RemoteCallTarget { RemoteCallTarget(targetIdentifier) } public var description: String { - "InvocationMessage(callID: \(callID), target: \(target), genericSubstitutions: \(genericSubstitutions), arguments: \(arguments.count))" + "InvocationMessage(callID: \(callID), target: \(target), genericSubstitutions: \(genericSubstitutions), arguments: \(arguments.count), metadata: \(metadata))" } } diff --git a/Sources/DistributedCluster/TracingSupport.swift b/Sources/DistributedCluster/TracingSupport.swift new file mode 100644 index 000000000..1259b81bc --- /dev/null +++ b/Sources/DistributedCluster/TracingSupport.swift @@ -0,0 +1,49 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Tracing + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Injector + +struct InvocationMessageInjector: Tracing.Injector { + typealias Carrier = InvocationMessage + + func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { + carrier.metadata[key] = value + } +} + +extension Tracing.Injector where Self == InvocationMessageInjector { + static var invocationMessage: InvocationMessageInjector { + InvocationMessageInjector() + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Extractor + +struct InvocationMessageExtractor: Tracing.Extractor { + typealias Carrier = InvocationMessage + + func extract(key: String, from carrier: Carrier) -> String? { + carrier.metadata[key] + } +} + +extension Tracing.Extractor where Self == InvocationMessageExtractor { + static var invocationMessage: InvocationMessageExtractor { + InvocationMessageExtractor() + } +}
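For reference, a minimal sketch (not part of the patch) of how this injector/extractor pair round-trips tracing metadata through `InvocationMessage.metadata`. It assumes same-module access to the internal `InvocationMessage` initializer that `ClusterSystem` itself uses; the `traceparent` key and value are placeholders, since in the real code path the bootstrapped instrument chooses the keys via `InstrumentationSystem.instrument.inject(_:into:using: .invocationMessage)` on the caller side and the matching `extract` on the recipient side:

import Foundation

// Build a carrier the same way ClusterSystem does before sending a remote call.
var message = InvocationMessage(
    callID: UUID(),
    targetIdentifier: "greet(name:)", // placeholder target identifier
    genericSubstitutions: [],
    arguments: []
)

// Caller side: the instrument writes its propagation header(s) into the carrier.
let traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" // placeholder W3C-style value
InvocationMessageInjector().inject(traceparent, forKey: "traceparent", into: &message)

// Recipient side: the same key is read back before the .server span is started.
assert(InvocationMessageExtractor().extract(key: "traceparent", from: message) == traceparent)

Keeping the carrier a plain `[String: String]` keeps `InvocationMessage` `Codable` without tying the serialized form to any particular tracer implementation.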