Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

+tracing Instrument Invocation handlers using swift-distributed-tracing #1085

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ var targets: [PackageDescription.Target] = [
.product(name: "NIOSSL", package: "swift-nio-ssl"),
.product(name: "NIOExtras", package: "swift-nio-extras"),
.product(name: "SwiftProtobuf", package: "swift-protobuf"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Metrics", package: "swift-metrics"),
.product(name: "ServiceDiscovery", package: "swift-service-discovery"),
.product(name: "Backtrace", package: "swift-backtrace"),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
// Observability
.product(name: "Logging", package: "swift-log"),
.product(name: "Metrics", package: "swift-metrics"),
.product(name: "Tracing", package: "swift-distributed-tracing"),
]
),

Expand Down Expand Up @@ -182,7 +184,8 @@ var dependencies: [Package.Dependency] = [
.package(url: "https://github.com/apple/swift-collections", from: "1.0.1"),

// ~~~ Observability ~~~
.package(url: "https://github.com/apple/swift-log", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-log", from: "1.4.0"),
.package(url: "https://github.com/apple/swift-distributed-tracing", from: "0.3.0"),
// swift-metrics 1.x and 2.x are almost API compatible, so most clients should use
.package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"),
.package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"),
Expand Down
25 changes: 24 additions & 1 deletion Samples/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var targets: [PackageDescription.Target] = [
name: "SampleDiningPhilosophers",
dependencies: [
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
"_PrettyLogHandler",
],
path: "Sources/SampleDiningPhilosophers",
exclude: [
Expand All @@ -29,6 +30,24 @@ var targets: [PackageDescription.Target] = [
]
),

.executableTarget(
name: "SampleClusterTracing",
dependencies: [
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
.product(name: "OpenTelemetry", package: "opentelemetry-swift"),
.product(name: "OtlpGRPCSpanExporting", package: "opentelemetry-swift"),
"_PrettyLogHandler",
],
path: "Sources/SampleClusterTracing"
),

.target(
name: "_PrettyLogHandler",
dependencies: [
.product(name: "DistributedCluster", package: "swift-distributed-actors"),
]
),

/* --- tests --- */

// no-tests placeholder project to not have `swift test` fail on Samples/
Expand All @@ -45,6 +64,7 @@ var dependencies: [Package.Dependency] = [
.package(name: "swift-distributed-actors", path: "../"),

// ~~~~~~~ only for samples ~~~~~~~
.package(url: "https://github.com/slashmo/opentelemetry-swift", from: "0.3.0"),
]

let package = Package(
Expand All @@ -58,11 +78,14 @@ let package = Package(
],
products: [
/* --- samples --- */

.executable(
name: "SampleDiningPhilosophers",
targets: ["SampleDiningPhilosophers"]
),
.executable(
name: "SampleClusterTracing",
targets: ["SampleClusterTracing"]
),
],

dependencies: dependencies,
Expand Down
28 changes: 28 additions & 0 deletions Samples/Sources/SampleClusterTracing/Clock+Extensions.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import _PrettyLogHandler
import Distributed
import DistributedCluster
import Logging
import NIO
import OpenTelemetry
import OtlpGRPCSpanExporting
import Tracing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all these imports needed?


// Sleep, with adding a little bit of noise (additional delay) to the duration.
func noisySleep(for duration: ContinuousClock.Duration) async {
var duration = duration + .milliseconds(Int.random(in: 100 ..< 300))
try? await Task.sleep(until: ContinuousClock.now + duration, clock: .continuous)
}
44 changes: 44 additions & 0 deletions Samples/Sources/SampleClusterTracing/Cooking/Chopping.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import _PrettyLogHandler
import Distributed
import DistributedCluster
import Logging
import NIO
import Tracing

protocol Chopping {
func chop(_ vegetable: Vegetable) async throws -> Vegetable
}

distributed actor VegetableChopper: Chopping {
@ActorID.Metadata(\.receptionID)
var receptionID: String

init(actorSystem: ActorSystem) async {
self.actorSystem = actorSystem

self.receptionID = "*" // default key for "all of this type"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only type that gets registered with the receptionist so it doesn't matter what receptionID is?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* is by convention the "default key" which we use for "all the actors" #1084

await actorSystem.receptionist.checkIn(self)
}

distributed func chop(_ vegetable: Vegetable) async throws -> Vegetable {
await InstrumentationSystem.tracer.withSpan(#function) { _ in
await noisySleep(for: .seconds(5))

return vegetable.asChopped
}
}
}
143 changes: 143 additions & 0 deletions Samples/Sources/SampleClusterTracing/Cooking/PrimaryCook.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import _PrettyLogHandler
import Distributed
import DistributedCluster
import Logging
import NIO
import Tracing

distributed actor PrimaryCook: LifecycleWatch {
lazy var log = Logger(actor: self)

var choppers: [ClusterSystem.ActorID: VegetableChopper] = [:]
var waitingForChoppers: (Int, CheckedContinuation<Void, Never>)?

init(actorSystem: ActorSystem) async {
self.actorSystem = actorSystem

_ = self.startChopperListingTask()
}

func startChopperListingTask() -> Task<Void, Never> {
Task {
for await chopper in await actorSystem.receptionist.listing(of: VegetableChopper.self) {
log.notice("Discovered vegetable chopper: \(chopper.id)")
self.choppers[chopper.id] = chopper

/// We implement a simple "if we're waiting for N choppers... let's notify the continuation once that is reached"
/// This would be nice to provide as a fun "active" collection type that can be `.waitFor(...)`-ed.
if let waitingForChoppersCount = self.waitingForChoppers?.0,
choppers.count >= waitingForChoppersCount
{
self.waitingForChoppers?.1.resume()
}
}
}
}

distributed func makeDinner() async throws -> Meal {
try await InstrumentationSystem.tracer.withSpan(#function) { _ in
await noisySleep(for: .milliseconds(200))

log.notice("Cooking dinner, but we need [2] vegetable choppers...! Suspend waiting for nodes to join.")
let (first, second) = try await getChoppers()
async let veggies = try chopVegetables(firstChopper: first, secondChopper: second)
async let meat = marinateMeat()
async let oven = preheatOven(temperature: 350)
// ...
return try await cook(veggies, meat, oven)
}
}

private func getChoppers() async throws -> (some Chopping, some Chopping) {
await withCheckedContinuation { cc in
self.waitingForChoppers = (2, cc)
}

var chopperIDs = self.choppers.keys.makeIterator()
guard let id1 = chopperIDs.next(),
let first = choppers[id1]
else {
throw NotEnoughChoppersError()
}
guard let id2 = chopperIDs.next(),
let second = choppers[id2]
else {
throw NotEnoughChoppersError()
}

return (first, second)
}

// Called by lifecycle watch when a watched actor terminates.
func terminated(actor id: DistributedCluster.ActorID) async {
self.choppers.removeValue(forKey: id)
}
}

func chopVegetables(firstChopper: some Chopping,
secondChopper: some Chopping) async throws -> [Vegetable]
{
try await InstrumentationSystem.tracer.withSpan("chopVegetables") { _ in
// Chop the vegetables...!
//
// However, since chopping is a very difficult operation,
// one chopping task can be performed at the same time on a single service!
// (Imagine that... we cannot parallelize these two tasks, and need to involve another service).
async let carrot = try firstChopper.chop(.carrot(chopped: false))
async let potato = try secondChopper.chop(.potato(chopped: false))
return try await [carrot, potato]
}
}

// func chop(_ vegetable: Vegetable, tracer: any Tracer) async throws -> Vegetable {
// await tracer.withSpan("chop-\(vegetable)") { _ in
// await sleep(for: .seconds(5))
// // ...
// return vegetable // "chopped"
// }
// }

func marinateMeat() async -> Meat {
await noisySleep(for: .milliseconds(620))

return await InstrumentationSystem.tracer.withSpan("marinateMeat") { _ in
await noisySleep(for: .seconds(3))
// ...
return Meat()
}
}

func preheatOven(temperature: Int) async -> Oven {
await InstrumentationSystem.tracer.withSpan("preheatOven") { _ in
// ...
await noisySleep(for: .seconds(6))
return Oven()
}
}

func cook(_: Any, _: Any, _: Any) async -> Meal {
await InstrumentationSystem.tracer.withSpan("cook") { span in
span.addEvent("children-asking-if-done-already")
await noisySleep(for: .seconds(3))
span.addEvent("children-asking-if-done-already-again")
await noisySleep(for: .seconds(2))
// ...
return Meal()
}
}

struct NotEnoughChoppersError: Error {}
40 changes: 40 additions & 0 deletions Samples/Sources/SampleClusterTracing/Model.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import _PrettyLogHandler
import Distributed
import DistributedCluster
import Logging
import NIO
import OpenTelemetry
import OtlpGRPCSpanExporting
import Tracing

struct Meal: Sendable, Codable {}

struct Meat: Sendable, Codable {}

struct Oven: Sendable, Codable {}

enum Vegetable: Sendable, Codable {
case potato(chopped: Bool)
case carrot(chopped: Bool)

var asChopped: Self {
switch self {
case .carrot: return .carrot(chopped: true)
case .potato: return .potato(chopped: true)
}
}
}
54 changes: 54 additions & 0 deletions Samples/Sources/SampleClusterTracing/Roles/ChoppingNode.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Distributed Actors open source project
//
// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import _PrettyLogHandler
import Distributed
import DistributedCluster
import Logging
import NIO
import Tracing

struct ChoppingNode {
let system: ClusterSystem

var chopper: VegetableChopper?

init(name: String, port: Int) async {
self.system = await ClusterSystem(name) { settings in
settings.bindPort = port

// We are purposefully making very slow calls, so they show up nicely in tracing:
settings.remoteCall.defaultTimeout = .seconds(20)
}
}

mutating func run() async throws {
monitorMembership(on: self.system)

let leaderEndpoint = Cluster.Endpoint(host: self.system.cluster.endpoint.host, port: 7330)
self.system.log.notice("Joining: \(leaderEndpoint)")
self.system.cluster.join(endpoint: leaderEndpoint)

try await self.system.cluster.up(within: .seconds(30))
self.system.log.notice("Joined!")

let chopper = await VegetableChopper(actorSystem: system)
self.chopper = chopper
self.system.log.notice("Vegetable chopper \(chopper) started!")

for await chopper in await self.system.receptionist.listing(of: VegetableChopper.self) {
self.system.log.warning("GOT: \(chopper.id)")
}
}
}
Loading